Merge branch 'master' of github.com:grpc/grpc into compression-accept-encoding

pull/2533/head
David Garcia Quintas 9 years ago
commit c43648f250
  1. 33
      BUILD
  2. 24
      INSTALL
  3. 315
      Makefile
  4. 101
      build.json
  5. 6
      doc/connection-backoff-interop-test-description.md
  6. 9
      doc/connection-backoff.md
  7. 2
      doc/connectivity-semantics-and-api.md
  8. 70
      doc/health-checking.md
  9. 5
      gRPC.podspec
  10. 4
      include/grpc++/async_unary_call.h
  11. 34
      include/grpc++/auth_context.h
  12. 4
      include/grpc++/byte_buffer.h
  13. 8
      include/grpc++/client_context.h
  14. 2
      include/grpc++/completion_queue.h
  15. 9
      include/grpc++/dynamic_thread_pool.h
  16. 4
      include/grpc++/generic_stub.h
  17. 4
      include/grpc++/impl/README.md
  18. 32
      include/grpc++/impl/call.h
  19. 1
      include/grpc++/impl/grpc_library.h
  20. 15
      include/grpc++/impl/rpc_service_method.h
  21. 6
      include/grpc++/impl/serialization_traits.h
  22. 8
      include/grpc++/impl/sync_no_cxx11.h
  23. 21
      include/grpc++/impl/thd_no_cxx11.h
  24. 18
      include/grpc++/server.h
  25. 12
      include/grpc++/server_builder.h
  26. 2
      include/grpc++/server_context.h
  27. 9
      include/grpc++/stream.h
  28. 4
      include/grpc/byte_buffer.h
  29. 55
      include/grpc/census.h
  30. 4
      include/grpc/compression.h
  31. 100
      include/grpc/grpc.h
  32. 41
      include/grpc/grpc_zookeeper.h
  33. 2
      include/grpc/status.h
  34. 2
      include/grpc/support/alloc.h
  35. 2
      include/grpc/support/atm.h
  36. 2
      include/grpc/support/atm_gcc_atomic.h
  37. 2
      include/grpc/support/atm_gcc_sync.h
  38. 37
      include/grpc/support/atm_win32.h
  39. 2
      include/grpc/support/cmdline.h
  40. 2
      include/grpc/support/cpu.h
  41. 2
      include/grpc/support/histogram.h
  42. 2
      include/grpc/support/host_port.h
  43. 2
      include/grpc/support/log.h
  44. 2
      include/grpc/support/log_win32.h
  45. 5
      include/grpc/support/port_platform.h
  46. 2
      include/grpc/support/slice.h
  47. 2
      include/grpc/support/string_util.h
  48. 2
      include/grpc/support/subprocess.h
  49. 35
      include/grpc/support/sync.h
  50. 14
      include/grpc/support/sync_generic.h
  51. 2
      include/grpc/support/sync_posix.h
  52. 2
      include/grpc/support/sync_win32.h
  53. 2
      include/grpc/support/thd.h
  54. 3
      include/grpc/support/time.h
  55. 4
      include/grpc/support/tls.h
  56. 10
      include/grpc/support/tls_gcc.h
  57. 10
      include/grpc/support/tls_msvc.h
  58. 10
      include/grpc/support/useful.h
  59. 2
      src/compiler/csharp_generator_helpers.h
  60. 13
      src/compiler/generator_helpers.h
  61. 37
      src/compiler/objective_c_generator.cc
  62. 28
      src/compiler/objective_c_plugin.cc
  63. 2
      src/core/channel/census_filter.h
  64. 39
      src/core/channel/client_channel.c
  65. 7
      src/core/channel/client_channel.h
  66. 43
      src/core/channel/compress_filter.c
  67. 2
      src/core/channel/compress_filter.h
  68. 2
      src/core/channel/http_client_filter.h
  69. 2
      src/core/channel/http_server_filter.h
  70. 2
      src/core/channel/noop_filter.h
  71. 3
      src/core/client_config/resolvers/dns_resolver.c
  72. 501
      src/core/client_config/resolvers/zookeeper_resolver.c
  73. 42
      src/core/client_config/resolvers/zookeeper_resolver.h
  74. 2
      src/core/client_config/subchannel.h
  75. 10
      src/core/client_config/subchannel_factory_decorators/add_channel_arg.c
  76. 5
      src/core/client_config/subchannel_factory_decorators/add_channel_arg.h
  77. 4
      src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
  78. 5
      src/core/client_config/subchannel_factory_decorators/merge_channel_args.h
  79. 6
      src/core/compression/algorithm.c
  80. 8
      src/core/debug/trace.c
  81. 2
      src/core/debug/trace.h
  82. 6
      src/core/httpcli/format_request.c
  83. 2
      src/core/httpcli/format_request.h
  84. 2
      src/core/httpcli/parser.h
  85. 5
      src/core/iomgr/alarm.c
  86. 2
      src/core/iomgr/alarm.h
  87. 10
      src/core/iomgr/alarm_heap.c
  88. 2
      src/core/iomgr/alarm_heap.h
  89. 2
      src/core/iomgr/alarm_internal.h
  90. 3
      src/core/iomgr/endpoint.c
  91. 5
      src/core/iomgr/endpoint.h
  92. 2
      src/core/iomgr/endpoint_pair.h
  93. 20
      src/core/iomgr/endpoint_pair_windows.c
  94. 35
      src/core/iomgr/iocp_windows.c
  95. 10
      src/core/iomgr/iocp_windows.h
  96. 2
      src/core/iomgr/iomgr.h
  97. 2
      src/core/iomgr/iomgr_internal.h
  98. 2
      src/core/iomgr/iomgr_posix.c
  99. 2
      src/core/iomgr/iomgr_posix.h
  100. 2
      src/core/iomgr/iomgr_windows.c
  101. Some files were not shown because too many files have changed in this diff.

33
BUILD

@ -52,7 +52,6 @@ cc_library(
"src/core/support/string_win32.h",
"src/core/support/thd_internal.h",
"src/core/support/alloc.c",
"src/core/support/cancellable.c",
"src/core/support/cmdline.c",
"src/core/support/cpu_iphone.c",
"src/core/support/cpu_linux.c",
@ -96,7 +95,6 @@ cc_library(
"include/grpc/support/atm_gcc_atomic.h",
"include/grpc/support/atm_gcc_sync.h",
"include/grpc/support/atm_win32.h",
"include/grpc/support/cancellable_platform.h",
"include/grpc/support/cmdline.h",
"include/grpc/support/cpu.h",
"include/grpc/support/histogram.h",
@ -202,6 +200,7 @@ cc_library(
"src/core/iomgr/tcp_server.h",
"src/core/iomgr/tcp_windows.h",
"src/core/iomgr/time_averaged_stats.h",
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
"src/core/json/json.h",
@ -326,6 +325,7 @@ cc_library(
"src/core/iomgr/tcp_server_windows.c",
"src/core/iomgr/tcp_windows.c",
"src/core/iomgr/time_averaged_stats.c",
"src/core/iomgr/udp_server.c",
"src/core/iomgr/wakeup_fd_eventfd.c",
"src/core/iomgr/wakeup_fd_nospecial.c",
"src/core/iomgr/wakeup_fd_pipe.c",
@ -400,6 +400,7 @@ cc_library(
],
deps = [
"//external:libssl",
"//external:zlib",
":gpr",
],
)
@ -465,6 +466,7 @@ cc_library(
"src/core/iomgr/tcp_server.h",
"src/core/iomgr/tcp_windows.h",
"src/core/iomgr/time_averaged_stats.h",
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
"src/core/json/json.h",
@ -569,6 +571,7 @@ cc_library(
"src/core/iomgr/tcp_server_windows.c",
"src/core/iomgr/tcp_windows.c",
"src/core/iomgr/time_averaged_stats.c",
"src/core/iomgr/udp_server.c",
"src/core/iomgr/wakeup_fd_eventfd.c",
"src/core/iomgr/wakeup_fd_nospecial.c",
"src/core/iomgr/wakeup_fd_pipe.c",
@ -646,6 +649,26 @@ cc_library(
)
cc_library(
name = "grpc_zookeeper",
srcs = [
"src/core/client_config/resolvers/zookeeper_resolver.h",
"src/core/client_config/resolvers/zookeeper_resolver.c",
],
hdrs = [
"include/grpc/grpc_zookeeper.h",
],
includes = [
"include",
".",
],
deps = [
":gpr",
":grpc",
],
)
cc_library(
name = "grpc++",
srcs = [
@ -690,7 +713,6 @@ cc_library(
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
"include/grpc++/auth_property_iterator.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
@ -778,7 +800,6 @@ cc_library(
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
"include/grpc++/auth_property_iterator.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
@ -889,7 +910,6 @@ objc_library(
name = "gpr_objc",
srcs = [
"src/core/support/alloc.c",
"src/core/support/cancellable.c",
"src/core/support/cmdline.c",
"src/core/support/cpu_iphone.c",
"src/core/support/cpu_linux.c",
@ -933,7 +953,6 @@ objc_library(
"include/grpc/support/atm_gcc_atomic.h",
"include/grpc/support/atm_gcc_sync.h",
"include/grpc/support/atm_win32.h",
"include/grpc/support/cancellable_platform.h",
"include/grpc/support/cmdline.h",
"include/grpc/support/cpu.h",
"include/grpc/support/histogram.h",
@ -1056,6 +1075,7 @@ objc_library(
"src/core/iomgr/tcp_server_windows.c",
"src/core/iomgr/tcp_windows.c",
"src/core/iomgr/time_averaged_stats.c",
"src/core/iomgr/udp_server.c",
"src/core/iomgr/wakeup_fd_eventfd.c",
"src/core/iomgr/wakeup_fd_nospecial.c",
"src/core/iomgr/wakeup_fd_pipe.c",
@ -1193,6 +1213,7 @@ objc_library(
"src/core/iomgr/tcp_server.h",
"src/core/iomgr/tcp_windows.h",
"src/core/iomgr/time_averaged_stats.h",
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
"src/core/json/json.h",

@ -9,25 +9,40 @@ wiki pages:
* If you are in a hurry *
*************************
On Linux (Debian):
Note: you will need to add the Debian 'unstable' distribution to your sources
file first.
Add the following line to your `/etc/apt/sources.list` file:
deb http://ftp.us.debian.org/debian unstable main contrib non-free
Install the gRPC library:
$ [sudo] apt-get install libgrpc-dev
OR
$ git clone https://github.com/grpc/grpc.git
$ cd grpc
$ git submodule update --init
$ make
$ sudo make install
$ [sudo] make install
You don't need anything other than GNU Make, gcc and autotools. Under a Debian
or Ubuntu system, this should boil down to the following packages:
$ apt-get install build-essential autoconf libtool
$ [sudo] apt-get install build-essential autoconf libtool
Building the python wrapper requires the following:
# apt-get install python-all-dev python-virtualenv
$ [sudo] apt-get install python-all-dev python-virtualenv
If you want to install in a different directory than the default /usr/lib, you can
override it on the command line:
# make install prefix=/opt
$ [sudo] make install prefix=/opt
*******************************
@ -132,6 +147,7 @@ We will also need to make openssl and install it appropriately
$ cd <git directory>
$ cd third_party/openssl
$ ./config
$ sudo make install
$ cd ../../

File diff suppressed because one or more lines are too long

@ -7,7 +7,7 @@
"version": {
"major": 0,
"minor": 10,
"micro": 0,
"micro": 1,
"build": 0
}
},
@ -33,7 +33,6 @@
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
"include/grpc++/auth_property_iterator.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
@ -172,6 +171,7 @@
"src/core/iomgr/tcp_server.h",
"src/core/iomgr/tcp_windows.h",
"src/core/iomgr/time_averaged_stats.h",
"src/core/iomgr/udp_server.h",
"src/core/iomgr/wakeup_fd_pipe.h",
"src/core/iomgr/wakeup_fd_posix.h",
"src/core/json/json.h",
@ -275,6 +275,7 @@
"src/core/iomgr/tcp_server_windows.c",
"src/core/iomgr/tcp_windows.c",
"src/core/iomgr/time_averaged_stats.c",
"src/core/iomgr/udp_server.c",
"src/core/iomgr/wakeup_fd_eventfd.c",
"src/core/iomgr/wakeup_fd_nospecial.c",
"src/core/iomgr/wakeup_fd_pipe.c",
@ -368,7 +369,6 @@
"include/grpc/support/atm_gcc_atomic.h",
"include/grpc/support/atm_gcc_sync.h",
"include/grpc/support/atm_win32.h",
"include/grpc/support/cancellable_platform.h",
"include/grpc/support/cmdline.h",
"include/grpc/support/cpu.h",
"include/grpc/support/histogram.h",
@ -403,7 +403,6 @@
],
"src": [
"src/core/support/alloc.c",
"src/core/support/cancellable.c",
"src/core/support/cmdline.c",
"src/core/support/cpu_iphone.c",
"src/core/support/cpu_linux.c",
@ -573,6 +572,28 @@
"secure": "no",
"vs_project_guid": "{46CEDFFF-9692-456A-AA24-38B5D6BCF4C5}"
},
{
"name": "grpc_zookeeper",
"build": "all",
"language": "c",
"public_headers": [
"include/grpc/grpc_zookeeper.h"
],
"headers": [
"src/core/client_config/resolvers/zookeeper_resolver.h"
],
"src": [
"src/core/client_config/resolvers/zookeeper_resolver.c"
],
"deps": [
"gpr",
"grpc"
],
"external_deps": [
"zookeeper"
],
"secure": "no"
},
{
"name": "reconnect_server",
"build": "private",
@ -1122,18 +1143,6 @@
"grpc"
]
},
{
"name": "gpr_cancellable_test",
"build": "test",
"language": "c",
"src": [
"test/core/support/cancellable_test.c"
],
"deps": [
"gpr_test_util",
"gpr"
]
},
{
"name": "gpr_cmdline_test",
"build": "test",
@ -1891,6 +1900,23 @@
"gpr"
]
},
{
"name": "udp_server_test",
"build": "test",
"language": "c",
"src": [
"test/core/iomgr/udp_server_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr_test_util",
"gpr"
],
"platforms": [
"posix"
]
},
{
"name": "uri_parser_test",
"build": "test",
@ -2462,6 +2488,9 @@
"gpr",
"grpc++_test_config"
],
"exclude_configs": [
"tsan"
],
"platforms": [
"mac",
"linux",
@ -2584,6 +2613,26 @@
"gpr"
]
},
{
"name": "shutdown_test",
"build": "test",
"language": "c++",
"src": [
"test/cpp/end2end/shutdown_test.cc"
],
"deps": [
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc_zookeeper",
"grpc",
"gpr_test_util",
"gpr"
],
"external_deps": [
"zookeeper"
]
},
{
"name": "status_test",
"build": "test",
@ -2658,6 +2707,26 @@
"gpr_test_util",
"gpr"
]
},
{
"name": "zookeeper_test",
"build": "test",
"language": "c++",
"src": [
"test/cpp/end2end/zookeeper_test.cc"
],
"deps": [
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc_zookeeper",
"grpc",
"gpr_test_util",
"gpr"
],
"external_deps": [
"zookeeper"
]
}
]
}

@ -31,9 +31,9 @@ Clients should accept these arguments:
* --server_retry_port=PORT
* The server port to connect to for testing backoffs. For example, "8081"
The client must connect to the control port without TLS. The client should
either assert on the server returned backoff status or check the returned
backoffs on its own.
The client must connect to the control port without TLS. The client must connect
to the retry port with TLS. The client should either assert on the server
returned backoff status or check the returned backoffs on its own.
Procedure of client:

@ -44,3 +44,12 @@ different jitter logic.
Alternate implementations must ensure that connection backoffs started at the
same time disperse, and must not attempt connections substantially more often
than the above algorithm.
## Reset Backoff
The backoff should be reset to INITIAL_BACKOFF at some point, so that the
reconnecting behavior is consistent whether the connection is a newly started
one or a previously disconnected one.
We choose to reset the backoff when the SETTINGS frame is received; at that
point we know for sure that the connection was accepted by the server.

@ -38,7 +38,7 @@ because the server is not yet available), the channel may spend increasingly
large amounts of time in this state.
IDLE: This is the state where the channel is not even trying to create a
connection because of a lack of new or pending RPCs. New channels MAY be created
connection because of a lack of new or pending RPCs. New RPCs MAY be created
in this state. Any attempt to start an RPC on the channel will push the channel
out of this state to connecting. When there has been no RPC activity on a channel
for a specified IDLE_TIMEOUT, i.e., no new or pending (active) RPCs for this

@ -0,0 +1,70 @@
GRPC Health Checking Protocol
================================
Health checks are used to probe whether the server is able to handle rpcs. The
client-to-server health checking can happen from point to point or via some
control system. A server may choose to reply “unhealthy” because it
is not ready to take requests, it is shutting down, or for some other reason.
The client can act accordingly if the response is not received within some time
window or if the response says it is unhealthy.
A GRPC service is used as the health checking mechanism for both the simple
client-to-server scenario and other control systems such as load balancing.
Being a high-level service provides some benefits. Firstly, since it is a GRPC
service itself, doing a health check is in the same format as a normal rpc.
Secondly, it has rich semantics such as per-service health status. Thirdly, as
a GRPC service, it is able to reuse all the existing billing, quota
infrastructure, etc., and thus the server has full control over access to the
health checking service.
## Service Definition
The server should export a service defined in the following proto:
```
syntax = "proto3";
package grpc.health.v1alpha;
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}
ServingStatus status = 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}
```
A client can query the server’s health status by calling the `Check` method, and
a deadline should be set on the rpc. The client can optionally set the service
name it wants to query for health status. The suggested format of a service name
is `package_names.ServiceName`, such as `grpc.health.v1alpha.Health`.
The server should register all the services manually and set
the individual status, including an empty service name and its status. For each
request received, if the service name can be found in the registry,
a response must be sent back with an `OK` status and the status field should be
set to `SERVING` or `NOT_SERVING` accordingly. If the service name is not
registered, the server returns a `NOT_FOUND` GRPC status.
The server should use an empty string as the key for the server’s
overall health status, so that a client not interested in a specific service can
query the server's status with an empty request. The server can just do exact
matching of the service name without supporting any kind of wildcard matching.
However, the service owner has the freedom to implement more complicated
matching semantics that both the client and server agree upon.
A client can declare the server as unhealthy if the rpc does not finish within
some amount of time. The client should be able to handle the case where the
server does not have the Health service.

@ -73,7 +73,6 @@ Pod::Spec.new do |s|
'grpc/support/atm_gcc_atomic.h',
'grpc/support/atm_gcc_sync.h',
'grpc/support/atm_win32.h',
'grpc/support/cancellable_platform.h',
'grpc/support/cmdline.h',
'grpc/support/cpu.h',
'grpc/support/histogram.h',
@ -97,7 +96,6 @@ Pod::Spec.new do |s|
'grpc/support/tls_pthread.h',
'grpc/support/useful.h',
'src/core/support/alloc.c',
'src/core/support/cancellable.c',
'src/core/support/cmdline.c',
'src/core/support/cpu_iphone.c',
'src/core/support/cpu_linux.c',
@ -204,6 +202,7 @@ Pod::Spec.new do |s|
'src/core/iomgr/tcp_server.h',
'src/core/iomgr/tcp_windows.h',
'src/core/iomgr/time_averaged_stats.h',
'src/core/iomgr/udp_server.h',
'src/core/iomgr/wakeup_fd_pipe.h',
'src/core/iomgr/wakeup_fd_posix.h',
'src/core/json/json.h',
@ -335,6 +334,7 @@ Pod::Spec.new do |s|
'src/core/iomgr/tcp_server_windows.c',
'src/core/iomgr/tcp_windows.c',
'src/core/iomgr/time_averaged_stats.c',
'src/core/iomgr/udp_server.c',
'src/core/iomgr/wakeup_fd_eventfd.c',
'src/core/iomgr/wakeup_fd_nospecial.c',
'src/core/iomgr/wakeup_fd_pipe.c',
@ -471,6 +471,7 @@ Pod::Spec.new do |s|
'src/core/iomgr/tcp_server.h',
'src/core/iomgr/tcp_windows.h',
'src/core/iomgr/time_averaged_stats.h',
'src/core/iomgr/udp_server.h',
'src/core/iomgr/wakeup_fd_pipe.h',
'src/core/iomgr/wakeup_fd_posix.h',
'src/core/json/json.h',

@ -121,8 +121,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
}
// The response is dropped if the status is not OK.
if (status.ok()) {
finish_buf_.ServerSendStatus(
ctx_->trailing_metadata_, finish_buf_.SendMessage(msg));
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
finish_buf_.SendMessage(msg));
} else {
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
}

@ -34,12 +34,43 @@
#ifndef GRPCXX_AUTH_CONTEXT_H
#define GRPCXX_AUTH_CONTEXT_H
#include <iterator>
#include <vector>
#include <grpc++/auth_property_iterator.h>
#include <grpc++/config.h>
struct grpc_auth_context;
struct grpc_auth_property;
struct grpc_auth_property_iterator;
namespace grpc {
class SecureAuthContext;
typedef std::pair<grpc::string, grpc::string> AuthProperty;
class AuthPropertyIterator
: public std::iterator<std::input_iterator_tag, const AuthProperty> {
public:
~AuthPropertyIterator();
AuthPropertyIterator& operator++();
AuthPropertyIterator operator++(int);
bool operator==(const AuthPropertyIterator& rhs) const;
bool operator!=(const AuthPropertyIterator& rhs) const;
const AuthProperty operator*();
protected:
AuthPropertyIterator();
AuthPropertyIterator(const grpc_auth_property* property,
const grpc_auth_property_iterator* iter);
private:
friend class SecureAuthContext;
const grpc_auth_property* property_;
// The following items form a grpc_auth_property_iterator.
const grpc_auth_context* ctx_;
size_t index_;
const char* name_;
};
class AuthContext {
public:
@ -62,4 +93,3 @@ class AuthContext {
} // namespace grpc
#endif // GRPCXX_AUTH_CONTEXT_H

@ -91,8 +91,8 @@ class SerializationTraits<ByteBuffer, void> {
dest->set_buffer(byte_buffer);
return Status::OK;
}
static Status Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer,
bool* own_buffer) {
static Status Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer,
bool* own_buffer) {
*buffer = source.buffer();
*own_buffer = false;
return Status::OK;

@ -185,7 +185,9 @@ class ClientContext {
// Get and set census context
void set_census_context(struct census_context* ccp) { census_context_ = ccp; }
struct census_context* census_context() const { return census_context_; }
struct census_context* census_context() const {
return census_context_;
}
void TryCancel();
@ -223,15 +225,11 @@ class ClientContext {
void set_call(grpc_call* call,
const std::shared_ptr<ChannelInterface>& channel);
grpc_completion_queue* cq() { return cq_; }
void set_cq(grpc_completion_queue* cq) { cq_ = cq; }
grpc::string authority() { return authority_; }
bool initial_metadata_received_;
std::shared_ptr<ChannelInterface> channel_;
grpc_call* call_;
grpc_completion_queue* cq_;
gpr_timespec deadline_;
grpc::string authority_;
std::shared_ptr<Credentials> creds_;

@ -63,6 +63,7 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class UnknownMethodHandler;
class ChannelInterface;
class ClientContext;
@ -138,6 +139,7 @@ class CompletionQueue : public GrpcLibrary {
friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler;
friend class UnknownMethodHandler;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage>

@ -55,11 +55,12 @@ class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface {
private:
class DynamicThread {
public:
DynamicThread(DynamicThreadPool *pool);
public:
DynamicThread(DynamicThreadPool* pool);
~DynamicThread();
private:
DynamicThreadPool *pool_;
private:
DynamicThreadPool* pool_;
std::unique_ptr<grpc::thread> thd_;
void ThreadFunc();
};

@ -52,8 +52,8 @@ class GenericStub GRPC_FINAL {
// begin a call to a named method
std::unique_ptr<GenericClientAsyncReaderWriter> Call(
ClientContext* context, const grpc::string& method,
CompletionQueue* cq, void* tag);
ClientContext* context, const grpc::string& method, CompletionQueue* cq,
void* tag);
private:
std::shared_ptr<ChannelInterface> channel_;

@ -0,0 +1,4 @@
**The APIs in this directory are not stable!**
This directory contains header files that need to be installed but are not part
of the public API. Users should not use these headers directly.

@ -67,14 +67,10 @@ class WriteOptions {
WriteOptions(const WriteOptions& other) : flags_(other.flags_) {}
/// Clear all flags.
inline void Clear() {
flags_ = 0;
}
inline void Clear() { flags_ = 0; }
/// Returns raw flags bitset.
inline gpr_uint32 flags() const {
return flags_;
}
inline gpr_uint32 flags() const { return flags_; }
/// Sets flag for the disabling of compression for the next message write.
///
@ -122,9 +118,7 @@ class WriteOptions {
/// not go out on the wire immediately.
///
/// \sa GRPC_WRITE_BUFFER_HINT
inline bool get_buffer_hint() const {
return GetBit(GRPC_WRITE_BUFFER_HINT);
}
inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
WriteOptions& operator=(const WriteOptions& rhs) {
flags_ = rhs.flags_;
@ -132,17 +126,11 @@ class WriteOptions {
}
private:
void SetBit(const gpr_int32 mask) {
flags_ |= mask;
}
void SetBit(const gpr_int32 mask) { flags_ |= mask; }
void ClearBit(const gpr_int32 mask) {
flags_ &= ~mask;
}
void ClearBit(const gpr_int32 mask) { flags_ &= ~mask; }
bool GetBit(const gpr_int32 mask) const {
return flags_ & mask;
}
bool GetBit(const gpr_int32 mask) const { return flags_ & mask; }
gpr_uint32 flags_;
};
@ -173,6 +161,7 @@ class CallOpSendInitialMetadata {
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->flags = 0;
op->reserved = NULL;
op->data.send_initial_metadata.count = initial_metadata_count_;
op->data.send_initial_metadata.metadata = initial_metadata_;
}
@ -206,6 +195,7 @@ class CallOpSendMessage {
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_MESSAGE;
op->flags = write_options_.flags();
op->reserved = NULL;
op->data.send_message = send_buf_;
// Flags are per-message: clear them after use.
write_options_.Clear();
@ -248,6 +238,7 @@ class CallOpRecvMessage {
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_MESSAGE;
op->flags = 0;
op->reserved = NULL;
op->data.recv_message = &recv_buf_;
}
@ -313,6 +304,7 @@ class CallOpGenericRecvMessage {
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_RECV_MESSAGE;
op->flags = 0;
op->reserved = NULL;
op->data.recv_message = &recv_buf_;
}
@ -350,6 +342,7 @@ class CallOpClientSendClose {
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
}
void FinishOp(bool* status, int max_message_size) { send_ = false; }
@ -383,6 +376,7 @@ class CallOpServerSendStatus {
op->data.send_status_from_server.status_details =
send_status_details_.empty() ? nullptr : send_status_details_.c_str();
op->flags = 0;
op->reserved = NULL;
}
void FinishOp(bool* status, int max_message_size) {
@ -416,6 +410,7 @@ class CallOpRecvInitialMetadata {
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &recv_initial_metadata_arr_;
op->flags = 0;
op->reserved = NULL;
}
void FinishOp(bool* status, int max_message_size) {
if (recv_initial_metadata_ == nullptr) return;
@ -453,6 +448,7 @@ class CallOpClientRecvStatus {
op->data.recv_status_on_client.status_details_capacity =
&status_details_capacity_;
op->flags = 0;
op->reserved = NULL;
}
void FinishOp(bool* status, int max_message_size) {

@ -46,5 +46,4 @@ class GrpcLibrary {
} // namespace grpc
#endif // GRPCXX_IMPL_GRPC_LIBRARY_H

@ -208,6 +208,21 @@ class BidiStreamingHandler : public MethodHandler {
ServiceType* service_;
};
// Handle unknown method by returning UNIMPLEMENTED error.
class UnknownMethodHandler : public MethodHandler {
public:
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
Status status(StatusCode::UNIMPLEMENTED, "");
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
};
// Server side rpc method class
class RpcServiceMethod : public RpcMethod {
public:

@ -37,12 +37,12 @@
namespace grpc {
/// Defines how to serialize and deserialize some type.
///
///
/// Used for hooking different message serialization API's into GRPC.
/// Each SerializationTraits implementation must provide the following
/// functions:
/// static Status Serialize(const Message& msg,
/// grpc_byte_buffer** buffer,
/// grpc_byte_buffer** buffer,
// bool* own_buffer);
/// static Status Deserialize(grpc_byte_buffer* buffer,
/// Message* msg,
@ -57,7 +57,7 @@ namespace grpc {
/// msg. max_message_size is passed in as a bound on the maximum number of
/// message bytes Deserialize should accept.
///
/// Both functions return a Status, allowing them to explain what went
/// Both functions return a Status, allowing them to explain what went
/// wrong if required.
template <class Message,
class UnusedButHereForPartialTemplateSpecialization = void>

@ -38,7 +38,7 @@
namespace grpc {
template<class mutex>
template <class mutex>
class lock_guard;
class condition_variable;
@ -46,6 +46,7 @@ class mutex {
public:
mutex() { gpr_mu_init(&mu_); }
~mutex() { gpr_mu_destroy(&mu_); }
private:
::gpr_mu mu_;
template <class mutex>
@ -58,6 +59,7 @@ class lock_guard {
public:
lock_guard(mutex &mu) : mu_(mu), locked(true) { gpr_mu_lock(&mu.mu_); }
~lock_guard() { unlock_internal(); }
protected:
void lock_internal() {
if (!locked) gpr_mu_lock(&mu_.mu_);
@ -67,6 +69,7 @@ class lock_guard {
if (locked) gpr_mu_unlock(&mu_.mu_);
locked = false;
}
private:
mutex &mu_;
bool locked;
@ -76,7 +79,7 @@ class lock_guard {
template <class mutex>
class unique_lock : public lock_guard<mutex> {
public:
unique_lock(mutex &mu) : lock_guard<mutex>(mu) { }
unique_lock(mutex &mu) : lock_guard<mutex>(mu) {}
void lock() { this->lock_internal(); }
void unlock() { this->unlock_internal(); }
};
@ -92,6 +95,7 @@ class condition_variable {
}
void notify_one() { gpr_cv_signal(&cv_); }
void notify_all() { gpr_cv_broadcast(&cv_); }
private:
gpr_cv cv_;
};

@ -40,7 +40,8 @@ namespace grpc {
class thread {
public:
template<class T> thread(void (T::*fptr)(), T *obj) {
template <class T>
thread(void (T::*fptr)(), T *obj) {
func_ = new thread_function<T>(fptr, obj);
joined_ = false;
start();
@ -53,28 +54,28 @@ class thread {
gpr_thd_join(thd_);
joined_ = true;
}
private:
void start() {
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
gpr_thd_new(&thd_, thread_func, (void *) func_, &options);
gpr_thd_new(&thd_, thread_func, (void *)func_, &options);
}
static void thread_func(void *arg) {
thread_function_base *func = (thread_function_base *) arg;
thread_function_base *func = (thread_function_base *)arg;
func->call();
}
class thread_function_base {
public:
virtual ~thread_function_base() { }
virtual ~thread_function_base() {}
virtual void call() = 0;
};
template<class T>
template <class T>
class thread_function : public thread_function_base {
public:
thread_function(void (T::*fptr)(), T *obj)
: fptr_(fptr)
, obj_(obj) { }
thread_function(void (T::*fptr)(), T *obj) : fptr_(fptr), obj_(obj) {}
virtual void call() { (obj_->*fptr_)(); }
private:
void (T::*fptr_)();
T *obj_;
@ -84,8 +85,8 @@ class thread {
bool joined_;
// Disallow copy and assign.
thread(const thread&);
void operator=(const thread&);
thread(const thread &);
void operator=(const thread &);
};
} // namespace grpc

@ -64,7 +64,14 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
~Server();
// Shutdown the server, block until all rpc processing finishes.
void Shutdown();
// Forcefully terminate pending calls after deadline expires.
template <class T>
void Shutdown(const T& deadline) {
ShutdownInternal(TimePoint<T>(deadline).raw_time());
}
// Shutdown the server, waiting for all rpc processing to finish.
void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
// Block waiting for all work to complete (the server must either
// be shutting down or some other thread must call Shutdown for this
@ -85,8 +92,9 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
int max_message_size, grpc_compression_options compression_options);
// Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance.
bool RegisterService(const grpc::string *host, RpcService* service);
bool RegisterAsyncService(const grpc::string *host, AsynchronousService* service);
bool RegisterService(const grpc::string* host, RpcService* service);
bool RegisterAsyncService(const grpc::string* host,
AsynchronousService* service);
void RegisterAsyncGenericService(AsyncGenericService* service);
// Add a listening port. Can be called multiple times.
int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
@ -99,6 +107,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
void ShutdownInternal(gpr_timespec deadline);
class BaseAsyncRequest : public CompletionQueueTag {
public:
BaseAsyncRequest(Server* server, ServerContext* context,
@ -229,6 +239,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
grpc::condition_variable callback_cv_;
std::list<SyncRequest>* sync_methods_;
std::unique_ptr<RpcServiceMethod> unknown_method_;
bool has_generic_service_;
// Pointer to the c grpc server.
grpc_server* const server_;

@ -78,7 +78,7 @@ class ServerBuilder {
// BuildAndStart().
// Only matches requests with :authority \a host
ServerBuilder& RegisterService(const grpc::string& host,
SynchronousService* service);
SynchronousService* service);
// Register an asynchronous service.
// This call does not take ownership of the service or completion queue.
@ -86,7 +86,7 @@ class ServerBuilder {
// instance returned by BuildAndStart().
// Only matches requests with :authority \a host
ServerBuilder& RegisterAsyncService(const grpc::string& host,
AsynchronousService* service);
AsynchronousService* service);
// Set max message size in bytes.
ServerBuilder& SetMaxMessageSize(int max_message_size);
@ -119,9 +119,10 @@ class ServerBuilder {
};
typedef std::unique_ptr<grpc::string> HostString;
template <class T> struct NamedService {
template <class T>
struct NamedService {
explicit NamedService(T* s) : service(s) {}
NamedService(const grpc::string& h, T *s)
NamedService(const grpc::string& h, T* s)
: host(new grpc::string(h)), service(s) {}
HostString host;
T* service;
@ -130,7 +131,8 @@ class ServerBuilder {
int max_message_size_;
grpc_compression_options compression_options_;
std::vector<std::unique_ptr<NamedService<RpcService>>> services_;
std::vector<std::unique_ptr<NamedService<AsynchronousService>>> async_services_;
std::vector<std::unique_ptr<NamedService<AsynchronousService>>>
async_services_;
std::vector<Port> ports_;
std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_;

@ -73,6 +73,7 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class UnknownMethodHandler;
class Call;
class CallOpBuffer;
@ -159,6 +160,7 @@ class ServerContext {
friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler;
friend class UnknownMethodHandler;
friend class ::grpc::ClientContext;
// Prevent copying.

@ -85,9 +85,7 @@ class WriterInterface {
// Returns false when the stream has been closed.
virtual bool Write(const W& msg, const WriteOptions& options) = 0;
inline bool Write(const W& msg) {
return Write(msg, WriteOptions());
}
inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
};
template <class R>
@ -640,9 +638,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
}
// The response is dropped if the status is not OK.
if (status.ok()) {
finish_ops_.ServerSendStatus(
ctx_->trailing_metadata_,
finish_ops_.SendMessage(msg));
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
finish_ops_.SendMessage(msg));
} else {
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
}

@ -47,8 +47,12 @@ typedef enum {
} grpc_byte_buffer_type;
struct grpc_byte_buffer {
void *reserved;
grpc_byte_buffer_type type;
union {
struct {
void *reserved[8];
} reserved;
struct {
grpc_compression_algorithm compression;
gpr_slice_buffer slice_buffer;

@ -104,6 +104,61 @@ int census_context_deserialize(const char *buffer, census_context **context);
* future census calls will result in undefined behavior. */
void census_context_destroy(census_context *context);
/* Max number of characters in tag key */
#define CENSUS_MAX_TAG_KEY_LENGTH 20
/* Max number of tag value characters */
#define CENSUS_MAX_TAG_VALUE_LENGTH 50
/* A Census tag set is a collection of key:value string pairs; these form the
basis against which Census metrics will be recorded. Keys are unique within
a tag set. All contexts have an associated tag set. */
typedef struct census_tag_set census_tag_set;
/* Returns a pointer to a newly created, empty tag set. If size_hint > 0,
indicates that the tag set is intended to hold approximately that number
of tags. */
census_tag_set *census_tag_set_create(size_t size_hint);
/* Add a new tag key/value to an existing tag set; if the tag key already exists
in the tag set, then its value is overwritten with the new one. Can also be
used to delete a tag, by specifying a NULL value. If key is NULL, returns
the number of tags in the tag set.
Return values:
-1: invalid length key or value
non-negative value: the number of tags in the tag set. */
int census_tag_set_add(census_tag_set *tags, const char *key,
const char *value);
/* Destroys a tag set. This function must be called to prevent memory leaks.
Once called, the tag set cannot be used again. */
void census_tag_set_destroy(census_tag_set *tags);
/* Get a context's tag set. */
census_tag_set *census_context_tag_set(census_context *context);
/* A read-only representation of a tag for use by census clients. */
typedef struct {
size_t key_len; /* Number of bytes in tag key. */
const char *key; /* A pointer to the tag key. May not be null-terminated. */
size_t value_len; /* Number of bytes in tag value. */
const char *value; /* Pointer to the tag value. May not be null-terminated. */
} census_tag_const;
/* Used to iterate through a tag set's contents. */
typedef struct census_tag_set_iterator census_tag_set_iterator;
/* Open a tag set for iteration. The tag set must not be modified while
iteration is ongoing. Returns an iterator for use in following functions. */
census_tag_set_iterator *census_tag_set_open(census_tag_set *tags);
/* Get the next tag in the tag set, by writing into the 'tag' argument. Returns
1 if there is a "next" tag, 0 if there are no more tags. */
int census_tag_set_next(census_tag_set_iterator *it, census_tag_const *tag);
/* Close an iterator opened by census_tag_set_open(). The iterator will be
invalidated, and should not be used once close is called. */
void census_tag_set_close(census_tag_set_iterator *it);
/* A census statistic to be recorded comprises two parts: an ID for the
* particular statistic and the value to be recorded against it. */
typedef struct {
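
The tag-set API added to census.h above is declaration-only in this change, so the following is just a usage sketch built from those declarations (the backing implementation may land separately, and the tag keys/values are purely illustrative): create a tag set, add a couple of tags, and iterate over its contents.

```c
#include <stdio.h>

#include <grpc/census.h>

int main(void) {
  /* Hint that roughly four tags will be stored. */
  census_tag_set *tags = census_tag_set_create(4);

  /* Hypothetical tags for illustration only. */
  census_tag_set_add(tags, "service", "storage");
  census_tag_set_add(tags, "zone", "us-east");

  /* Keys and values may not be NUL-terminated, so print them with
     explicit lengths. */
  census_tag_set_iterator *it = census_tag_set_open(tags);
  census_tag_const tag;
  while (census_tag_set_next(it, &tag)) {
    printf("%.*s = %.*s\n", (int)tag.key_len, tag.key, (int)tag.value_len,
           tag.value);
  }
  census_tag_set_close(it);

  census_tag_set_destroy(tags);
  return 0;
}
```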

@ -75,7 +75,9 @@ int grpc_compression_algorithm_parse(const char *name, size_t name_length,
grpc_compression_algorithm *algorithm);
/** Updates \a name with the encoding name corresponding to a valid \a
* algorithm. Returns 1 upon success, 0 otherwise. */
* algorithm. Note that the string returned through \a name upon success is
* statically allocated and shouldn't be freed. Returns 1 upon success, 0
* otherwise. */
int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
char **name);
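
Given the clarified contract above (the string written through \a name is statically allocated and must not be freed), a minimal round-trip sketch of these two functions might look like this:

```c
#include <stdio.h>
#include <string.h>

#include <grpc/compression.h>

int main(void) {
  grpc_compression_algorithm algorithm;
  if (grpc_compression_algorithm_parse("gzip", strlen("gzip"), &algorithm)) {
    char *name;
    if (grpc_compression_algorithm_name(algorithm, &name)) {
      /* `name` points at statically allocated storage: do not free it. */
      printf("algorithm %d maps back to \"%s\"\n", algorithm, name);
    }
  }
  return 0;
}
```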

@ -181,7 +181,9 @@ typedef enum grpc_call_error {
GRPC_CALL_ERROR_INVALID_MESSAGE,
/** completion queue for notification has not been registered with the
server */
GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE
GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE,
/** this batch of operations leads to more operations than allowed */
GRPC_CALL_ERROR_BATCH_TOO_BIG
} grpc_call_error;
/* Write Flags: */
@ -200,13 +202,14 @@ typedef struct grpc_metadata {
const char *key;
const char *value;
size_t value_length;
gpr_uint32 flags;
/** The following fields are reserved for grpc internal use.
There is no need to initialize them, and they will be set to garbage
during
calls to grpc. */
struct {
void *obfuscated[3];
void *obfuscated[4];
} internal_data;
} grpc_metadata;
@ -249,6 +252,7 @@ typedef struct {
char *host;
size_t host_capacity;
gpr_timespec deadline;
void *reserved;
} grpc_call_details;
void grpc_call_details_init(grpc_call_details *details);
@ -256,31 +260,44 @@ void grpc_call_details_destroy(grpc_call_details *details);
typedef enum {
/** Send initial metadata: one and only one instance MUST be sent for each
call, unless the call was cancelled - in which case this can be skipped */
call, unless the call was cancelled - in which case this can be skipped.
This op completes after all bytes of metadata have been accepted by
outgoing flow control. */
GRPC_OP_SEND_INITIAL_METADATA = 0,
/** Send a message: 0 or more of these operations can occur for each call */
/** Send a message: 0 or more of these operations can occur for each call.
This op completes after all bytes for the message have been accepted by
outgoing flow control. */
GRPC_OP_SEND_MESSAGE,
/** Send a close from the client: one and only one instance MUST be sent from
the client, unless the call was cancelled - in which case this can be
skipped */
skipped.
This op completes after all bytes for the call (including the close)
have passed outgoing flow control. */
GRPC_OP_SEND_CLOSE_FROM_CLIENT,
/** Send status from the server: one and only one instance MUST be sent from
the server unless the call was cancelled - in which case this can be
skipped */
skipped.
This op completes after all bytes for the call (including the status)
have passed outgoing flow control. */
GRPC_OP_SEND_STATUS_FROM_SERVER,
/** Receive initial metadata: one and only one MUST be made on the client,
must not be made on the server */
must not be made on the server.
This op completes after all initial metadata has been read from the
peer. */
GRPC_OP_RECV_INITIAL_METADATA,
/** Receive a message: 0 or more of these operations can occur for each call
*/
/** Receive a message: 0 or more of these operations can occur for each call.
This op completes after all bytes of the received message have been
read, or after a half-close has been received on this call. */
GRPC_OP_RECV_MESSAGE,
/** Receive status on the client: one and only one must be made on the client.
This operation always succeeds, meaning ops paired with this operation
will also appear to succeed, even though they may not have. In that case
the status will indicate some failure. */
This operation always succeeds, meaning ops paired with this operation
will also appear to succeed, even though they may not have. In that case
the status will indicate some failure.
This op completes after all activity on the call has completed. */
GRPC_OP_RECV_STATUS_ON_CLIENT,
/** Receive close on the server: one and only one must be made on the
server */
server.
This op completes after the close has been received by the server. */
GRPC_OP_RECV_CLOSE_ON_SERVER
} grpc_op_type;
@ -291,7 +308,13 @@ typedef struct grpc_op {
grpc_op_type op;
/** Write flags bitset for grpc_begin_messages */
gpr_uint32 flags;
/** Reserved for future usage */
void *reserved;
union {
/** Reserved for future usage */
struct {
void *reserved[8];
} reserved;
struct {
size_t count;
grpc_metadata *metadata;
@ -353,6 +376,16 @@ typedef struct grpc_op {
} data;
} grpc_op;
/** Registers a plugin to be initialized and destroyed with the library.
The \a init and \a destroy functions will be invoked as part of
\a grpc_init() and \a grpc_shutdown(), respectively.
Note that these functions can be invoked an arbitrary number of times
(and hence so will \a init and \a destroy).
It is safe to pass NULL to either argument. Plugins are destroyed in
the reverse order they were initialized. */
void grpc_register_plugin(void (*init)(void), void (*destroy)(void));
/* Propagation bits: this can be bitwise or-ed to form propagation_mask for
* grpc_call */
/** Propagate deadline */
@ -365,8 +398,8 @@ typedef struct grpc_op {
/* Default propagation mask: clients of the core API are encouraged to encode
deltas from this in their implementations... ie write:
GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline
propagation. Doing so gives flexibility in the future to define new
GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline
propagation. Doing so gives flexibility in the future to define new
propagation types that are default inherited or not. */
#define GRPC_PROPAGATE_DEFAULTS \
((gpr_uint32)(( \
@ -393,7 +426,7 @@ void grpc_shutdown(void);
const char *grpc_version_string(void);
/** Create a completion queue */
grpc_completion_queue *grpc_completion_queue_create(void);
grpc_completion_queue *grpc_completion_queue_create(void *reserved);
/** Blocks until an event is available, the completion queue is being shut down,
or deadline is reached.
@ -404,7 +437,7 @@ grpc_completion_queue *grpc_completion_queue_create(void);
Callers must not call grpc_completion_queue_next and
grpc_completion_queue_pluck simultaneously on the same completion queue. */
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline);
gpr_timespec deadline, void *reserved);
/** Blocks until an event with tag 'tag' is available, the completion queue is
being shutdown or deadline is reached.
@ -413,12 +446,12 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
otherwise a grpc_event describing the event that occurred.
Callers must not call grpc_completion_queue_next and
grpc_completion_queue_pluck simultaneously on the same completion queue.
grpc_completion_queue_pluck simultaneously on the same completion queue.
Completion queues support a maximum of GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
concurrently executing plucks at any time. */
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline);
gpr_timespec deadline, void *reserved);
/** Maximum number of outstanding grpc_completion_queue_pluck executions per
completion queue */
@ -454,24 +487,24 @@ void grpc_channel_watch_connectivity_state(
completions are sent to 'completion_queue'. 'method' and 'host' need only
live through the invocation of this function.
If parent_call is non-NULL, it must be a server-side call. It will be used
to propagate properties from the server call to this new client call.
to propagate properties from the server call to this new client call.
*/
grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_call *parent_call,
gpr_uint32 propagation_mask,
grpc_completion_queue *completion_queue,
const char *method, const char *host,
gpr_timespec deadline);
gpr_timespec deadline, void *reserved);
/** Pre-register a method/host pair on a channel. */
void *grpc_channel_register_call(grpc_channel *channel, const char *method,
const char *host);
const char *host, void *reserved);
/** Create a call given a handle returned from grpc_channel_register_call */
grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask,
grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec deadline);
gpr_timespec deadline, void *reserved);
/** Start a batch of operations defined in the array ops; when complete, post a
completion of type 'tag' to the completion queue bound to the call.
@ -485,7 +518,7 @@ grpc_call *grpc_channel_create_registered_call(
containing just send operations independently from batches containing just
receive operations. */
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag);
size_t nops, void *tag, void *reserved);
/** Returns a newly allocated string representing the endpoint with which this
call is communicating. The string is in the uri format accepted by
@ -517,10 +550,13 @@ char *grpc_channel_get_target(grpc_channel *channel);
more on this. The data in 'args' need only live through the invocation of
this function. */
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args);
const grpc_channel_args *args,
void *reserved);
/** Create a lame client: this client fails every operation attempted on it. */
grpc_channel *grpc_lame_client_channel_create(const char *target);
grpc_channel *grpc_lame_client_channel_create(const char *target,
grpc_status_code error_code,
const char *error_message);
/** Close and destroy a grpc channel */
void grpc_channel_destroy(grpc_channel *channel);
@ -536,7 +572,7 @@ void grpc_channel_destroy(grpc_channel *channel);
THREAD-SAFETY grpc_call_cancel and grpc_call_cancel_with_status
are thread-safe, and can be called at any point before grpc_call_destroy
is called.*/
grpc_call_error grpc_call_cancel(grpc_call *call);
grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved);
/** Called by clients to cancel an RPC on the server.
Can be called multiple times, from any thread.
@ -546,7 +582,8 @@ grpc_call_error grpc_call_cancel(grpc_call *call);
remote endpoint. */
grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
grpc_status_code status,
const char *description);
const char *description,
void *reserved);
/** Destroy a call.
THREAD SAFETY: grpc_call_destroy is thread-compatible */
@ -585,14 +622,15 @@ grpc_call_error grpc_server_request_registered_call(
be specified with args. If no additional configuration is needed, args can
be NULL. See grpc_channel_args for more. The data in 'args' need only live
through the invocation of this function. */
grpc_server *grpc_server_create(const grpc_channel_args *args);
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved);
/** Register a completion queue with the server. Must be done for any
notification completion queue that is passed to grpc_server_request_*_call
and to grpc_server_shutdown_and_notify. Must be performed prior to
grpc_server_start. */
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq);
grpc_completion_queue *cq,
void *reserved);
/** Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure.
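
Several core entry points above gain trailing `reserved` parameters, `grpc_op` gains a per-op `reserved` pointer, and `grpc_lame_client_channel_create` now takes an error code and message. A minimal sketch of the updated call sites follows; the method and host strings are placeholders, and the lame channel is used only so the sketch terminates without a real server.

```c
#include <stdio.h>
#include <string.h>

#include <grpc/grpc.h>
#include <grpc/status.h>
#include <grpc/support/time.h>

int main(void) {
  grpc_init();

  /* All the new `reserved` arguments must be NULL for now. */
  grpc_completion_queue *cq = grpc_completion_queue_create(NULL);

  /* A lame channel fails every operation with the given status, which keeps
     this sketch self-contained (no server needed). */
  grpc_channel *channel = grpc_lame_client_channel_create(
      "lame:target", GRPC_STATUS_UNAVAILABLE, "no backend configured");

  grpc_call *call = grpc_channel_create_call(
      channel, NULL /* parent_call */, GRPC_PROPAGATE_DEFAULTS, cq,
      "/example.Service/Method", "localhost",
      gpr_inf_future(GPR_CLOCK_REALTIME), NULL /* reserved */);

  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_SEND_INITIAL_METADATA;
  op.flags = 0;
  op.reserved = NULL; /* new per-op reserved pointer */
  op.data.send_initial_metadata.count = 0;

  if (grpc_call_start_batch(call, &op, 1, (void *)0x1, NULL /* reserved */) ==
      GRPC_CALL_OK) {
    grpc_event ev = grpc_completion_queue_next(
        cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL /* reserved */);
    printf("batch completed, success=%d\n", ev.success);
  }

  grpc_call_cancel(call, NULL /* reserved */);
  grpc_call_destroy(call);
  grpc_channel_destroy(channel);
  grpc_completion_queue_shutdown(cq);
  while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
                                    NULL).type != GRPC_QUEUE_SHUTDOWN) {
  }
  grpc_completion_queue_destroy(cq);
  grpc_shutdown();
  return 0;
}
```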

@ -31,26 +31,29 @@
*
*/
#ifndef GRPC_SUPPORT_CANCELLABLE_PLATFORM_H
#define GRPC_SUPPORT_CANCELLABLE_PLATFORM_H
/** Support zookeeper as an alternative name system in addition to DNS
* Zookeeper name in gRPC is represented as a URI:
* zookeeper://host:port/path/service/instance
*
* Where zookeeper is the name system scheme
* host:port is the address of a zookeeper server
* /path/service/instance is the zookeeper name to be resolved
*
 * Refer to doc/naming.md for more details
*/
#ifndef GRPC_GRPC_ZOOKEEPER_H
#define GRPC_GRPC_ZOOKEEPER_H
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#ifdef __cplusplus
extern "C" {
#endif
struct gpr_cancellable_list_ {
/* a doubly-linked list on cancellable's waiters queue */
struct gpr_cancellable_list_ *next;
struct gpr_cancellable_list_ *prev;
/* The following two fields are arguments to gpr_cv_cancellable_wait() */
gpr_mu *mu;
gpr_cv *cv;
};
/** Register zookeeper name resolver in grpc */
void grpc_zookeeper_register();
/* Internal definition of gpr_cancellable. */
typedef struct {
gpr_mu mu; /* protects waiters and modifications to cancelled */
gpr_atm cancelled;
struct gpr_cancellable_list_ waiters;
} gpr_cancellable;
#ifdef __cplusplus
}
#endif
#endif /* GRPC_SUPPORT_CANCELLABLE_PLATFORM_H */
#endif /* GRPC_GRPC_ZOOKEEPER_H */
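
A sketch of how a client might opt into this resolver, assuming a program linked against the new grpc_zookeeper library and an available ZooKeeper ensemble; the ensemble address and znode path below are hypothetical.

```c
#include <grpc/grpc.h>
#include <grpc/grpc_zookeeper.h>

int main(void) {
  grpc_init();

  /* Make the zookeeper:// scheme known to the client-config resolvers. */
  grpc_zookeeper_register();

  /* Hypothetical ensemble address and service path. */
  grpc_channel *channel = grpc_insecure_channel_create(
      "zookeeper://127.0.0.1:2181/services/echo", NULL /* args */,
      NULL /* reserved */);

  /* ... create calls on the channel as usual ... */

  grpc_channel_destroy(channel);
  grpc_shutdown();
  return 0;
}
```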

@ -160,4 +160,4 @@ typedef enum {
}
#endif
#endif /* GRPC_STATUS_H */
#endif /* GRPC_STATUS_H */

@ -55,4 +55,4 @@ void gpr_free_aligned(void *ptr);
}
#endif
#endif /* GRPC_SUPPORT_ALLOC_H */
#endif /* GRPC_SUPPORT_ALLOC_H */

@ -89,4 +89,4 @@
#error could not determine platform for atm
#endif
#endif /* GRPC_SUPPORT_ATM_H */
#endif /* GRPC_SUPPORT_ATM_H */

@ -69,4 +69,4 @@ static __inline int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
__ATOMIC_RELAXED);
}
#endif /* GRPC_SUPPORT_ATM_GCC_ATOMIC_H */
#endif /* GRPC_SUPPORT_ATM_GCC_ATOMIC_H */

@ -84,4 +84,4 @@ static __inline void gpr_atm_no_barrier_store(gpr_atm *p, gpr_atm value) {
#define gpr_atm_acq_cas(p, o, n) (__sync_bool_compare_and_swap((p), (o), (n)))
#define gpr_atm_rel_cas(p, o, n) gpr_atm_acq_cas((p), (o), (n))
#endif /* GRPC_SUPPORT_ATM_GCC_SYNC_H */
#endif /* GRPC_SUPPORT_ATM_GCC_SYNC_H */

@ -66,31 +66,31 @@ static __inline int gpr_atm_no_barrier_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
/* InterlockedCompareExchangePointerNoFence() not available on vista or
windows7 */
#ifdef GPR_ARCH_64
return o == (gpr_atm)InterlockedCompareExchangeAcquire64((volatile LONGLONG *) p,
(LONGLONG) n, (LONGLONG) o);
return o == (gpr_atm)InterlockedCompareExchangeAcquire64(
(volatile LONGLONG *)p, (LONGLONG)n, (LONGLONG)o);
#else
return o == (gpr_atm)InterlockedCompareExchangeAcquire((volatile LONG *) p,
(LONG) n, (LONG) o);
return o == (gpr_atm)InterlockedCompareExchangeAcquire((volatile LONG *)p,
(LONG)n, (LONG)o);
#endif
}
static __inline int gpr_atm_acq_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
#ifdef GPR_ARCH_64
return o == (gpr_atm)InterlockedCompareExchangeAcquire64((volatile LONGLONG *) p,
(LONGLONG) n, (LONGLONG) o);
return o == (gpr_atm)InterlockedCompareExchangeAcquire64(
(volatile LONGLONG *)p, (LONGLONG)n, (LONGLONG)o);
#else
return o == (gpr_atm)InterlockedCompareExchangeAcquire((volatile LONG *) p,
(LONG) n, (LONG) o);
return o == (gpr_atm)InterlockedCompareExchangeAcquire((volatile LONG *)p,
(LONG)n, (LONG)o);
#endif
}
static __inline int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n) {
#ifdef GPR_ARCH_64
return o == (gpr_atm)InterlockedCompareExchangeRelease64((volatile LONGLONG *) p,
(LONGLONG) n, (LONGLONG) o);
return o == (gpr_atm)InterlockedCompareExchangeRelease64(
(volatile LONGLONG *)p, (LONGLONG)n, (LONGLONG)o);
#else
return o == (gpr_atm)InterlockedCompareExchangeRelease((volatile LONG *) p,
(LONG) n, (LONG) o);
return o == (gpr_atm)InterlockedCompareExchangeRelease((volatile LONG *)p,
(LONG)n, (LONG)o);
#endif
}
@ -110,17 +110,16 @@ static __inline gpr_atm gpr_atm_full_fetch_add(gpr_atm *p, gpr_atm delta) {
#ifdef GPR_ARCH_64
do {
old = *p;
} while (old != (gpr_atm)InterlockedCompareExchange64((volatile LONGLONG *) p,
(LONGLONG) old + delta,
(LONGLONG) old));
} while (old != (gpr_atm)InterlockedCompareExchange64((volatile LONGLONG *)p,
(LONGLONG)old + delta,
(LONGLONG)old));
#else
do {
old = *p;
} while (old != (gpr_atm)InterlockedCompareExchange((volatile LONG *) p,
(LONG) old + delta,
(LONG) old));
} while (old != (gpr_atm)InterlockedCompareExchange(
(volatile LONG *)p, (LONG)old + delta, (LONG)old));
#endif
return old;
}
#endif /* GRPC_SUPPORT_ATM_WIN32_H */
#endif /* GRPC_SUPPORT_ATM_WIN32_H */

@ -94,4 +94,4 @@ char *gpr_cmdline_usage_string(gpr_cmdline *cl, const char *argv0);
}
#endif
#endif /* GRPC_SUPPORT_CMDLINE_H */
#endif /* GRPC_SUPPORT_CMDLINE_H */

@ -54,4 +54,4 @@ unsigned gpr_cpu_current_cpu(void);
} // extern "C"
#endif
#endif /* GRPC_SUPPORT_CPU_H */
#endif /* GRPC_SUPPORT_CPU_H */

@ -73,4 +73,4 @@ void gpr_histogram_merge_contents(gpr_histogram *histogram,
}
#endif
#endif /* GRPC_SUPPORT_HISTOGRAM_H */
#endif /* GRPC_SUPPORT_HISTOGRAM_H */

@ -61,4 +61,4 @@ int gpr_split_host_port(const char *name, char **host, char **port);
}
#endif
#endif /* GRPC_SUPPORT_HOST_PORT_H */
#endif /* GRPC_SUPPORT_HOST_PORT_H */

@ -105,4 +105,4 @@ void gpr_set_log_function(gpr_log_func func);
}
#endif
#endif /* GRPC_SUPPORT_LOG_H */
#endif /* GRPC_SUPPORT_LOG_H */

@ -48,4 +48,4 @@ char *gpr_format_message(DWORD messageid);
}
#endif
#endif /* GRPC_SUPPORT_LOG_WIN32_H */
#endif /* GRPC_SUPPORT_LOG_WIN32_H */

@ -64,7 +64,8 @@
#undef GRPC_NOMINMAX_WAS_NOT_DEFINED
#undef NOMINMAX
#endif /* GRPC_WIN32_LEAN_AND_MEAN_WAS_NOT_DEFINED */
#endif /* defined(_WIN64) || defined(WIN64) || defined(_WIN32) || defined(WIN32) */
#endif /* defined(_WIN64) || defined(WIN64) || defined(_WIN32) || \
defined(WIN32) */
/* Override this file with one for your platform if you need to redefine
things. */
@ -173,6 +174,8 @@
#endif /* _LP64 */
#elif defined(__APPLE__)
#include <TargetConditionals.h>
/* Provides IPV6_RECVPKTINFO */
#define __APPLE_USE_RFC_3542
#ifndef _BSD_SOURCE
#define _BSD_SOURCE
#endif

@ -96,7 +96,7 @@ typedef struct gpr_slice {
#define GPR_SLICE_LENGTH(slice) \
((slice).refcount ? (slice).data.refcounted.length \
: (slice).data.inlined.length)
#define GPR_SLICE_SET_LENGTH(slice, newlen) \
#define GPR_SLICE_SET_LENGTH(slice, newlen) \
((slice).refcount ? ((slice).data.refcounted.length = (size_t)(newlen)) \
: ((slice).data.inlined.length = (gpr_uint8)(newlen)))
#define GPR_SLICE_END_PTR(slice) \

@ -58,4 +58,4 @@ int gpr_asprintf(char **strp, const char *format, ...);
}
#endif
#endif /* GRPC_SUPPORT_STRING_UTIL_H */
#endif /* GRPC_SUPPORT_STRING_UTIL_H */

@ -36,7 +36,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#endif
typedef struct gpr_subprocess gpr_subprocess;

@ -65,7 +65,6 @@
#endif
#include <grpc/support/time.h> /* for gpr_timespec */
#include <grpc/support/cancellable_platform.h>
#ifdef __cplusplus
extern "C" {
@ -121,11 +120,6 @@ void gpr_cv_destroy(gpr_cv *cv);
holds an exclusive lock on *mu. */
int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline);
/* Behave like gpr_cv_wait(cv, mu, abs_deadline), except behave as though
the deadline has expired if *c is cancelled. */
int gpr_cv_cancellable_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline,
gpr_cancellable *c);
/* If any threads are waiting on *cv, wake at least one.
Clients may treat this as an optimization of gpr_cv_broadcast()
for use in the case where waking more than one waiter is not useful.
@ -135,28 +129,6 @@ void gpr_cv_signal(gpr_cv *cv);
/* Wake all threads waiting on *cv. Requires: *cv initialized. */
void gpr_cv_broadcast(gpr_cv *cv);
/* --- Cancellation ---
A gpr_cancellable can be used with gpr_cv_cancellable_wait()
or gpr_event_cancellable_wait() cancel pending waits. */
/* Initialize *c. */
void gpr_cancellable_init(gpr_cancellable *c);
/* Cause *c no longer to be initialized, freeing any memory in use. Requires:
*c initialized; no other concurrent operation on *c. */
void gpr_cancellable_destroy(gpr_cancellable *c);
/* Return non-zero iff *c has been cancelled. Requires *c initialized.
This call is faster than acquiring a mutex on most platforms. */
int gpr_cancellable_is_cancelled(gpr_cancellable *c);
/* Cancel *c. If *c was not previously cancelled, cause
gpr_cancellable_init() to return non-zero, and outstanding and future
calls to gpr_cv_cancellable_wait() and gpr_event_cancellable_wait() to
return immediately indicating a timeout has occurred; otherwise do nothing.
Requires *c initialized.*/
void gpr_cancellable_cancel(gpr_cancellable *c);
/* --- One-time initialization ---
gpr_once must be declared with static storage class, and initialized with
@ -199,11 +171,6 @@ void *gpr_event_get(gpr_event *ev);
on most platforms. */
void *gpr_event_wait(gpr_event *ev, gpr_timespec abs_deadline);
/* Behave like gpr_event_wait(ev, abs_deadline), except behave as though
the deadline has expired if *c is cancelled. */
void *gpr_event_cancellable_wait(gpr_event *ev, gpr_timespec abs_deadline,
gpr_cancellable *c);
/* --- Reference counting ---
These calls act on the type gpr_refcount. It requires no destruction. */
@ -345,4 +312,4 @@ gpr_intptr gpr_stats_read(const gpr_stats_counter *c);
}
#endif
#endif /* GRPC_SUPPORT_SYNC_H */
#endif /* GRPC_SUPPORT_SYNC_H */
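The event API declared above is a one-shot, set-once signalling primitive. A minimal usage sketch, assuming only the gpr_event_init/gpr_event_set/gpr_event_wait entry points from this header:

#include <grpc/support/sync.h>
#include <grpc/support/time.h>

static gpr_event g_done;

static void setup(void) { gpr_event_init(&g_done); }

/* Producer: publish a non-NULL result exactly once. */
static void finish(void *result) { gpr_event_set(&g_done, result); }

/* Consumer: block until finish() has run or the deadline expires;
   returns NULL on timeout, the published pointer otherwise. */
static void *wait_for_result(gpr_timespec deadline) {
  return gpr_event_wait(&g_done, deadline);
}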

@ -38,24 +38,18 @@
#include <grpc/support/atm.h>
/* gpr_event */
typedef struct {
gpr_atm state;
} gpr_event;
typedef struct { gpr_atm state; } gpr_event;
#define GPR_EVENT_INIT \
{ 0 }
/* gpr_refcount */
typedef struct {
gpr_atm count;
} gpr_refcount;
typedef struct { gpr_atm count; } gpr_refcount;
/* gpr_stats_counter */
typedef struct {
gpr_atm value;
} gpr_stats_counter;
typedef struct { gpr_atm value; } gpr_stats_counter;
#define GPR_STATS_INIT \
{ 0 }
#endif /* GRPC_SUPPORT_SYNC_GENERIC_H */
#endif /* GRPC_SUPPORT_SYNC_GENERIC_H */

@ -44,4 +44,4 @@ typedef pthread_once_t gpr_once;
#define GPR_ONCE_INIT PTHREAD_ONCE_INIT
#endif /* GRPC_SUPPORT_SYNC_POSIX_H */
#endif /* GRPC_SUPPORT_SYNC_POSIX_H */

@ -46,4 +46,4 @@ typedef CONDITION_VARIABLE gpr_cv;
typedef INIT_ONCE gpr_once;
#define GPR_ONCE_INIT INIT_ONCE_STATIC_INIT
#endif /* GRPC_SUPPORT_SYNC_WIN32_H */
#endif /* GRPC_SUPPORT_SYNC_WIN32_H */

@ -88,4 +88,4 @@ void gpr_thd_join(gpr_thd_id t);
}
#endif
#endif /* GRPC_SUPPORT_THD_H */
#endif /* GRPC_SUPPORT_THD_H */

@ -84,7 +84,8 @@ void gpr_time_init(void);
gpr_timespec gpr_now(gpr_clock_type clock);
/* Convert a timespec from one clock to another */
gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type target_clock);
gpr_timespec gpr_convert_clock_type(gpr_timespec t,
gpr_clock_type target_clock);
/* Return -ve, 0, or +ve according to whether a < b, a == b, or a > b
respectively. */

@ -47,7 +47,7 @@
GPR_TLS_DECL(foo);
Thread locals always have static scope.
Initializing a thread local (must be done at library initialization
Initializing a thread local (must be done at library initialization
time):
gpr_tls_init(&foo);
@ -58,7 +58,7 @@
gpr_tls_set(&foo, new_value);
Accessing a thread local:
current_value = gpr_tls_get(&foo, value);
current_value = gpr_tls_get(&foo, value);
ALL functions here may be implemented as macros. */
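Putting the pieces from the comment above together, a hedged usage sketch (the counter and function names are hypothetical, not from the diff):

#include <grpc/support/tls.h>

GPR_TLS_DECL(g_request_count); /* one counter per thread; static scope */

/* Per the comment above, init/destroy happen at library init/shutdown time. */
static void library_init(void) { gpr_tls_init(&g_request_count); }
static void library_shutdown(void) { gpr_tls_destroy(&g_request_count); }

static void on_request(void) {
  gpr_tls_set(&g_request_count, gpr_tls_get(&g_request_count) + 1);
}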

@ -42,10 +42,14 @@ struct gpr_gcc_thread_local {
};
#define GPR_TLS_DECL(name) \
static __thread struct gpr_gcc_thread_local name = {0}
static __thread struct gpr_gcc_thread_local name = {0}
#define gpr_tls_init(tls) do {} while (0)
#define gpr_tls_destroy(tls) do {} while (0)
#define gpr_tls_init(tls) \
do { \
} while (0)
#define gpr_tls_destroy(tls) \
do { \
} while (0)
#define gpr_tls_set(tls, new_value) (((tls)->value) = (new_value))
#define gpr_tls_get(tls) ((tls)->value)

@ -42,10 +42,14 @@ struct gpr_msvc_thread_local {
};
#define GPR_TLS_DECL(name) \
static __declspec(thread) struct gpr_msvc_thread_local name = {0}
static __declspec(thread) struct gpr_msvc_thread_local name = {0}
#define gpr_tls_init(tls) do {} while (0)
#define gpr_tls_destroy(tls) do {} while (0)
#define gpr_tls_init(tls) \
do { \
} while (0)
#define gpr_tls_destroy(tls) \
do { \
} while (0)
#define gpr_tls_set(tls, new_value) (((tls)->value) = (new_value))
#define gpr_tls_get(tls) ((tls)->value)

@ -46,10 +46,10 @@
#define GPR_ARRAY_SIZE(array) (sizeof(array) / sizeof(*(array)))
#define GPR_SWAP(type, a, b) \
do { \
type x = a; \
a = b; \
b = x; \
do { \
type x = a; \
a = b; \
b = x; \
} while (0)
/** Set the \a n-th bit of \a i (a mutable pointer). */
@ -72,4 +72,4 @@
0x0f0f0f0f) % \
255)
#endif /* GRPC_SUPPORT_USEFUL_H */
#endif /* GRPC_SUPPORT_USEFUL_H */
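A quick illustration of the two helpers above (not from the diff):

#include <grpc/support/useful.h>

static void example(void) {
  int a = 1, b = 2;
  int xs[4];
  size_t n;
  GPR_SWAP(int, a, b);          /* a == 2, b == 1 */
  n = GPR_ARRAY_SIZE(xs);       /* n == 4 */
  (void)n;
}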

@ -41,7 +41,7 @@ namespace grpc_csharp_generator {
inline bool ServicesFilename(const grpc::protobuf::FileDescriptor *file,
grpc::string *file_name_or_error) {
*file_name_or_error = grpc_generator::FileNameInUpperCamel(file) + "Grpc.cs";
*file_name_or_error = grpc_generator::FileNameInUpperCamel(file, false) + "Grpc.cs";
return true;
}

@ -125,16 +125,23 @@ inline grpc::string LowerUnderscoreToUpperCamel(grpc::string str) {
return result;
}
inline grpc::string FileNameInUpperCamel(const grpc::protobuf::FileDescriptor *file) {
inline grpc::string FileNameInUpperCamel(const grpc::protobuf::FileDescriptor *file,
bool include_package_path) {
std::vector<grpc::string> tokens = tokenize(StripProto(file->name()), "/");
grpc::string result = "";
for (unsigned int i = 0; i < tokens.size() - 1; i++) {
result += tokens[i] + "/";
if (include_package_path) {
for (unsigned int i = 0; i < tokens.size() - 1; i++) {
result += tokens[i] + "/";
}
}
result += LowerUnderscoreToUpperCamel(tokens.back());
return result;
}
inline grpc::string FileNameInUpperCamel(const grpc::protobuf::FileDescriptor *file) {
return FileNameInUpperCamel(file, true);
}
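For illustration (hypothetical input, not from the diff): a file named routeguide/route_guide.proto maps to "routeguide/RouteGuide" with include_package_path set to true, and to just "RouteGuide" with it set to false; the C# generator change earlier in this diff passes false so the generated Grpc.cs filename carries no directory prefix.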
enum MethodType {
METHODTYPE_NO_STREAMING,
METHODTYPE_CLIENT_STREAMING,

@ -44,7 +44,6 @@ using ::google::protobuf::compiler::objectivec::ClassName;
using ::grpc::protobuf::io::Printer;
using ::grpc::protobuf::MethodDescriptor;
using ::grpc::protobuf::ServiceDescriptor;
using ::grpc::string;
using ::std::map;
namespace grpc_objective_c_generator {
@ -52,7 +51,7 @@ namespace {
void PrintProtoRpcDeclarationAsPragma(Printer *printer,
const MethodDescriptor *method,
map<string, string> vars) {
map< ::grpc::string, ::grpc::string> vars) {
vars["client_stream"] = method->client_streaming() ? "stream " : "";
vars["server_stream"] = method->server_streaming() ? "stream " : "";
@ -62,7 +61,7 @@ void PrintProtoRpcDeclarationAsPragma(Printer *printer,
}
void PrintMethodSignature(Printer *printer, const MethodDescriptor *method,
const map<string, string> &vars) {
const map< ::grpc::string, ::grpc::string> &vars) {
// TODO(jcanizales): Print method comments.
printer->Print(vars, "- ($return_type$)$method_name$With");
@ -85,7 +84,7 @@ void PrintMethodSignature(Printer *printer, const MethodDescriptor *method,
}
void PrintSimpleSignature(Printer *printer, const MethodDescriptor *method,
map<string, string> vars) {
map< ::grpc::string, ::grpc::string> vars) {
vars["method_name"] =
grpc_generator::LowercaseFirstLetter(vars["method_name"]);
vars["return_type"] = "void";
@ -93,14 +92,14 @@ void PrintSimpleSignature(Printer *printer, const MethodDescriptor *method,
}
void PrintAdvancedSignature(Printer *printer, const MethodDescriptor *method,
map<string, string> vars) {
map< ::grpc::string, ::grpc::string> vars) {
vars["method_name"] = "RPCTo" + vars["method_name"];
vars["return_type"] = "ProtoRPC *";
PrintMethodSignature(printer, method, vars);
}
inline map<string, string> GetMethodVars(const MethodDescriptor *method) {
map<string, string> res;
inline map< ::grpc::string, ::grpc::string> GetMethodVars(const MethodDescriptor *method) {
map< ::grpc::string, ::grpc::string> res;
res["method_name"] = method->name();
res["request_type"] = method->input_type()->name();
res["response_type"] = method->output_type()->name();
@ -110,7 +109,7 @@ inline map<string, string> GetMethodVars(const MethodDescriptor *method) {
}
void PrintMethodDeclarations(Printer *printer, const MethodDescriptor *method) {
map<string, string> vars = GetMethodVars(method);
map< ::grpc::string, ::grpc::string> vars = GetMethodVars(method);
PrintProtoRpcDeclarationAsPragma(printer, method, vars);
@ -121,7 +120,7 @@ void PrintMethodDeclarations(Printer *printer, const MethodDescriptor *method) {
}
void PrintSimpleImplementation(Printer *printer, const MethodDescriptor *method,
map<string, string> vars) {
map< ::grpc::string, ::grpc::string> vars) {
printer->Print("{\n");
printer->Print(vars, " [[self RPCTo$method_name$With");
if (method->client_streaming()) {
@ -139,7 +138,7 @@ void PrintSimpleImplementation(Printer *printer, const MethodDescriptor *method,
void PrintAdvancedImplementation(Printer *printer,
const MethodDescriptor *method,
map<string, string> vars) {
map< ::grpc::string, ::grpc::string> vars) {
printer->Print("{\n");
printer->Print(vars, " return [self RPCToMethod:@\"$method_name$\"\n");
@ -154,9 +153,9 @@ void PrintAdvancedImplementation(Printer *printer,
printer->Print(" responsesWriteable:[GRXWriteable ");
if (method->server_streaming()) {
printer->Print("writeableWithStreamHandler:eventHandler]];\n");
printer->Print("writeableWithEventHandler:eventHandler]];\n");
} else {
printer->Print("writeableWithSingleValueHandler:handler]];\n");
printer->Print("writeableWithSingleHandler:handler]];\n");
}
printer->Print("}\n");
@ -164,7 +163,7 @@ void PrintAdvancedImplementation(Printer *printer,
void PrintMethodImplementations(Printer *printer,
const MethodDescriptor *method) {
map<string, string> vars = GetMethodVars(method);
map< ::grpc::string, ::grpc::string> vars = GetMethodVars(method);
PrintProtoRpcDeclarationAsPragma(printer, method, vars);
@ -179,14 +178,14 @@ void PrintMethodImplementations(Printer *printer,
} // namespace
string GetHeader(const ServiceDescriptor *service) {
string output;
::grpc::string GetHeader(const ServiceDescriptor *service) {
::grpc::string output;
{
// Scope the output stream so it closes and finalizes output to the string.
grpc::protobuf::io::StringOutputStream output_stream(&output);
Printer printer(&output_stream, '$');
map<string, string> vars = {{"service_class", ServiceClassName(service)}};
map< ::grpc::string, ::grpc::string> vars = {{"service_class", ServiceClassName(service)}};
printer.Print(vars, "@protocol $service_class$ <NSObject>\n\n");
@ -209,14 +208,14 @@ string GetHeader(const ServiceDescriptor *service) {
return output;
}
string GetSource(const ServiceDescriptor *service) {
string output;
::grpc::string GetSource(const ServiceDescriptor *service) {
::grpc::string output;
{
// Scope the output stream so it closes and finalizes output to the string.
grpc::protobuf::io::StringOutputStream output_stream(&output);
Printer printer(&output_stream, '$');
map<string, string> vars = {{"service_name", service->name()},
map< ::grpc::string, ::grpc::string> vars = {{"service_name", service->name()},
{"service_class", ServiceClassName(service)},
{"package", service->file()->package()}};

@ -39,44 +39,43 @@
#include "src/compiler/objective_c_generator.h"
#include "src/compiler/objective_c_generator_helpers.h"
using ::grpc::string;
class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
public:
ObjectiveCGrpcGenerator() {}
virtual ~ObjectiveCGrpcGenerator() {}
virtual bool Generate(const grpc::protobuf::FileDescriptor *file,
const string &parameter,
const ::grpc::string &parameter,
grpc::protobuf::compiler::GeneratorContext *context,
string *error) const {
::grpc::string *error) const {
if (file->service_count() == 0) {
// No services. Do nothing.
return true;
}
string file_name = grpc_generator::FileNameInUpperCamel(file);
string prefix = file->options().objc_class_prefix();
::grpc::string file_name = grpc_generator::FileNameInUpperCamel(file);
::grpc::string prefix = file->options().objc_class_prefix();
{
// Generate .pbrpc.h
string imports = string("#import \"") + file_name + ".pbobjc.h\"\n\n"
::grpc::string imports = ::grpc::string("#import \"") + file_name +
".pbobjc.h\"\n\n"
"#import <ProtoRPC/ProtoService.h>\n"
"#import <RxLibrary/GRXWriteable.h>\n"
"#import <RxLibrary/GRXWriter.h>\n";
// TODO(jcanizales): Instead forward-declare the input and output types
// and import the files in the .pbrpc.m
string proto_imports;
::grpc::string proto_imports;
for (int i = 0; i < file->dependency_count(); i++) {
string header = grpc_objective_c_generator::MessageHeaderName(
::grpc::string header = grpc_objective_c_generator::MessageHeaderName(
file->dependency(i));
proto_imports += string("#import \"") + header + "\"\n";
proto_imports += ::grpc::string("#import \"") + header + "\"\n";
}
string declarations;
::grpc::string declarations;
for (int i = 0; i < file->service_count(); i++) {
const grpc::protobuf::ServiceDescriptor *service = file->service(i);
declarations += grpc_objective_c_generator::GetHeader(service);
@ -89,11 +88,12 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
{
// Generate .pbrpc.m
string imports = string("#import \"") + file_name + ".pbrpc.h\"\n\n"
::grpc::string imports = ::grpc::string("#import \"") + file_name +
".pbrpc.h\"\n\n"
"#import <ProtoRPC/ProtoRPC.h>\n"
"#import <RxLibrary/GRXWriter+Immediate.h>\n";
string definitions;
::grpc::string definitions;
for (int i = 0; i < file->service_count(); i++) {
const grpc::protobuf::ServiceDescriptor *service = file->service(i);
definitions += grpc_objective_c_generator::GetSource(service);
@ -108,7 +108,7 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
private:
// Write the given code into the given file.
void Write(grpc::protobuf::compiler::GeneratorContext *context,
const string &filename, const string &code) const {
const ::grpc::string &filename, const ::grpc::string &code) const {
std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output(
context->Open(filename));
grpc::protobuf::io::CodedOutputStream coded_out(output.get());

@ -41,4 +41,4 @@
extern const grpc_channel_filter grpc_client_census_filter;
extern const grpc_channel_filter grpc_server_census_filter;
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CENSUS_FILTER_H */
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CENSUS_FILTER_H */

@ -84,8 +84,10 @@ typedef struct {
grpc_pollset_set pollset_set;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a resolver,
to watch for state changes from the lb_policy. When a state change is seen, we
/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher */
typedef struct {
channel_data *chand;
@ -380,7 +382,8 @@ static void perform_transport_stream_op(grpc_call_element *elem,
if (lb_policy) {
grpc_transport_stream_op *op = &calld->waiting_op;
grpc_pollset *bind_pollset = op->bind_pollset;
grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata;
grpc_metadata_batch *initial_metadata =
&op->send_ops->ops[0].data.metadata;
GRPC_LB_POLICY_REF(lb_policy, "pick");
gpr_mu_unlock(&chand->mu_config);
calld->state = CALL_WAITING_FOR_PICK;
@ -388,13 +391,14 @@ static void perform_transport_stream_op(grpc_call_element *elem,
GPR_ASSERT(op->bind_pollset);
GPR_ASSERT(op->send_ops);
GPR_ASSERT(op->send_ops->nops >= 1);
GPR_ASSERT(
op->send_ops->ops[0].type == GRPC_OP_METADATA);
GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
grpc_iomgr_closure_init(&calld->async_setup_task, picked_target,
calld);
grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
&calld->picked_channel, &calld->async_setup_task);
&calld->picked_channel,
&calld->async_setup_task);
GRPC_LB_POLICY_UNREF(lb_policy, "pick");
} else if (chand->resolver != NULL) {
@ -430,7 +434,8 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
perform_transport_stream_op(elem, op, 0);
}
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state);
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
@ -450,7 +455,8 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
gpr_free(w);
}
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) {
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
@ -499,13 +505,13 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (iomgr_success && chand->resolver) {
grpc_resolver *resolver = chand->resolver;
GRPC_RESOLVER_REF(resolver, "channel-next");
grpc_connectivity_state_set(&chand->state_tracker, state,
"new_lb+resolver");
gpr_mu_unlock(&chand->mu_config);
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
grpc_connectivity_state_set(&chand->state_tracker, state,
"new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(chand, lb_policy, state);
}
@ -527,6 +533,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
if (old_lb_policy != NULL) {
grpc_lb_policy_shutdown(old_lb_policy);
GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
}
@ -662,7 +669,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
chand);
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
}
/* Destructor for channel_data */
@ -746,19 +754,20 @@ void grpc_client_channel_watch_connectivity_state(
gpr_mu_unlock(&chand->mu_config);
}
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) {
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
return &chand->pollset_set;
}
void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
grpc_pollset *pollset) {
grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
grpc_pollset_set_add_pollset(&chand->pollset_set, pollset);
}
void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
grpc_pollset *pollset) {
grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
grpc_pollset_set_del_pollset(&chand->pollset_set, pollset);
}

@ -59,11 +59,12 @@ void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete);
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem);
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
grpc_channel_element *elem);
void grpc_client_channel_add_interested_party(grpc_channel_element *channel,
grpc_pollset *pollset);
grpc_pollset *pollset);
void grpc_client_channel_del_interested_party(grpc_channel_element *channel,
grpc_pollset *pollset);
grpc_pollset *pollset);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */

@ -53,7 +53,7 @@ typedef struct call_data {
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
/** If true, contents of \a compression_algorithm are authoritative */
/** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
} call_data;
@ -80,7 +80,7 @@ typedef struct channel_data {
*
 * Returns 1 if the data was actually compressed and 0 otherwise. */
static int compress_send_sb(grpc_compression_algorithm algorithm,
gpr_slice_buffer *slices) {
gpr_slice_buffer *slices) {
int did_compress;
gpr_slice_buffer tmp;
gpr_slice_buffer_init(&tmp);
@ -95,7 +95,7 @@ static int compress_send_sb(grpc_compression_algorithm algorithm,
/** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@ -127,10 +127,10 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
static int skip_compression(channel_data *channeld, call_data *calld) {
if (calld->has_compression_algorithm) {
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
return 1;
}
return 0; /* we have an actual call-specific algorithm */
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
return 1;
}
return 0; /* we have an actual call-specific algorithm */
}
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
@ -203,7 +203,7 @@ static void process_send_ops(grpc_call_element *elem,
* given by GRPC_OP_BEGIN_MESSAGE) */
calld->remaining_slice_bytes = sop->data.begin_message.length;
if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
calld->has_compression_algorithm = 1; /* GPR_TRUE */
calld->has_compression_algorithm = 1; /* GPR_TRUE */
calld->compression_algorithm = GRPC_COMPRESS_NONE;
}
break;
@ -228,7 +228,7 @@ static void process_send_ops(grpc_call_element *elem,
[calld->compression_algorithm]));
/* convey supported compression algorithms */
grpc_metadata_batch_add_head(
grpc_metadata_batch_add_tail(
&(sop->data.metadata), &calld->accept_encoding_storage,
GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
@ -305,7 +305,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
const char* supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT-1];
const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
char *accept_encoding_str;
size_t accept_encoding_str_len;
@ -344,23 +344,19 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorithm_name, 0));
if (algo_idx > 0) {
supported_algorithms_names[algo_idx-1] = algorithm_name;
supported_algorithms_names[algo_idx - 1] = algorithm_name;
}
}
/* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
* arrays, as to avoid the heap allocs */
accept_encoding_str =
gpr_strjoin_sep(supported_algorithms_names,
GPR_ARRAY_SIZE(supported_algorithms_names),
", ",
&accept_encoding_str_len);
channeld->mdelem_accept_encoding =
grpc_mdelem_from_metadata_strings(
mdctx,
GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
grpc_mdstr_from_string(mdctx, accept_encoding_str, 0));
accept_encoding_str = gpr_strjoin_sep(
supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names),
", ", &accept_encoding_str_len);
channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
grpc_mdstr_from_string(mdctx, accept_encoding_str, 0));
gpr_free(accept_encoding_str);
GPR_ASSERT(!is_last);
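For illustration (assuming the compression enum at this revision is ordered none, deflate, gzip): the loop above fills supported_algorithms_names with {"deflate", "gzip"}, so the accept-encoding metadata value joined here would be the string "deflate, gzip".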
@ -374,8 +370,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
++algo_idx) {
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
}
GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding);

@ -62,4 +62,4 @@
extern const grpc_channel_filter grpc_compress_filter;
#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */
#endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */

@ -41,4 +41,4 @@ extern const grpc_channel_filter grpc_http_client_filter;
#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme"
#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_CLIENT_FILTER_H */
#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_CLIENT_FILTER_H */

@ -39,4 +39,4 @@
/* Processes metadata on the client side for HTTP2 transports */
extern const grpc_channel_filter grpc_http_server_filter;
#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_SERVER_FILTER_H */
#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_SERVER_FILTER_H */

@ -41,4 +41,4 @@
customize for their own filters */
extern const grpc_channel_filter grpc_no_op_filter;
#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */
#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */

@ -219,7 +219,8 @@ static grpc_resolver *dns_create(
default_host_arg.type = GRPC_ARG_STRING;
default_host_arg.key = GRPC_ARG_DEFAULT_AUTHORITY;
default_host_arg.value.string = host;
subchannel_factory = grpc_subchannel_factory_add_channel_arg(subchannel_factory, &default_host_arg);
subchannel_factory = grpc_subchannel_factory_add_channel_arg(
subchannel_factory, &default_host_arg);
gpr_free(host);
gpr_free(port);

@ -0,0 +1,501 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/client_config/resolvers/zookeeper_resolver.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc/grpc_zookeeper.h>
#include <zookeeper/zookeeper.h>
#include "src/core/client_config/lb_policies/pick_first.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/support/string.h"
#include "src/core/json/json.h"
/** Zookeeper session expiration time in milliseconds */
#define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000
typedef struct {
/** base class: must be first */
grpc_resolver base;
/** refcount */
gpr_refcount refs;
/** name to resolve */
char *name;
/** subchannel factory */
grpc_subchannel_factory *subchannel_factory;
/** load balancing policy factory */
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels);
/** mutex guarding the rest of the state */
gpr_mu mu;
/** are we currently resolving? */
int resolving;
/** which version of resolved_config have we published? */
int published_version;
/** which version of resolved_config is current? */
int resolved_version;
/** pending next completion, or NULL */
grpc_iomgr_closure *next_completion;
/** target config address for next completion */
grpc_client_config **target_config;
/** current (fully resolved) config */
grpc_client_config *resolved_config;
/** zookeeper handle */
zhandle_t *zookeeper_handle;
/** zookeeper resolved addresses */
grpc_resolved_addresses *resolved_addrs;
/** total number of addresses to be resolved */
int resolved_total;
/** number of addresses resolved */
int resolved_num;
} zookeeper_resolver;
static void zookeeper_destroy(grpc_resolver *r);
static void zookeeper_start_resolving_locked(zookeeper_resolver *r);
static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r);
static void zookeeper_shutdown(grpc_resolver *r);
static void zookeeper_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
static const grpc_resolver_vtable zookeeper_resolver_vtable = {
zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error,
zookeeper_next};
static void zookeeper_shutdown(grpc_resolver *resolver) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
grpc_iomgr_add_callback(r->next_completion);
r->next_completion = NULL;
}
zookeeper_close(r->zookeeper_handle);
gpr_mu_unlock(&r->mu);
}
static void zookeeper_channel_saw_error(grpc_resolver *resolver,
struct sockaddr *sa, int len) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
gpr_mu_lock(&r->mu);
if (r->resolving == 0) {
zookeeper_start_resolving_locked(r);
}
gpr_mu_unlock(&r->mu);
}
static void zookeeper_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->next_completion == NULL);
r->next_completion = on_complete;
r->target_config = target_config;
if (r->resolved_version == 0 && r->resolving == 0) {
zookeeper_start_resolving_locked(r);
} else {
zookeeper_maybe_finish_next_locked(r);
}
gpr_mu_unlock(&r->mu);
}
/** Zookeeper global watcher for connection management
TODO: better connection management besides logs */
static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type,
int state, const char *path,
void *watcher_ctx) {
if (type == ZOO_SESSION_EVENT) {
if (state == ZOO_EXPIRED_SESSION_STATE) {
gpr_log(GPR_ERROR, "Zookeeper session expired");
} else if (state == ZOO_AUTH_FAILED_STATE) {
gpr_log(GPR_ERROR, "Zookeeper authentication failed");
}
}
}
/** Zookeeper watcher triggered by changes to watched nodes
Once triggered, it tries to resolve again to get updated addresses */
static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
const char *path, void *watcher_ctx) {
if (watcher_ctx != NULL) {
zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx;
if (state == ZOO_CONNECTED_STATE) {
gpr_mu_lock(&r->mu);
if (r->resolving == 0) {
zookeeper_start_resolving_locked(r);
}
gpr_mu_unlock(&r->mu);
}
}
}
/** Callback function after getting all resolved addresses
Creates a subchannel for each address */
static void zookeeper_on_resolved(void *arg,
grpc_resolved_addresses *addresses) {
zookeeper_resolver *r = arg;
grpc_client_config *config = NULL;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
size_t i;
if (addresses != NULL) {
config = grpc_client_config_create();
subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs);
for (i = 0; i < addresses->naddrs; i++) {
memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *)(addresses->addrs[i].addr);
args.addr_len = addresses->addrs[i].len;
subchannels[i] = grpc_subchannel_factory_create_subchannel(
r->subchannel_factory, &args);
}
lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs);
grpc_client_config_set_lb_policy(config, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "construction");
grpc_resolved_addresses_destroy(addresses);
gpr_free(subchannels);
}
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving == 1);
r->resolving = 0;
if (r->resolved_config != NULL) {
grpc_client_config_unref(r->resolved_config);
}
r->resolved_config = config;
r->resolved_version++;
zookeeper_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving");
}
/** Callback function for each DNS resolved address */
static void zookeeper_dns_resolved(void *arg,
grpc_resolved_addresses *addresses) {
size_t i;
zookeeper_resolver *r = arg;
int resolve_done = 0;
gpr_mu_lock(&r->mu);
r->resolved_num++;
r->resolved_addrs->addrs =
gpr_realloc(r->resolved_addrs->addrs,
sizeof(grpc_resolved_address) *
(r->resolved_addrs->naddrs + addresses->naddrs));
for (i = 0; i < addresses->naddrs; i++) {
memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr,
addresses->addrs[i].addr, addresses->addrs[i].len);
r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len =
addresses->addrs[i].len;
}
r->resolved_addrs->naddrs += addresses->naddrs;
grpc_resolved_addresses_destroy(addresses);
/** Wait for all addresses to be resolved */
resolve_done = (r->resolved_num == r->resolved_total);
gpr_mu_unlock(&r->mu);
if (resolve_done) {
zookeeper_on_resolved(r, r->resolved_addrs);
}
}
/** Parses JSON format address of a zookeeper node */
static char *zookeeper_parse_address(const char *value, int value_len) {
grpc_json *json;
grpc_json *cur;
const char *host;
const char *port;
char *buffer;
char *address = NULL;
buffer = gpr_malloc(value_len);
memcpy(buffer, value, value_len);
json = grpc_json_parse_string_with_len(buffer, value_len);
if (json != NULL) {
host = NULL;
port = NULL;
for (cur = json->child; cur != NULL; cur = cur->next) {
if (!strcmp(cur->key, "host")) {
host = cur->value;
if (port != NULL) {
break;
}
} else if (!strcmp(cur->key, "port")) {
port = cur->value;
if (host != NULL) {
break;
}
}
}
if (host != NULL && port != NULL) {
gpr_asprintf(&address, "%s:%s", host, port);
}
grpc_json_destroy(json);
}
gpr_free(buffer);
return address;
}
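An illustrative node payload this parser accepts (hypothetical values):

  {"host": "10.0.0.1", "port": "8080"}   ->   "10.0.0.1:8080"

Other keys are ignored, and the function returns NULL if either "host" or "port" is missing.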
static void zookeeper_get_children_node_completion(int rc, const char *value,
int value_len,
const struct Stat *stat,
const void *arg) {
char *address = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
int resolve_done = 0;
if (rc != 0) {
gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name);
return;
}
address = zookeeper_parse_address(value, value_len);
if (address != NULL) {
/** Further resolves address by DNS */
grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
gpr_free(address);
} else {
gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name);
gpr_mu_lock(&r->mu);
r->resolved_total--;
resolve_done = (r->resolved_num == r->resolved_total);
gpr_mu_unlock(&r->mu);
if (resolve_done) {
zookeeper_on_resolved(r, r->resolved_addrs);
}
}
}
static void zookeeper_get_children_completion(
int rc, const struct String_vector *children, const void *arg) {
char *path;
int status;
int i;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
if (rc != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
return;
}
if (children->count == 0) {
gpr_log(GPR_ERROR, "Error in resolving zookeeper address %s", r->name);
return;
}
r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
r->resolved_addrs->addrs = NULL;
r->resolved_addrs->naddrs = 0;
r->resolved_total = children->count;
/** TODO: Replace the expensive heap allocation with stack allocation
    if we can get the maximum length of a zookeeper path */
for (i = 0; i < children->count; i++) {
gpr_asprintf(&path, "%s/%s", r->name, children->data[i]);
status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r,
zookeeper_get_children_node_completion, r);
gpr_free(path);
if (status != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path);
}
}
}
static void zookeeper_get_node_completion(int rc, const char *value,
int value_len,
const struct Stat *stat,
const void *arg) {
int status;
char *address = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
r->resolved_addrs = NULL;
r->resolved_total = 0;
r->resolved_num = 0;
if (rc != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
return;
}
/** If zookeeper node of path r->name does not have address
(i.e. service node), get its children */
address = zookeeper_parse_address(value, value_len);
if (address != NULL) {
r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
r->resolved_addrs->addrs = NULL;
r->resolved_addrs->naddrs = 0;
r->resolved_total = 1;
/** Further resolves address by DNS */
grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
gpr_free(address);
return;
}
status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher,
r, zookeeper_get_children_completion, r);
if (status != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
}
}
static void zookeeper_resolve_address(zookeeper_resolver *r) {
int status;
status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r,
zookeeper_get_node_completion, r);
if (status != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
}
}
static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving");
GPR_ASSERT(r->resolving == 0);
r->resolving = 1;
zookeeper_resolve_address(r);
}
static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;
if (r->resolved_config != NULL) {
grpc_client_config_ref(r->resolved_config);
}
grpc_iomgr_add_callback(r->next_completion);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
}
static void zookeeper_destroy(grpc_resolver *gr) {
zookeeper_resolver *r = (zookeeper_resolver *)gr;
gpr_mu_destroy(&r->mu);
if (r->resolved_config != NULL) {
grpc_client_config_unref(r->resolved_config);
}
grpc_subchannel_factory_unref(r->subchannel_factory);
gpr_free(r->name);
gpr_free(r);
}
static grpc_resolver *zookeeper_create(
grpc_uri *uri,
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels),
grpc_subchannel_factory *subchannel_factory) {
zookeeper_resolver *r;
size_t length;
char *path = uri->path;
if (0 == strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "No authority specified in zookeeper uri");
return NULL;
}
/** Removes the trailing slash if it exists */
length = strlen(path);
if (length > 1 && path[length - 1] == '/') {
path[length - 1] = 0;
}
r = gpr_malloc(sizeof(zookeeper_resolver));
memset(r, 0, sizeof(*r));
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
r->name = gpr_strdup(path);
r->subchannel_factory = subchannel_factory;
r->lb_policy_factory = lb_policy_factory;
grpc_subchannel_factory_ref(subchannel_factory);
/** Initializes zookeeper client */
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_global_watcher,
GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
if (r->zookeeper_handle == NULL) {
gpr_log(GPR_ERROR, "Unable to connect to zookeeper server");
return NULL;
}
return &r->base;
}
static void zookeeper_plugin_init() {
grpc_register_resolver_type("zookeeper",
grpc_zookeeper_resolver_factory_create());
}
void grpc_zookeeper_register() {
grpc_register_plugin(zookeeper_plugin_init, NULL);
}
/*
* FACTORY
*/
static void zookeeper_factory_ref(grpc_resolver_factory *factory) {}
static void zookeeper_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *zookeeper_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory) {
return zookeeper_create(uri, grpc_create_pick_first_lb_policy,
subchannel_factory);
}
static const grpc_resolver_factory_vtable zookeeper_factory_vtable = {
zookeeper_factory_ref, zookeeper_factory_unref,
zookeeper_factory_create_resolver};
static grpc_resolver_factory zookeeper_resolver_factory = {
&zookeeper_factory_vtable};
grpc_resolver_factory *grpc_zookeeper_resolver_factory_create() {
return &zookeeper_resolver_factory;
}
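A hedged usage sketch of the new resolver from client code, assuming only the grpc_zookeeper_register() entry point declared in grpc_zookeeper.h and the plugin mechanism used above (the target URI is an example):

#include <grpc/grpc.h>
#include <grpc/grpc_zookeeper.h>

static void init_with_zookeeper(void) {
  /* Register the "zookeeper" resolver factory; plugin init hooks run inside
     grpc_init(), so registration has to happen first. */
  grpc_zookeeper_register();
  grpc_init();
  /* Channels created with a "zookeeper://host:port/service_path" target will
     now resolve through this plugin. */
}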

@ -0,0 +1,42 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H
#include "src/core/client_config/resolver_factory.h"
/** Create a zookeeper resolver factory */
grpc_resolver_factory *grpc_zookeeper_resolver_factory_create(void);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H */

@ -91,8 +91,10 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify);
/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_subchannel *channel,
grpc_pollset *pollset);
/** stop following \a channel's activity through \a pollset. */
void grpc_subchannel_del_interested_party(grpc_subchannel *channel,
grpc_pollset *pollset);

@ -35,9 +35,9 @@
#include "src/core/client_config/subchannel_factory_decorators/merge_channel_args.h"
grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg(
grpc_subchannel_factory *input, const grpc_arg *arg) {
grpc_channel_args args;
args.num_args = 1;
args.args = (grpc_arg *)arg;
return grpc_subchannel_factory_merge_channel_args(input, &args);
grpc_subchannel_factory *input, const grpc_arg *arg) {
grpc_channel_args args;
args.num_args = 1;
args.args = (grpc_arg *)arg;
return grpc_subchannel_factory_merge_channel_args(input, &args);
}

@ -40,6 +40,7 @@
channel_args by adding a new argument; ownership of input, arg is retained
by the caller. */
grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg(
grpc_subchannel_factory *input, const grpc_arg *arg);
grpc_subchannel_factory *input, const grpc_arg *arg);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H */
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H \
*/

@ -50,7 +50,7 @@ static void merge_args_factory_ref(grpc_subchannel_factory *scf) {
static void merge_args_factory_unref(grpc_subchannel_factory *scf) {
merge_args_factory *f = (merge_args_factory *)scf;
if (gpr_unref(&f->refs)) {
grpc_subchannel_factory_unref(f->wrapped);
grpc_subchannel_factory_unref(f->wrapped);
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
@ -73,7 +73,7 @@ static const grpc_subchannel_factory_vtable merge_args_factory_vtable = {
merge_args_factory_create_subchannel};
grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args(
grpc_subchannel_factory *input, const grpc_channel_args *args) {
grpc_subchannel_factory *input, const grpc_channel_args *args) {
merge_args_factory *f = gpr_malloc(sizeof(*f));
f->base.vtable = &merge_args_factory_vtable;
gpr_ref_init(&f->refs, 1);

@ -40,6 +40,7 @@
channel_args by adding a new argument; ownership of input, args is retained
by the caller. */
grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args(
grpc_subchannel_factory *input, const grpc_channel_args *args);
grpc_subchannel_factory *input, const grpc_channel_args *args);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H */
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H \
*/

@ -37,7 +37,7 @@
#include <grpc/compression.h>
#include <grpc/support/useful.h>
int grpc_compression_algorithm_parse(const char* name, size_t name_length,
int grpc_compression_algorithm_parse(const char *name, size_t name_length,
grpc_compression_algorithm *algorithm) {
/* we use strncmp not only because it's safer (even though in this case it
* doesn't matter, given that we are comparing against string literals, but
@ -46,7 +46,7 @@ int grpc_compression_algorithm_parse(const char* name, size_t name_length,
if (name_length == 0) {
return 0;
}
if (strncmp(name, "none", name_length) == 0) {
if (strncmp(name, "identity", name_length) == 0) {
*algorithm = GRPC_COMPRESS_NONE;
} else if (strncmp(name, "gzip", name_length) == 0) {
*algorithm = GRPC_COMPRESS_GZIP;
@ -62,7 +62,7 @@ int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
char **name) {
switch (algorithm) {
case GRPC_COMPRESS_NONE:
*name = "none";
*name = "identity";
break;
case GRPC_COMPRESS_DEFLATE:
*name = "deflate";

@ -61,8 +61,8 @@ static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
size_t np = n + 1;
char *s = gpr_malloc(end - beg + 1);
memcpy(s, beg, end - beg);
s[end-beg] = 0;
*ss = gpr_realloc(*ss, sizeof(char**) * np);
s[end - beg] = 0;
*ss = gpr_realloc(*ss, sizeof(char **) * np);
(*ss)[n] = s;
*ns = np;
}
@ -73,7 +73,7 @@ static void split(const char *s, char ***ss, size_t *ns) {
add(s, s + strlen(s), ss, ns);
} else {
add(s, c, ss, ns);
split(c+1, ss, ns);
split(c + 1, ss, ns);
}
}
@ -125,7 +125,7 @@ int grpc_tracer_set_enabled(const char *name, int enabled) {
}
if (!found) {
gpr_log(GPR_ERROR, "Unknown trace var: '%s'", name);
return 0; /* early return */
return 0; /* early return */
}
}
return 1;

@ -40,4 +40,4 @@ void grpc_register_tracer(const char *name, int *flag);
void grpc_tracer_init(const char *env_var_name);
void grpc_tracer_shutdown(void);
#endif /* GRPC_INTERNAL_CORE_DEBUG_TRACE_H */
#endif /* GRPC_INTERNAL_CORE_DEBUG_TRACE_H */

@ -43,7 +43,8 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec *buf) {
static void fill_common_header(const grpc_httpcli_request *request,
gpr_strvec *buf) {
size_t i;
gpr_strvec_add(buf, gpr_strdup(request->path));
gpr_strvec_add(buf, gpr_strdup(" HTTP/1.0\r\n"));
@ -52,7 +53,8 @@ static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec *
gpr_strvec_add(buf, gpr_strdup(request->host));
gpr_strvec_add(buf, gpr_strdup("\r\n"));
gpr_strvec_add(buf, gpr_strdup("Connection: close\r\n"));
gpr_strvec_add(buf, gpr_strdup("User-Agent: "GRPC_HTTPCLI_USER_AGENT"\r\n"));
gpr_strvec_add(buf,
gpr_strdup("User-Agent: " GRPC_HTTPCLI_USER_AGENT "\r\n"));
/* user supplied headers */
for (i = 0; i < request->hdr_count; i++) {
gpr_strvec_add(buf, gpr_strdup(request->hdrs[i].key));

@ -42,4 +42,4 @@ gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request,
const char *body_bytes,
size_t body_size);
#endif /* GRPC_INTERNAL_CORE_HTTPCLI_FORMAT_REQUEST_H */
#endif /* GRPC_INTERNAL_CORE_HTTPCLI_FORMAT_REQUEST_H */

@ -61,4 +61,4 @@ void grpc_httpcli_parser_destroy(grpc_httpcli_parser *parser);
int grpc_httpcli_parser_parse(grpc_httpcli_parser *parser, gpr_slice slice);
int grpc_httpcli_parser_eof(grpc_httpcli_parser *parser);
#endif /* GRPC_INTERNAL_CORE_HTTPCLI_PARSER_H */
#endif /* GRPC_INTERNAL_CORE_HTTPCLI_PARSER_H */

@ -105,8 +105,7 @@ void grpc_alarm_list_init(gpr_timespec now) {
void grpc_alarm_list_shutdown(void) {
int i;
while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL,
0))
while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL, 0))
;
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
@ -362,7 +361,7 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) {
GPR_ASSERT(now.clock_type == g_clock_type);
return run_some_expired_alarms(
drop_mu, now, next,
drop_mu, now, next,
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
}

@ -86,4 +86,4 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
Requires: cancel() must happen after add() on a given alarm */
void grpc_alarm_cancel(grpc_alarm *alarm);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_H */
#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_H */

@ -66,11 +66,11 @@ static void adjust_downwards(grpc_alarm **first, int i, int length,
int next_i;
if (left_child >= length) break;
right_child = left_child + 1;
next_i =
right_child < length && gpr_time_cmp(first[left_child]->deadline,
first[right_child]->deadline) < 0
? right_child
: left_child;
next_i = right_child < length &&
gpr_time_cmp(first[left_child]->deadline,
first[right_child]->deadline) < 0
? right_child
: left_child;
if (gpr_time_cmp(t->deadline, first[next_i]->deadline) >= 0) break;
first[i] = first[next_i];
first[i]->heap_index = i;

@ -54,4 +54,4 @@ void grpc_alarm_heap_pop(grpc_alarm_heap *heap);
int grpc_alarm_heap_is_empty(grpc_alarm_heap *heap);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_HEAP_H */
#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_HEAP_H */

@ -59,4 +59,4 @@ gpr_timespec grpc_alarm_list_next_timeout(void);
void grpc_kick_poller(void);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_INTERNAL_H */
#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_INTERNAL_H */

@ -50,7 +50,8 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
ep->vtable->add_to_pollset(ep, pollset);
}
void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) {
void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep,
grpc_pollset_set *pollset_set) {
ep->vtable->add_to_pollset_set(ep, pollset_set);
}

@ -103,10 +103,11 @@ void grpc_endpoint_destroy(grpc_endpoint *ep);
/* Add an endpoint to a pollset, so that when the pollset is polled, events from
this endpoint are considered */
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset);
void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set);
void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep,
grpc_pollset_set *pollset_set);
struct grpc_endpoint {
const grpc_endpoint_vtable *vtable;
};
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H */
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H */

@ -44,4 +44,4 @@ typedef struct {
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */

@ -52,21 +52,26 @@ static void create_sockets(SOCKET sv[2]) {
SOCKADDR_IN addr;
int addr_len = sizeof(addr);
lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
lst_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
GPR_ASSERT(lst_sock != INVALID_SOCKET);
memset(&addr, 0, sizeof(addr));
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_family = AF_INET;
GPR_ASSERT(bind(lst_sock, (struct sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR);
GPR_ASSERT(bind(lst_sock, (struct sockaddr *)&addr, sizeof(addr)) !=
SOCKET_ERROR);
GPR_ASSERT(listen(lst_sock, SOMAXCONN) != SOCKET_ERROR);
GPR_ASSERT(getsockname(lst_sock, (struct sockaddr*)&addr, &addr_len) != SOCKET_ERROR);
GPR_ASSERT(getsockname(lst_sock, (struct sockaddr *)&addr, &addr_len) !=
SOCKET_ERROR);
cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
GPR_ASSERT(cli_sock != INVALID_SOCKET);
GPR_ASSERT(WSAConnect(cli_sock, (struct sockaddr*)&addr, addr_len, NULL, NULL, NULL, NULL) == 0);
svr_sock = accept(lst_sock, (struct sockaddr*)&addr, &addr_len);
GPR_ASSERT(WSAConnect(cli_sock, (struct sockaddr *)&addr, addr_len, NULL,
NULL, NULL, NULL) == 0);
svr_sock = accept(lst_sock, (struct sockaddr *)&addr, &addr_len);
GPR_ASSERT(svr_sock != INVALID_SOCKET);
closesocket(lst_sock);
@ -77,7 +82,8 @@ static void create_sockets(SOCKET sv[2]) {
sv[0] = svr_sock;
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read_slice_size) {
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size) {
SOCKET sv[2];
grpc_endpoint_pair p;
create_sockets(sv);

@ -65,18 +65,17 @@ static void do_iocp_work() {
LPOVERLAPPED overlapped;
grpc_winsocket *socket;
grpc_winsocket_callback_info *info;
void(*f)(void *, int) = NULL;
void (*f)(void *, int) = NULL;
void *opaque = NULL;
success = GetQueuedCompletionStatus(g_iocp, &bytes,
&completion_key, &overlapped,
INFINITE);
success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key,
&overlapped, INFINITE);
/* success = 0 and overlapped = NULL means the deadline was reached,
   which is impossible since our wait time is +inf */
GPR_ASSERT(success || overlapped);
GPR_ASSERT(completion_key && overlapped);
if (overlapped == &g_iocp_custom_overlap) {
gpr_atm_full_fetch_add(&g_custom_events, -1);
if (completion_key == (ULONG_PTR) &g_iocp_kick_token) {
if (completion_key == (ULONG_PTR)&g_iocp_kick_token) {
/* We were awoken from a kick. */
return;
}
@ -84,7 +83,7 @@ static void do_iocp_work() {
abort();
}
socket = (grpc_winsocket*) completion_key;
socket = (grpc_winsocket *)completion_key;
if (overlapped == &socket->write_info.overlapped) {
info = &socket->write_info;
} else if (overlapped == &socket->read_info.overlapped) {
@ -121,8 +120,7 @@ static void do_iocp_work() {
}
static void iocp_loop(void *p) {
while (gpr_atm_acq_load(&g_orphans) ||
gpr_atm_acq_load(&g_custom_events) ||
while (gpr_atm_acq_load(&g_orphans) || gpr_atm_acq_load(&g_custom_events) ||
!gpr_event_get(&g_shutdown_iocp)) {
grpc_maybe_call_delayed_callbacks(NULL, 1);
do_iocp_work();
@ -134,8 +132,8 @@ static void iocp_loop(void *p) {
void grpc_iocp_init(void) {
gpr_thd_id id;
g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
NULL, (ULONG_PTR)NULL, 0);
g_iocp =
CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0);
GPR_ASSERT(g_iocp);
gpr_event_init(&g_iocp_done);
@ -147,8 +145,7 @@ void grpc_iocp_kick(void) {
BOOL success;
gpr_atm_full_fetch_add(&g_custom_events, 1);
success = PostQueuedCompletionStatus(g_iocp, 0,
(ULONG_PTR) &g_iocp_kick_token,
success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR)&g_iocp_kick_token,
&g_iocp_custom_overlap);
GPR_ASSERT(success);
}
@ -165,8 +162,8 @@ void grpc_iocp_shutdown(void) {
void grpc_iocp_add_socket(grpc_winsocket *socket) {
HANDLE ret;
if (socket->added_to_iocp) return;
ret = CreateIoCompletionPort((HANDLE)socket->socket,
g_iocp, (gpr_uintptr) socket, 0);
ret = CreateIoCompletionPort((HANDLE)socket->socket, g_iocp,
(gpr_uintptr)socket, 0);
if (!ret) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);
@ -189,7 +186,7 @@ void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
the callback now.
-) The IOCP hasn't completed yet, and we're queuing it for later. */
static void socket_notify_on_iocp(grpc_winsocket *socket,
void(*cb)(void *, int), void *opaque,
void (*cb)(void *, int), void *opaque,
grpc_winsocket_callback_info *info) {
int run_now = 0;
GPR_ASSERT(!info->cb);
@ -206,13 +203,13 @@ static void socket_notify_on_iocp(grpc_winsocket *socket,
}
void grpc_socket_notify_on_write(grpc_winsocket *socket,
void(*cb)(void *, int), void *opaque) {
void (*cb)(void *, int), void *opaque) {
socket_notify_on_iocp(socket, cb, opaque, &socket->write_info);
}
void grpc_socket_notify_on_read(grpc_winsocket *socket,
void(*cb)(void *, int), void *opaque) {
void grpc_socket_notify_on_read(grpc_winsocket *socket, void (*cb)(void *, int),
void *opaque) {
socket_notify_on_iocp(socket, cb, opaque, &socket->read_info);
}
#endif /* GPR_WINSOCK_SOCKET */
#endif /* GPR_WINSOCK_SOCKET */

@ -44,10 +44,10 @@ void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket *);
void grpc_iocp_socket_orphan(grpc_winsocket *);
void grpc_socket_notify_on_write(grpc_winsocket *, void(*cb)(void *, int success),
void *opaque);
void grpc_socket_notify_on_write(grpc_winsocket *,
void (*cb)(void *, int success), void *opaque);
void grpc_socket_notify_on_read(grpc_winsocket *, void(*cb)(void *, int success),
void *opaque);
void grpc_socket_notify_on_read(grpc_winsocket *,
void (*cb)(void *, int success), void *opaque);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOCP_WINDOWS_H */
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOCP_WINDOWS_H */

@ -77,4 +77,4 @@ void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
argument. */
void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */

@ -52,4 +52,4 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj);
void grpc_iomgr_platform_init(void);
void grpc_iomgr_platform_shutdown(void);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */

@ -51,4 +51,4 @@ void grpc_iomgr_platform_shutdown(void) {
grpc_fd_global_shutdown();
}
#endif /* GRPC_POSIX_SOCKET */
#endif /* GRPC_POSIX_SOCKET */

@ -39,4 +39,4 @@
void grpc_pollset_global_init(void);
void grpc_pollset_global_shutdown(void);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_POSIX_H */
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_POSIX_H */

@ -68,4 +68,4 @@ void grpc_iomgr_platform_shutdown(void) {
winsock_shutdown();
}
#endif /* GRPC_WINSOCK_SOCKET */
#endif /* GRPC_WINSOCK_SOCKET */

Some files were not shown because too many files have changed in this diff.