Merge branch 'slice_with_exec_ctx' into metadata_filter

reviewable/pr8842/r2
Craig Tiller 8 years ago
commit ef6938477b
  1. 2
      .gitignore
  2. 194
      doc/negative-http2-interop-test-descriptions.md
  3. 6
      gRPC-Core.podspec
  4. 4
      gRPC-ProtoRPC.podspec
  5. 4
      gRPC-RxLibrary.podspec
  6. 4
      gRPC.podspec
  7. 2
      grpc.gemspec
  8. 12
      include/grpc/grpc.h
  9. 10
      include/grpc/impl/codegen/grpc_types.h
  10. 21
      setup.py
  11. 14
      src/compiler/python_generator.cc
  12. 7
      src/core/ext/census/grpc_filter.c
  13. 58
      src/core/ext/client_channel/client_channel.c
  14. 10
      src/core/ext/client_channel/client_channel.h
  15. 30
      src/core/ext/client_channel/client_channel_factory.c
  16. 6
      src/core/ext/client_channel/client_channel_factory.h
  17. 31
      src/core/ext/client_channel/http_connect_handshaker.c
  18. 5
      src/core/ext/client_channel/http_connect_handshaker.h
  19. 3
      src/core/ext/client_channel/lb_policy_factory.h
  20. 2
      src/core/ext/client_channel/resolver_factory.h
  21. 36
      src/core/ext/client_channel/resolver_registry.c
  22. 8
      src/core/ext/client_channel/resolver_registry.h
  23. 16
      src/core/ext/client_channel/subchannel.c
  24. 2
      src/core/ext/client_channel/subchannel.h
  25. 4
      src/core/ext/client_channel/subchannel_index.c
  26. 37
      src/core/ext/lb_policy/grpclb/grpclb.c
  27. 12
      src/core/ext/lb_policy/pick_first/pick_first.c
  28. 12
      src/core/ext/lb_policy/round_robin/round_robin.c
  29. 8
      src/core/ext/load_reporting/load_reporting_filter.c
  30. 21
      src/core/ext/resolver/dns/native/dns_resolver.c
  31. 7
      src/core/ext/resolver/sockaddr/sockaddr_resolver.c
  32. 11
      src/core/ext/transport/chttp2/client/chttp2_connector.c
  33. 3
      src/core/ext/transport/chttp2/client/chttp2_connector.h
  34. 33
      src/core/ext/transport/chttp2/client/insecure/channel_create.c
  35. 47
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
  36. 24
      src/core/lib/channel/channel_stack.c
  37. 23
      src/core/lib/channel/channel_stack.h
  38. 46
      src/core/lib/channel/channel_stack_builder.c
  39. 11
      src/core/lib/channel/channel_stack_builder.h
  40. 7
      src/core/lib/channel/compress_filter.c
  41. 7
      src/core/lib/channel/connected_channel.c
  42. 7
      src/core/lib/channel/deadline_filter.c
  43. 7
      src/core/lib/channel/http_client_filter.c
  44. 7
      src/core/lib/channel/http_server_filter.c
  45. 7
      src/core/lib/channel/message_size_filter.c
  46. 1
      src/core/lib/http/httpcli.c
  47. 2
      src/core/lib/iomgr/resolve_address.h
  48. 9
      src/core/lib/iomgr/resolve_address_posix.c
  49. 9
      src/core/lib/iomgr/resolve_address_uv.c
  50. 9
      src/core/lib/iomgr/resolve_address_windows.c
  51. 7
      src/core/lib/security/transport/client_auth_filter.c
  52. 5
      src/core/lib/security/transport/secure_endpoint.c
  53. 7
      src/core/lib/security/transport/server_auth_filter.c
  54. 124
      src/core/lib/surface/channel.c
  55. 7
      src/core/lib/surface/lame_client.c
  56. 7
      src/core/lib/surface/server.c
  57. 45
      src/cpp/common/channel_filter.h
  58. 26
      src/node/src/client.js
  59. 18
      src/node/src/server.js
  60. 8
      src/node/test/surface_test.js
  61. 7
      src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
  62. 2
      src/objective-c/GRPCClient/private/GRPCHost.m
  63. 8
      src/python/grpcio/grpc/__init__.py
  64. 128
      src/python/grpcio/grpc/_channel.py
  65. 8
      src/python/grpcio/support.py
  66. 153
      src/python/grpcio_tests/tests/http2/_negative_http2_client.py
  67. 22
      src/python/grpcio_tests/tests/interop/methods.py
  68. 1
      src/python/grpcio_tests/tests/tests.json
  69. 4
      src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py
  70. 175
      src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py
  71. 4
      src/python/grpcio_tests/tests/unit/beta/_utilities_test.py
  72. 1
      src/ruby/ext/grpc/rb_byte_buffer.c
  73. 28
      src/ruby/ext/grpc/rb_server.c
  74. 156
      src/ruby/lib/grpc/errors.rb
  75. 4
      src/ruby/lib/grpc/generic/active_call.rb
  76. 4
      src/ruby/lib/grpc/generic/bidi_call.rb
  77. 2
      src/ruby/lib/grpc/generic/rpc_desc.rb
  78. 5
      src/ruby/lib/grpc/generic/service.rb
  79. 4
      src/ruby/pb/grpc/health/checker.rb
  80. 7
      src/ruby/pb/test/client.rb
  81. 6
      src/ruby/qps/client.rb
  82. 64
      src/ruby/spec/error_sanity_spec.rb
  83. 9
      src/ruby/spec/generic/client_stub_spec.rb
  84. 16
      src/ruby/spec/generic/rpc_desc_spec.rb
  85. 8
      src/ruby/spec/generic/rpc_server_spec.rb
  86. 8
      src/ruby/spec/pb/health/checker_spec.rb
  87. 2
      src/ruby/spec/spec_helper.rb
  88. 6
      templates/gRPC-Core.podspec.template
  89. 2
      templates/grpc.gemspec.template
  90. 5
      templates/tools/dockerfile/clang_format.include
  91. 37
      templates/tools/dockerfile/grpc_clang_format/Dockerfile.template
  92. 12
      templates/tools/dockerfile/test/sanity/Dockerfile.template
  93. 0
      templates/tools/run_tests/generated/configs.json.template
  94. 0
      templates/tools/run_tests/generated/sources_and_headers.json.template
  95. 0
      templates/tools/run_tests/generated/tests.json.template
  96. 7
      test/core/channel/channel_stack_test.c
  97. 32
      test/core/client_channel/lb_policies_test.c
  98. 11
      test/core/client_channel/resolvers/dns_resolver_connectivity_test.c
  99. 5
      test/core/client_channel/resolvers/sockaddr_resolver_test.c
  100. 7
      test/core/end2end/fake_resolver.c
  101. Some files were not shown because too many files have changed in this diff Show More

2
.gitignore vendored

@ -96,7 +96,7 @@ DerivedData
Pods/
# Artifacts directory
artifacts/
/artifacts/
# Git generated files for conflicting
*.orig

@ -0,0 +1,194 @@
Negative HTTP/2 Interop Test Case Descriptions
=======================================
Client and server use
[test.proto](../src/proto/grpc/testing/test.proto).
Server
------
The code for the custom http2 server can be found
[here](https://github.com/grpc/grpc/tree/master/test/http2_test).
It is responsible for handling requests and sending responses, and also for
fulfilling the behavior of each particular test case.
Server should accept these arguments:
* --port=PORT
* The port the server will run on. For example, "8080"
* --test_case=TESTCASE
* The name of the test case to execute. For example, "goaway"
Client
------
Clients implement test cases that test certain functionality. Each client is
provided the test case it is expected to run as a command-line parameter. Names
should be lowercase and without spaces.
Clients should accept these arguments:
* --server_host=HOSTNAME
* The server host to connect to. For example, "localhost" or "127.0.0.1"
* --server_port=PORT
* The server port to connect to. For example, "8080"
* --test_case=TESTCASE
* The name of the test case to execute. For example, "goaway"
Note
-----
Note that the server and client must be invoked with the same test case or else
the test will be meaningless. For convenience, we provide a shell script wrapper
that invokes both server and client at the same time, with the same test_case.
This is the preferred way to run these tests.
## Test Cases
### goaway
This test verifies that the client correctly responds to a goaway sent by the
server. The client should handle the goaway by switching to a new stream without
the user application having to do a thing.
Client Procedure:
1. Client sends two UnaryCall requests with:
```
{
response_size: 314159
payload:{
body: 271828 bytes of zeros
}
}
```
Client asserts:
* Call was successful.
* Response payload body is 314159 bytes in size.
Server Procedure:
1. Server sends a GOAWAY after receiving the first UnaryCall.
Server asserts:
* The second UnaryCall has a different stream_id than the first one.
### rst_after_header
This test verifies that the client fails correctly when the server sends a
RST_STREAM immediately after sending headers to the client.
Procedure:
1. Client sends UnaryCall with:
```
{
response_size: 314159
payload:{
body: 271828 bytes of zeros
}
}
```
Client asserts:
* Call was not successful.
Server Procedure:
1. Server sends a RST_STREAM with error code 0 after sending headers to the client.
*At the moment the error code and message returned are not standardized throughout all
languages. Those checks will be added once all client languages behave the same way. [#9142](https://github.com/grpc/grpc/issues/9142) is in flight.*
### rst_during_data
This test verifies that the client fails "correctly" when the server sends a
RST_STREAM halfway through sending data to the client.
Procedure:
1. Client sends UnaryCall with:
```
{
response_size: 314159
payload:{
body: 271828 bytes of zeros
}
}
```
Client asserts:
* Call was not successful.
Server Procedure:
1. Server sends a RST_STREAM with error code 0 after sending half of
the requested data to the client.
### rst_after_data
This test verifies that the client fails "correctly" when the server sends a
RST_STREAM after sending all of the data to the client.
Procedure:
1. Client sends UnaryCall with:
```
{
response_size: 314159
payload:{
body: 271828 bytes of zeros
}
}
```
Client asserts:
* Call was not successful.
Server Procedure:
1. Server sends a RST_STREAM with error code 0 after sending all of the
data to the client.
*Certain client languages allow the data to be accessed even though a RST_STREAM
was encountered. Once all client languages behave this way, checks will be added on
the incoming data.*
### ping
This test verifies that the client correctly acknowledges all pings it gets from the
server.
Procedure:
1. Client sends UnaryCall with:
```
{
response_size: 314159
payload:{
body: 271828 bytes of zeros
}
}
```
Client asserts:
* call was successful.
* response payload body is 314159 bytes in size.
Server Procedure:
1. Server tracks the number of outstanding pings (i.e. +1 when it sends a ping, and -1
when it receives an ack from the client).
2. Server sends pings before and after sending headers, also before and after sending data.
Server Asserts:
* Number of outstanding pings is 0 when the connection is lost.
### max_streams
This test verifies that the client observes the MAX_CONCURRENT_STREAMS limit set by the server.
Client Procedure:
1. Client sends initial UnaryCall to allow the server to update its MAX_CONCURRENT_STREAMS settings.
2. Client concurrently sends 10 UnaryCalls.
Client Asserts:
* All UnaryCalls were successful, and had the correct type and payload size.
Server Procedure:
1. Sets MAX_CONCURRENT_STREAMS to one after the connection is made.
*The assertion that the MAX_CONCURRENT_STREAMS limit is upheld occurs in the http2 library we used.*

@ -35,7 +35,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-Core'
version = '1.0.1'
version = '1.0.2'
s.version = version
s.summary = 'Core cross-platform gRPC library, written in C'
s.homepage = 'http://www.grpc.io'
@ -44,7 +44,9 @@ Pod::Spec.new do |s|
s.source = {
:git => 'https://github.com/grpc/grpc.git',
:tag => "v#{version}",
# TODO(mxyan): Change back to "v#{version}" for next release
#:tag => "v#{version}",
:tag => "objective-c-v#{version}",
# TODO(jcanizales): Depend explicitly on the nanopb pod, and disable submodules.
:submodules => true,
}

@ -30,7 +30,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-ProtoRPC'
version = '1.0.1'
version = '1.0.2'
s.version = version
s.summary = 'RPC library for Protocol Buffers, based on gRPC'
s.homepage = 'http://www.grpc.io'
@ -39,7 +39,7 @@ Pod::Spec.new do |s|
s.source = {
:git => 'https://github.com/grpc/grpc.git',
:tag => "v#{version}",
:tag => "objective-c-v#{version}",
}
s.ios.deployment_target = '7.1'

@ -30,7 +30,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-RxLibrary'
version = '1.0.1'
version = '1.0.2'
s.version = version
s.summary = 'Reactive Extensions library for iOS/OSX.'
s.homepage = 'http://www.grpc.io'
@ -39,7 +39,7 @@ Pod::Spec.new do |s|
s.source = {
:git => 'https://github.com/grpc/grpc.git',
:tag => "v#{version}",
:tag => "objective-c-v#{version}",
}
s.ios.deployment_target = '7.1'

@ -30,7 +30,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC'
version = '1.0.1'
version = '1.0.2'
s.version = version
s.summary = 'gRPC client library for iOS/OSX'
s.homepage = 'http://www.grpc.io'
@ -39,7 +39,7 @@ Pod::Spec.new do |s|
s.source = {
:git => 'https://github.com/grpc/grpc.git',
:tag => "v#{version}",
:tag => "objective-c-v#{version}",
}
s.ios.deployment_target = '7.1'

@ -27,7 +27,7 @@ Gem::Specification.new do |s|
s.require_paths = %w( src/ruby/bin src/ruby/lib src/ruby/pb )
s.platform = Gem::Platform::RUBY
s.add_dependency 'google-protobuf', '~> 3.0.2'
s.add_dependency 'google-protobuf', '~> 3.1.0'
s.add_dependency 'googleauth', '~> 0.5.1'
s.add_development_dependency 'bundler', '~> 1.9'

@ -202,9 +202,15 @@ GRPCAPI grpc_call *grpc_channel_create_registered_call(
completion of type 'tag' to the completion queue bound to the call.
The order of ops specified in the batch has no significance.
Only one operation of each type can be active at once in any given
batch. You must call grpc_completion_queue_next or
grpc_completion_queue_pluck on the completion queue associated with 'call'
for work to be performed.
batch.
If a call to grpc_call_start_batch returns GRPC_CALL_OK you must call
grpc_completion_queue_next or grpc_completion_queue_pluck on the completion
queue associated with 'call' for work to be performed. If a call to
grpc_call_start_batch returns any value other than GRPC_CALL_OK it is
guaranteed that no state associated with 'call' is changed and it is not
appropriate to call grpc_completion_queue_next or
grpc_completion_queue_pluck consequent to the failed grpc_call_start_batch
call.
THREAD SAFETY: access to grpc_call_start_batch in multi-threaded environment
needs to be synchronized. As an optimization, you may synchronize batches
containing just send operations independently from batches containing just

@ -206,18 +206,12 @@ typedef struct {
/** If non-zero, allow the use of SO_REUSEPORT if it's available (default 1) */
#define GRPC_ARG_ALLOW_REUSEPORT "grpc.so_reuseport"
/** If non-zero, a pointer to a buffer pool (use grpc_resource_quota_arg_vtable
to fetch an appropriate pointer arg vtable */
to fetch an appropriate pointer arg vtable) */
#define GRPC_ARG_RESOURCE_QUOTA "grpc.resource_quota"
/** Service config data, to be passed to subchannels.
Not intended for external use. */
/** Service config data in JSON form. Not intended for use outside of tests. */
#define GRPC_ARG_SERVICE_CONFIG "grpc.service_config"
/** LB policy name. */
#define GRPC_ARG_LB_POLICY_NAME "grpc.lb_policy_name"
/** Server name. Not intended for external use. */
#define GRPC_ARG_SERVER_NAME "grpc.server_name"
/** Resolved addresses in a form used by the LB policy.
Not intended for external use. */
#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses"
/** The grpc_socket_mutator instance that set the socket options. A pointer. */
#define GRPC_ARG_SOCKET_MUTATOR "grpc.socket_mutator"
/** \} */

@ -218,15 +218,18 @@ SETUP_REQUIRES = INSTALL_REQUIRES + (
'six>=1.10',
) if ENABLE_DOCUMENTATION_BUILD else ()
if BUILD_WITH_CYTHON:
sys.stderr.write(
"You requested a Cython build via GRPC_PYTHON_BUILD_WITH_CYTHON, "
"but do not have Cython installed. We won't stop you from using "
"other commands, but the extension files will fail to build.\n")
elif need_cython:
sys.stderr.write(
'We could not find Cython. Setup may take 10-20 minutes.\n')
SETUP_REQUIRES += ('cython>=0.23',)
try:
import Cython
except ImportError:
if BUILD_WITH_CYTHON:
sys.stderr.write(
"You requested a Cython build via GRPC_PYTHON_BUILD_WITH_CYTHON, "
"but do not have Cython installed. We won't stop you from using "
"other commands, but the extension files will fail to build.\n")
elif need_cython:
sys.stderr.write(
'We could not find Cython. Setup may take 10-20 minutes.\n')
SETUP_REQUIRES += ('cython>=0.23',)
COMMAND_CLASS = {
'doc': commands.SphinxDocumentation,

@ -40,6 +40,7 @@
#include <map>
#include <memory>
#include <ostream>
#include <set>
#include <sstream>
#include <tuple>
#include <vector>
@ -64,7 +65,9 @@ using std::make_pair;
using std::map;
using std::pair;
using std::replace;
using std::tuple;
using std::vector;
using std::set;
namespace grpc_python_generator {
@ -73,6 +76,8 @@ namespace {
typedef vector<const Descriptor*> DescriptorVector;
typedef map<grpc::string, grpc::string> StringMap;
typedef vector<grpc::string> StringVector;
typedef tuple<grpc::string, grpc::string> StringPair;
typedef set<StringPair> StringPairSet;
// Provides RAII indentation handling. Use as:
// {
@ -651,6 +656,7 @@ bool PrivateGenerator::PrintPreamble() {
"face_utilities\n");
if (generate_in_pb2_grpc) {
out->Print("\n");
StringPairSet imports_set;
for (int i = 0; i < file->service_count(); ++i) {
const ServiceDescriptor* service = file->service(i);
for (int j = 0; j < service->method_count(); ++j) {
@ -662,11 +668,15 @@ bool PrivateGenerator::PrintPreamble() {
grpc::string type_file_name = type->file()->name();
grpc::string module_name = ModuleName(type_file_name);
grpc::string module_alias = ModuleAlias(type_file_name);
out->Print("import $ModuleName$ as $ModuleAlias$\n", "ModuleName",
module_name, "ModuleAlias", module_alias);
imports_set.insert(std::make_tuple(module_name, module_alias));
}
}
}
for (StringPairSet::iterator it = imports_set.begin();
it != imports_set.end(); ++it) {
out->Print("import $ModuleName$ as $ModuleAlias$\n", "ModuleName",
std::get<0>(*it), "ModuleAlias", std::get<1>(*it));
}
}
return true;
}

@ -165,11 +165,12 @@ static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
channel_data *chand = elem->channel_data;
GPR_ASSERT(chand != NULL);
return GRPC_ERROR_NONE;
}
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,

@ -44,6 +44,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
@ -504,24 +505,39 @@ static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
}
/* Constructor for channel_data */
static void cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(*chand));
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
// Initialize data members.
gpr_mu_init(&chand->mu);
chand->owning_stack = args->channel_stack;
grpc_closure_init(&chand->on_resolver_result_changed,
on_resolver_result_changed, chand);
chand->owning_stack = args->channel_stack;
chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
chand->interested_parties = grpc_pollset_set_create();
// Record client channel factory.
const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
grpc_client_channel_factory_ref(arg->value.pointer.p);
chand->client_channel_factory = arg->value.pointer.p;
// Instantiate resolver.
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
chand->resolver =
grpc_resolver_create(exec_ctx, arg->value.string, args->channel_args,
chand->interested_parties);
if (chand->resolver == NULL) {
return GRPC_ERROR_CREATE("resolver creation failed");
}
return GRPC_ERROR_NONE;
}
/* Destructor for channel_data */
@ -1140,30 +1156,6 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
void grpc_client_channel_finish_initialization(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
grpc_resolver *resolver,
grpc_client_channel_factory *client_channel_factory) {
/* post construction initialization: set the transport setup pointer */
GPR_ASSERT(client_channel_factory != NULL);
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
GRPC_RESOLVER_REF(resolver, "channel");
if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = true;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
&chand->on_resolver_result_changed);
}
chand->client_channel_factory = client_channel_factory;
grpc_client_channel_factory_ref(client_channel_factory);
gpr_mu_unlock(&chand->mu);
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;

@ -38,6 +38,9 @@
#include "src/core/ext/client_channel/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
// Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri"
/* A client channel is a channel that begins disconnected, and can connect
to some endpoint on demand. If that endpoint disconnects, it will be
connected to again later.
@ -47,13 +50,6 @@
extern const grpc_channel_filter grpc_client_channel_filter;
/* Post-construction initializer to give the client channel its resolver
and factory. */
void grpc_client_channel_finish_initialization(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
grpc_resolver *resolver,
grpc_client_channel_factory *client_channel_factory);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);

@ -55,3 +55,33 @@ grpc_channel* grpc_client_channel_factory_create_channel(
return factory->vtable->create_client_channel(exec_ctx, factory, target, type,
args);
}
static void* factory_arg_copy(void* factory) {
grpc_client_channel_factory_ref(factory);
return factory;
}
static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) {
// TODO(roth): Remove local exec_ctx when
// https://github.com/grpc/grpc/pull/8705 is merged.
grpc_client_channel_factory_unref(exec_ctx, factory);
}
static int factory_arg_cmp(void* factory1, void* factory2) {
if (factory1 < factory2) return -1;
if (factory1 > factory2) return 1;
return 0;
}
static const grpc_arg_pointer_vtable factory_arg_vtable = {
factory_arg_copy, factory_arg_destroy, factory_arg_cmp};
grpc_arg grpc_client_channel_factory_create_channel_arg(
grpc_client_channel_factory* factory) {
grpc_arg arg;
arg.type = GRPC_ARG_POINTER;
arg.key = GRPC_ARG_CLIENT_CHANNEL_FACTORY;
arg.value.pointer.p = factory;
arg.value.pointer.vtable = &factory_arg_vtable;
return arg;
}

@ -39,6 +39,9 @@
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_stack.h"
// Channel arg key for client channel factory.
#define GRPC_ARG_CLIENT_CHANNEL_FACTORY "grpc.client_channel_factory"
typedef struct grpc_client_channel_factory grpc_client_channel_factory;
typedef struct grpc_client_channel_factory_vtable
grpc_client_channel_factory_vtable;
@ -83,4 +86,7 @@ grpc_channel *grpc_client_channel_factory_create_channel(
const char *target, grpc_client_channel_type type,
const grpc_channel_args *args);
grpc_arg grpc_client_channel_factory_create_channel_arg(
grpc_client_channel_factory *factory);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H */

@ -40,6 +40,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/client_channel.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/format_request.h"
@ -52,7 +54,6 @@ typedef struct http_connect_handshaker {
grpc_handshaker base;
char* proxy_server;
char* server_name;
gpr_refcount refcount;
gpr_mu mu;
@ -88,7 +89,6 @@ static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx,
gpr_free(handshaker->read_buffer_to_destroy);
}
gpr_free(handshaker->proxy_server);
gpr_free(handshaker->server_name);
grpc_slice_buffer_destroy_internal(exec_ctx, &handshaker->write_buffer);
grpc_http_parser_destroy(&handshaker->http_parser);
grpc_http_response_destroy(&handshaker->http_response);
@ -268,18 +268,27 @@ static void http_connect_handshaker_do_handshake(
grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done,
grpc_handshaker_args* args) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
gpr_mu_lock(&handshaker->mu);
// Get server name from channel args.
const grpc_arg* arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
char* canonical_uri =
grpc_resolver_factory_add_default_prefix_if_needed(arg->value.string);
grpc_uri* uri = grpc_uri_parse(canonical_uri, 1);
char* server_name = uri->path;
if (server_name[0] == '/') ++server_name;
// Save state in the handshaker object.
gpr_mu_lock(&handshaker->mu);
handshaker->args = args;
handshaker->on_handshake_done = on_handshake_done;
// Send HTTP CONNECT request.
gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s",
handshaker->server_name, handshaker->proxy_server);
gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", server_name,
handshaker->proxy_server);
grpc_httpcli_request request;
memset(&request, 0, sizeof(request));
request.host = handshaker->proxy_server;
request.host = server_name;
request.http.method = "CONNECT";
request.http.path = handshaker->server_name;
request.http.path = server_name;
request.handshaker = &grpc_httpcli_plaintext;
grpc_slice request_slice = grpc_httpcli_format_connect_request(&request);
grpc_slice_buffer_add(&handshaker->write_buffer, request_slice);
@ -288,23 +297,23 @@ static void http_connect_handshaker_do_handshake(
grpc_endpoint_write(exec_ctx, args->endpoint, &handshaker->write_buffer,
&handshaker->request_done_closure);
gpr_mu_unlock(&handshaker->mu);
// Clean up.
gpr_free(canonical_uri);
grpc_uri_destroy(uri);
}
static const grpc_handshaker_vtable http_connect_handshaker_vtable = {
http_connect_handshaker_destroy, http_connect_handshaker_shutdown,
http_connect_handshaker_do_handshake};
grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
const char* server_name) {
grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server) {
GPR_ASSERT(proxy_server != NULL);
GPR_ASSERT(server_name != NULL);
http_connect_handshaker* handshaker = gpr_malloc(sizeof(*handshaker));
memset(handshaker, 0, sizeof(*handshaker));
grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base);
gpr_mu_init(&handshaker->mu);
gpr_ref_init(&handshaker->refcount, 1);
handshaker->proxy_server = gpr_strdup(proxy_server);
handshaker->server_name = gpr_strdup(server_name);
grpc_slice_buffer_init(&handshaker->write_buffer);
grpc_closure_init(&handshaker->request_done_closure, on_write_done,
handshaker);

@ -36,9 +36,8 @@
#include "src/core/lib/channel/handshaker.h"
/// Does NOT take ownership of \a proxy_server or \a server_name.
grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server,
const char* server_name);
/// Does NOT take ownership of \a proxy_server.
grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server);
/// Returns the name of the proxy to use, or NULL if no proxy is configured.
/// Caller takes ownership of result.

@ -40,6 +40,9 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolve_address.h"
// Channel arg key for grpc_lb_addresses.
#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses"
typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;

@ -37,6 +37,7 @@
#include "src/core/ext/client_channel/client_channel_factory.h"
#include "src/core/ext/client_channel/resolver.h"
#include "src/core/ext/client_channel/uri_parser.h"
#include "src/core/lib/iomgr/pollset_set.h"
typedef struct grpc_resolver_factory grpc_resolver_factory;
typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable;
@ -48,6 +49,7 @@ struct grpc_resolver_factory {
typedef struct grpc_resolver_args {
grpc_uri *uri;
const grpc_channel_args *args;
grpc_pollset_set *pollset_set;
} grpc_resolver_args;
struct grpc_resolver_factory_vtable {

@ -109,8 +109,8 @@ static grpc_resolver_factory *lookup_factory_by_uri(grpc_uri *uri) {
}
static grpc_resolver_factory *resolve_factory(const char *target,
grpc_uri **uri) {
char *tmp;
grpc_uri **uri,
char **canonical_target) {
grpc_resolver_factory *factory = NULL;
GPR_ASSERT(uri != NULL);
@ -118,38 +118,54 @@ static grpc_resolver_factory *resolve_factory(const char *target,
factory = lookup_factory_by_uri(*uri);
if (factory == NULL) {
grpc_uri_destroy(*uri);
gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target);
*uri = grpc_uri_parse(tmp, 1);
gpr_asprintf(canonical_target, "%s%s", g_default_resolver_prefix, target);
*uri = grpc_uri_parse(*canonical_target, 1);
factory = lookup_factory_by_uri(*uri);
if (factory == NULL) {
grpc_uri_destroy(grpc_uri_parse(target, 0));
grpc_uri_destroy(grpc_uri_parse(tmp, 0));
gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, tmp);
grpc_uri_destroy(grpc_uri_parse(*canonical_target, 0));
gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target,
*canonical_target);
}
gpr_free(tmp);
}
return factory;
}
grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_args *args) {
const grpc_channel_args *args,
grpc_pollset_set *pollset_set) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
char *canonical_target = NULL;
grpc_resolver_factory *factory =
resolve_factory(target, &uri, &canonical_target);
grpc_resolver *resolver;
grpc_resolver_args resolver_args;
memset(&resolver_args, 0, sizeof(resolver_args));
resolver_args.uri = uri;
resolver_args.args = args;
resolver_args.pollset_set = pollset_set;
resolver =
grpc_resolver_factory_create_resolver(exec_ctx, factory, &resolver_args);
grpc_uri_destroy(uri);
gpr_free(canonical_target);
return resolver;
}
char *grpc_get_default_authority(const char *target) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
char *canonical_target = NULL;
grpc_resolver_factory *factory =
resolve_factory(target, &uri, &canonical_target);
char *authority = grpc_resolver_factory_get_default_authority(factory, uri);
grpc_uri_destroy(uri);
gpr_free(canonical_target);
return authority;
}
char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target) {
grpc_uri *uri = NULL;
char *canonical_target = NULL;
resolve_factory(target, &uri, &canonical_target);
grpc_uri_destroy(uri);
return canonical_target == NULL ? gpr_strdup(target) : canonical_target;
}

@ -35,6 +35,7 @@
#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H
#include "src/core/ext/client_channel/resolver_factory.h"
#include "src/core/lib/iomgr/pollset_set.h"
void grpc_resolver_registry_init();
void grpc_resolver_registry_shutdown(void);
@ -61,7 +62,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
\a args is a set of channel arguments to be included in the result
(typically the set of arguments passed in from the client API). */
grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_args *args);
const grpc_channel_args *args,
grpc_pollset_set *pollset_set);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */
@ -71,4 +73,8 @@ grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name);
representing the default authority to pass from a client. */
char *grpc_get_default_authority(const char *target);
/** Returns a newly allocated string containing \a target, adding the
default prefix if needed. */
char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */

@ -605,14 +605,20 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder_set_transport(builder,
c->connecting_result.transport);
if (grpc_channel_init_create_stack(exec_ctx, builder,
GRPC_CLIENT_SUBCHANNEL)) {
con = grpc_channel_stack_builder_finish(exec_ctx, builder, 0, 1,
connection_destroy, NULL);
} else {
if (!grpc_channel_init_create_stack(exec_ctx, builder,
GRPC_CLIENT_SUBCHANNEL)) {
grpc_channel_stack_builder_destroy(exec_ctx, builder);
abort(); /* TODO(ctiller): what to do here (previously we just crashed) */
}
grpc_error *error = grpc_channel_stack_builder_finish(
exec_ctx, builder, 0, 1, connection_destroy, NULL, (void **)&con);
if (error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(error);
gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", msg);
grpc_error_free_string(msg);
GRPC_ERROR_UNREF(error);
abort(); /* TODO(ctiller): what to do here? */
}
stk = CHANNEL_STACK_FROM_CONNECTION(con);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));

@ -164,8 +164,6 @@ struct grpc_subchannel_args {
size_t filter_count;
/** Channel arguments to be supplied to the newly created channel */
const grpc_channel_args *args;
/** Server name */
const char *server_name;
/** Address to connect to */
grpc_resolved_address *addr;
};

@ -86,7 +86,6 @@ static grpc_subchannel_key *create_key(
} else {
k->args.filters = NULL;
}
k->args.server_name = gpr_strdup(args->server_name);
k->args.addr = gpr_malloc(sizeof(grpc_resolved_address));
k->args.addr->len = args->addr->len;
if (k->args.addr->len > 0) {
@ -113,8 +112,6 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
if (c != 0) return c;
c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
c = strcmp(a->args.server_name, b->args.server_name);
if (c != 0) return c;
if (a->args.addr->len) {
c = memcmp(a->args.addr->addr, b->args.addr->addr, a->args.addr->len);
if (c != 0) return c;
@ -132,7 +129,6 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_connector_unref(exec_ctx, k->connector);
gpr_free((grpc_channel_args *)k->args.filters);
grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args);
gpr_free((void *)k->args.server_name);
gpr_free(k->args.addr);
gpr_free(k);
}

@ -106,6 +106,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/ext/client_channel/client_channel.h"
#include "src/core/ext/client_channel/client_channel_factory.h"
#include "src/core/ext/client_channel/lb_policy_factory.h"
#include "src/core/ext/client_channel/lb_policy_registry.h"
@ -749,12 +750,6 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
/* Get server name. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
const char *server_name =
arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
/* Count the number of gRPC-LB addresses. There must be at least one.
* TODO(roth): For now, we ignore non-balancer addresses, but in the
* future, we may change the behavior such that we fall back to using
@ -762,7 +757,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* time, this should be changed to allow a list with no balancer addresses,
* since the resolver might fail to return a balancer address even when
* this is the right LB policy to use. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_grpclb_addrs = 0;
@ -774,13 +770,25 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
memset(glb_policy, 0, sizeof(*glb_policy));
/* Get server name. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
grpc_uri *uri = grpc_uri_parse(arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
glb_policy->server_name);
}
grpc_uri_destroy(uri);
/* All input addresses in addresses come from a resolver that claims
* they are LB services. It's the resolver's responsibility to make sure
* this
* policy is only instantiated and used in that case.
* this policy is only instantiated and used in that case.
*
* Create a client channel over them to communicate with a LB service */
glb_policy->server_name = gpr_strdup(server_name);
glb_policy->cc_factory = args->client_channel_factory;
glb_policy->args = grpc_channel_args_copy(args->args);
GPR_ASSERT(glb_policy->cc_factory != NULL);
@ -824,9 +832,14 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* We need the LB channel to return addresses with is_balancer=false
* so that it does not wind up recursively using the grpclb LB policy,
* as per the special case logic in client_channel.c.
*
* Finally, we also strip out the channel arg for the server URI,
* since that will be different for the LB channel than for the parent
* channel. (The client channel factory will re-add this arg with
* the right value.)
*/
static const char *keys_to_remove[] = {GRPC_ARG_LB_POLICY_NAME,
GRPC_ARG_LB_ADDRESSES};
static const char *keys_to_remove[] = {
GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
grpc_channel_args *new_args = grpc_channel_args_copy_and_remove(
args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove));
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(

@ -438,15 +438,10 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->client_channel_factory != NULL);
/* Get server name. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
const char *server_name =
arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
/* Find the number of backend addresses. We ignore balancer
* addresses, since we don't know how to handle them. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0;
@ -472,9 +467,6 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
/* server_name will be copied as part of the subchannel creation. This makes
* the copying of server_name (a borrowed pointer) OK. */
sc_args.server_name = server_name;
sc_args.addr = &addresses->addresses[i].address;
sc_args.args = args->args;

@ -703,15 +703,10 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->client_channel_factory != NULL);
/* Get server name. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
const char *server_name =
arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
/* Find the number of backend addresses. We ignore balancer
* addresses, since we don't know how to handle them. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0;
@ -734,9 +729,6 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (addresses->addresses[i].is_balancer) continue;
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
/* server_name will be copied as part of the subchannel creation. This makes
* the copying of server_name (a borrowed pointer) OK. */
sc_args.server_name = server_name;
sc_args.addr = &addresses->addresses[i].address;
sc_args.args = args->args;

@ -150,9 +150,9 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
GPR_ASSERT(!args->is_last);
channel_data *chand = elem->channel_data;
@ -169,6 +169,8 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
NULL,
NULL};
*/
return GRPC_ERROR_NONE;
}
/* Destructor for channel data */

@ -61,6 +61,8 @@ typedef struct {
char *default_port;
/** channel args. */
grpc_channel_args *channel_args;
/** pollset_set to drive the name resolution process */
grpc_pollset_set *interested_parties;
/** mutex guarding the rest of the state */
gpr_mu mu;
@ -218,6 +220,7 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
r->resolving = true;
r->addresses = NULL;
grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port,
r->interested_parties,
grpc_closure_create(dns_on_resolved, r), &r->addresses);
}
@ -240,13 +243,15 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_result != NULL) {
grpc_channel_args_destroy(exec_ctx, r->resolved_result);
}
grpc_pollset_set_destroy(r->interested_parties);
gpr_free(r->name_to_resolve);
gpr_free(r->default_port);
grpc_channel_args_destroy(exec_ctx, r->channel_args);
gpr_free(r);
}
static grpc_resolver *dns_create(grpc_resolver_args *args,
static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx,
grpc_resolver_args *args,
const char *default_port) {
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based dns uri's not supported");
@ -264,12 +269,12 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name;
r->default_port = gpr_strdup(default_port);
grpc_arg server_name_arg;
server_name_arg.type = GRPC_ARG_STRING;
server_name_arg.key = GRPC_ARG_SERVER_NAME;
server_name_arg.value.string = (char *)path;
r->channel_args =
grpc_channel_args_copy_and_add(args->args, &server_name_arg, 1);
r->channel_args = grpc_channel_args_copy(args->args);
r->interested_parties = grpc_pollset_set_create();
if (args->pollset_set != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties,
args->pollset_set);
}
gpr_backoff_init(&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS,
GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER,
GRPC_DNS_RECONNECT_JITTER,
@ -289,7 +294,7 @@ static void dns_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *dns_factory_create_resolver(
grpc_exec_ctx *exec_ctx, grpc_resolver_factory *factory,
grpc_resolver_args *args) {
return dns_create(args, "https");
return dns_create(exec_ctx, args, "https");
}
static char *dns_factory_get_default_host_name(grpc_resolver_factory *factory,

@ -200,12 +200,7 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx,
sockaddr_resolver *r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
r->addresses = addresses;
grpc_arg server_name_arg;
server_name_arg.type = GRPC_ARG_STRING;
server_name_arg.key = GRPC_ARG_SERVER_NAME;
server_name_arg.value.string = args->uri->path;
r->channel_args =
grpc_channel_args_copy_and_add(args->args, &server_name_arg, 1);
r->channel_args = grpc_channel_args_copy(args->args);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
return &r->base;

@ -59,7 +59,6 @@ typedef struct {
bool shutdown;
bool connecting;
char *server_name;
grpc_chttp2_add_handshakers_func add_handshakers;
void *add_handshakers_user_data;
@ -90,7 +89,6 @@ static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx,
// If handshaking is not yet in progress, destroy the endpoint.
// Otherwise, the handshaker will do this for us.
if (c->endpoint != NULL) grpc_endpoint_destroy(exec_ctx, c->endpoint);
gpr_free(c->server_name);
gpr_free(c);
}
}
@ -156,9 +154,8 @@ static void start_handshake_locked(grpc_exec_ctx *exec_ctx,
c->handshake_mgr = grpc_handshake_manager_create();
char *proxy_name = grpc_get_http_proxy_server();
if (proxy_name != NULL) {
grpc_handshake_manager_add(
c->handshake_mgr,
grpc_http_connect_handshaker_create(proxy_name, c->server_name));
grpc_handshake_manager_add(c->handshake_mgr,
grpc_http_connect_handshaker_create(proxy_name));
gpr_free(proxy_name);
}
if (c->add_handshakers != NULL) {
@ -255,15 +252,13 @@ static const grpc_connector_vtable chttp2_connector_vtable = {
chttp2_connector_connect};
grpc_connector *grpc_chttp2_connector_create(
grpc_exec_ctx *exec_ctx, const char *server_name,
grpc_chttp2_add_handshakers_func add_handshakers,
grpc_exec_ctx *exec_ctx, grpc_chttp2_add_handshakers_func add_handshakers,
void *add_handshakers_user_data) {
chttp2_connector *c = gpr_malloc(sizeof(*c));
memset(c, 0, sizeof(*c));
c->base.vtable = &chttp2_connector_vtable;
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
c->server_name = gpr_strdup(server_name);
c->add_handshakers = add_handshakers;
c->add_handshakers_user_data = add_handshakers_user_data;
return &c->base;

@ -45,8 +45,7 @@ typedef void (*grpc_chttp2_add_handshakers_func)(
/// If \a add_handshakers is non-NULL, it will be called with
/// \a add_handshakers_user_data to add handshakers.
grpc_connector* grpc_chttp2_connector_create(
grpc_exec_ctx* exec_ctx, const char* server_name,
grpc_chttp2_add_handshakers_func add_handshakers,
grpc_exec_ctx* exec_ctx, grpc_chttp2_add_handshakers_func add_handshakers,
void* add_handshakers_user_data);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */

@ -39,8 +39,8 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/client_channel.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
@ -54,8 +54,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) {
grpc_connector *connector = grpc_chttp2_connector_create(
exec_ctx, args->server_name, NULL /* add_handshakers */,
NULL /* user_data */);
exec_ctx, NULL /* add_handshakers */, NULL /* user_data */);
grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
grpc_connector_unref(exec_ctx, connector);
return s;
@ -65,17 +64,15 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
const grpc_channel_args *args) {
grpc_channel *channel =
grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
grpc_resolver *resolver = grpc_resolver_create(exec_ctx, target, args);
if (resolver == NULL) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
"client_channel_factory_create_channel");
return NULL;
}
grpc_client_channel_finish_initialization(
exec_ctx, grpc_channel_get_channel_stack(channel), resolver, cc_factory);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
// Add channel arg containing the server URI.
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_SERVER_URI;
arg.value.string = (char *)target;
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(exec_ctx, new_args);
return channel;
}
@ -101,8 +98,14 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
GPR_ASSERT(reserved == NULL);
grpc_client_channel_factory *factory =
(grpc_client_channel_factory *)&client_channel_factory;
// Add channel arg containing the client channel factory.
grpc_arg arg = grpc_client_channel_factory_create_channel_arg(factory);
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
// Create channel.
grpc_channel *channel = client_channel_factory_create_channel(
&exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, args);
&exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
// Clean up.
grpc_channel_args_destroy(&exec_ctx, new_args);
grpc_client_channel_factory_unref(&exec_ctx, factory);
grpc_exec_ctx_finish(&exec_ctx);
return channel != NULL ? channel : grpc_lame_client_channel_create(

@ -39,7 +39,6 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/client_channel.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/security/credentials/credentials.h"
@ -80,7 +79,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
const grpc_subchannel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
grpc_connector *connector = grpc_chttp2_connector_create(
exec_ctx, args->server_name, add_handshakers, f->security_connector);
exec_ctx, add_handshakers, f->security_connector);
grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
grpc_connector_unref(exec_ctx, connector);
return s;
@ -90,18 +89,15 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
const grpc_channel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
grpc_channel *channel =
grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
grpc_resolver *resolver = grpc_resolver_create(exec_ctx, target, args);
if (resolver == NULL) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
"client_channel_factory_create_channel");
return NULL;
}
grpc_client_channel_finish_initialization(
exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
// Add channel arg containing the server URI.
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_SERVER_URI;
arg.value.string = (char *)target;
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(exec_ctx, new_args);
return channel;
}
@ -142,14 +138,6 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
return grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL, "Failed to create security connector.");
}
grpc_arg connector_arg =
grpc_security_connector_to_arg(&security_connector->base);
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1);
if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(&exec_ctx, new_args_from_connector);
}
// Create client channel factory.
client_channel_factory *f = gpr_malloc(sizeof(*f));
memset(f, 0, sizeof(*f));
@ -158,13 +146,24 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
GRPC_SECURITY_CONNECTOR_REF(&security_connector->base,
"grpc_secure_channel_create");
f->security_connector = security_connector;
// Add channel args containing the client channel factory and security
// connector.
grpc_arg new_args[2];
new_args[0] = grpc_client_channel_factory_create_channel_arg(&f->base);
new_args[1] = grpc_security_connector_to_arg(&security_connector->base);
grpc_channel_args *args_copy = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args,
new_args, GPR_ARRAY_SIZE(new_args));
if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(&exec_ctx, new_args_from_connector);
}
// Create channel.
grpc_channel *channel = client_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, args_copy);
// Clean up.
GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &f->security_connector->base,
"secure_client_channel_factory_create_channel");
grpc_channel_args_destroy(&exec_ctx, new_args);
grpc_channel_args_destroy(&exec_ctx, args_copy);
grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
return channel; /* may be NULL */

@ -102,13 +102,11 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
return CALL_ELEMS_FROM_STACK(call_stack) + index;
}
void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
const grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *channel_args,
grpc_transport *optional_transport,
const char *name, grpc_channel_stack *stack) {
grpc_error *grpc_channel_stack_init(
grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy,
void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count,
const grpc_channel_args *channel_args, grpc_transport *optional_transport,
const char *name, grpc_channel_stack *stack) {
size_t call_size =
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
@ -126,6 +124,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element));
/* init per-filter data */
grpc_error *first_error = GRPC_ERROR_NONE;
for (i = 0; i < filter_count; i++) {
args.channel_stack = stack;
args.channel_args = channel_args;
@ -134,7 +133,15 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
args.is_last = i == (filter_count - 1);
elems[i].filter = filters[i];
elems[i].channel_data = user_data;
elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args);
grpc_error *error =
elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args);
if (error != GRPC_ERROR_NONE) {
if (first_error == GRPC_ERROR_NONE) {
first_error = error;
} else {
GRPC_ERROR_UNREF(error);
}
}
user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
}
@ -144,6 +151,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
grpc_channel_stack_size(filters, filter_count));
stack->call_stack_size = call_size;
return first_error;
}
void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,

@ -34,6 +34,13 @@
#ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
#define GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
//////////////////////////////////////////////////////////////////////////////
// IMPORTANT NOTE:
//
// When you update this API, please make the corresponding changes to
// the C++ API in src/cpp/common/channel_filter.{h,cc}
//////////////////////////////////////////////////////////////////////////////
/* A channel filter defines how operations on a channel are implemented.
Channel filters are chained together to create full channels, and if those
chains are linear, then channel stacks provide a mechanism to minimize
@ -146,8 +153,9 @@ typedef struct {
is_first, is_last designate this elements position in the stack, and are
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining */
void (*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
grpc_channel_element_args *args);
grpc_error *(*init_channel_elem)(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args);
/* Destroy per channel data.
The filter does not need to do any chaining */
void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx,
@ -214,12 +222,11 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i);
size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
size_t filter_count);
/* Initialize a channel stack given some filters */
void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
const grpc_channel_filter **filters,
size_t filter_count, const grpc_channel_args *args,
grpc_transport *optional_transport,
const char *name, grpc_channel_stack *stack);
grpc_error *grpc_channel_stack_init(
grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy,
void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count,
const grpc_channel_args *args, grpc_transport *optional_transport,
const char *name, grpc_channel_stack *stack);
/* Destroy a channel stack */
void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *stack);

@ -229,11 +229,10 @@ void grpc_channel_stack_builder_destroy(grpc_exec_ctx *exec_ctx,
gpr_free(builder);
}
void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder *builder,
size_t prefix_bytes, int initial_refs,
grpc_iomgr_cb_func destroy,
void *destroy_arg) {
grpc_error *grpc_channel_stack_builder_finish(
grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder,
size_t prefix_bytes, int initial_refs, grpc_iomgr_cb_func destroy,
void *destroy_arg, void **result) {
// count the number of filters
size_t num_filters = 0;
for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) {
@ -252,28 +251,35 @@ void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx,
size_t channel_stack_size = grpc_channel_stack_size(filters, num_filters);
// allocate memory, with prefix_bytes followed by channel_stack_size
char *result = gpr_malloc(prefix_bytes + channel_stack_size);
*result = gpr_malloc(prefix_bytes + channel_stack_size);
// fetch a pointer to the channel stack
grpc_channel_stack *channel_stack =
(grpc_channel_stack *)(result + prefix_bytes);
(grpc_channel_stack *)((char *)(*result) + prefix_bytes);
// and initialize it
grpc_channel_stack_init(exec_ctx, initial_refs, destroy,
destroy_arg == NULL ? result : destroy_arg, filters,
num_filters, builder->args, builder->transport,
builder->name, channel_stack);
// run post-initialization functions
i = 0;
for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) {
if (p->init != NULL) {
p->init(channel_stack, grpc_channel_stack_element(channel_stack, i),
p->init_arg);
grpc_error *error = grpc_channel_stack_init(
exec_ctx, initial_refs, destroy,
destroy_arg == NULL ? *result : destroy_arg, filters, num_filters,
builder->args, builder->transport, builder->name, channel_stack);
if (error != GRPC_ERROR_NONE) {
grpc_channel_stack_destroy(exec_ctx, channel_stack);
gpr_free(*result);
*result = NULL;
} else {
// run post-initialization functions
i = 0;
for (filter_node *p = builder->begin.next; p != &builder->end;
p = p->next) {
if (p->init != NULL) {
p->init(channel_stack, grpc_channel_stack_element(channel_stack, i),
p->init_arg);
}
i++;
}
i++;
}
grpc_channel_stack_builder_destroy(exec_ctx, builder);
gpr_free((grpc_channel_filter **)filters);
return result;
return error;
}

@ -147,16 +147,15 @@ bool grpc_channel_stack_builder_append_filter(
void grpc_channel_stack_builder_iterator_destroy(
grpc_channel_stack_builder_iterator *iterator);
/// Destroy the builder, return the freshly minted channel stack
/// Destroy the builder, return the freshly minted channel stack in \a result.
/// Allocates \a prefix_bytes bytes before the channel stack
/// Returns the base pointer of the allocated block
/// \a initial_refs, \a destroy, \a destroy_arg are as per
/// grpc_channel_stack_init
void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder *builder,
size_t prefix_bytes, int initial_refs,
grpc_iomgr_cb_func destroy,
void *destroy_arg);
grpc_error *grpc_channel_stack_builder_finish(
grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder,
size_t prefix_bytes, int initial_refs, grpc_iomgr_cb_func destroy,
void *destroy_arg, void **result);
/// Destroy the builder without creating a channel stack
void grpc_channel_stack_builder_destroy(grpc_exec_ctx *exec_ctx,

@ -295,9 +295,9 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
channel_data *channeld = elem->channel_data;
channeld->enabled_algorithms_bitset =
@ -325,6 +325,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
}
GPR_ASSERT(!args->is_last);
return GRPC_ERROR_NONE;
}
/* Destructor for channel data */

@ -114,12 +114,13 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
channel_data *cd = (channel_data *)elem->channel_data;
GPR_ASSERT(args->is_last);
cd->transport = NULL;
return GRPC_ERROR_NONE;
}
/* Destructor for channel_data */

@ -208,10 +208,11 @@ void grpc_deadline_state_client_start_transport_stream_op(
//
// Constructor for channel_data. Used for both client and server filters.
static void init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
return GRPC_ERROR_NONE;
}
// Destructor for channel_data. Used for both client and server filters.

@ -498,9 +498,9 @@ static grpc_slice user_agent_from_args(const grpc_channel_args *args,
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
channel_data *chand = elem->channel_data;
GPR_ASSERT(!args->is_last);
GPR_ASSERT(args->optional_transport != NULL);
@ -511,6 +511,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
exec_ctx, GRPC_MDSTR_USER_AGENT,
user_agent_from_args(args->channel_args,
args->optional_transport->vtable->name));
return GRPC_ERROR_NONE;
}
/* Destructor for channel data */

@ -360,10 +360,11 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
GPR_ASSERT(!args->is_last);
return GRPC_ERROR_NONE;
}
/* Destructor for channel data */

@ -196,9 +196,9 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
void* ignored) {}
// Constructor for channel_data.
static void init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
channel_data* chand = elem->channel_data;
memset(chand, 0, sizeof(*chand));
@ -235,6 +235,7 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_service_config_destroy(service_config);
}
}
return GRPC_ERROR_NONE;
}
// Destructor for channel_data.

@ -279,6 +279,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
grpc_polling_entity_add_to_pollset_set(exec_ctx, req->pollent,
req->context->pollset_set);
grpc_resolve_address(exec_ctx, request->host, req->handshaker->default_port,
req->context->pollset_set,
grpc_closure_create(on_resolved, req), &req->addresses);
}

@ -36,6 +36,7 @@
#include <stddef.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#define GRPC_MAX_SOCKADDR_SIZE 128
@ -54,6 +55,7 @@ typedef struct {
/* TODO(ctiller): add a timeout here */
extern void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *addr,
const char *default_port,
grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addresses);
/* Destroy resolved addresses */

@ -181,6 +181,7 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
request *r = gpr_malloc(sizeof(request));
@ -192,9 +193,9 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port, grpc_closure *on_done,
grpc_resolved_addresses **addrs) =
resolve_address_impl;
void (*grpc_resolve_address)(
grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
grpc_pollset_set *interested_parties, grpc_closure *on_done,
grpc_resolved_addresses **addrs) = resolve_address_impl;
#endif

@ -181,6 +181,7 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
uv_getaddrinfo_t *req;
@ -223,9 +224,9 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
}
}
void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port, grpc_closure *on_done,
grpc_resolved_addresses **addrs) =
resolve_address_impl;
void (*grpc_resolve_address)(
grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
grpc_pollset_set *interested_parties, grpc_closure *on_done,
grpc_resolved_addresses **addrs) = resolve_address_impl;
#endif /* GRPC_UV */

@ -169,6 +169,7 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addresses) {
request *r = gpr_malloc(sizeof(request));
@ -180,9 +181,9 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port, grpc_closure *on_done,
grpc_resolved_addresses **addresses) =
resolve_address_impl;
void (*grpc_resolve_address)(
grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
grpc_pollset_set *interested_parties, grpc_closure *on_done,
grpc_resolved_addresses **addresses) = resolve_address_impl;
#endif

@ -345,9 +345,9 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
grpc_security_connector *sc =
grpc_find_security_connector_in_args(args->channel_args);
grpc_auth_context *auth_context =
@ -369,6 +369,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
sc, "client_auth_filter");
chand->auth_context =
GRPC_AUTH_CONTEXT_REF(auth_context, "client_auth_filter");
return GRPC_ERROR_NONE;
}
/* Destructor for channel data */

@ -373,7 +373,10 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
return grpc_endpoint_get_peer(ep->wrapped_ep);
}
static int endpoint_get_fd(grpc_endpoint *secure_ep) { return -1; }
static int endpoint_get_fd(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
return grpc_endpoint_get_fd(ep->wrapped_ep);
}
static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;

@ -242,9 +242,9 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *ignored) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
grpc_auth_context *auth_context =
grpc_find_auth_context_in_args(args->channel_args);
grpc_server_credentials *creds =
@ -260,6 +260,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->auth_context =
GRPC_AUTH_CONTEXT_REF(auth_context, "server_auth_filter");
chand->creds = grpc_server_credentials_ref(creds);
return GRPC_ERROR_NONE;
}
/* Destructor for channel data */

@ -86,92 +86,92 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_args *input_args,
grpc_channel_stack_type channel_stack_type,
grpc_transport *optional_transport) {
bool is_client = grpc_channel_stack_type_is_client(channel_stack_type);
grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder,
input_args);
grpc_channel_stack_builder_set_target(builder, target);
grpc_channel_stack_builder_set_transport(builder, optional_transport);
grpc_channel *channel;
grpc_channel_args *args;
if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) {
grpc_channel_stack_builder_destroy(exec_ctx, builder);
return NULL;
} else {
args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder));
channel = grpc_channel_stack_builder_finish(
exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL);
}
grpc_channel_args *args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder));
grpc_channel *channel;
grpc_error *error = grpc_channel_stack_builder_finish(
exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL,
(void **)&channel);
if (error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(error);
gpr_log(GPR_ERROR, "channel stack builder failed: %s", msg);
grpc_error_free_string(msg);
GRPC_ERROR_UNREF(error);
goto done;
}
memset(channel, 0, sizeof(*channel));
channel->target = gpr_strdup(target);
channel->is_client = is_client;
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
grpc_compression_options_init(&channel->compression_options);
if (args) {
for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) {
if (args->args[i].type != GRPC_ARG_STRING) {
gpr_log(GPR_ERROR, "%s ignored: it must be a string",
GRPC_ARG_DEFAULT_AUTHORITY);
} else {
if (!GRPC_MDISNULL(channel->default_authority)) {
/* setting this takes precedence over anything else */
GRPC_MDELEM_UNREF(exec_ctx, channel->default_authority);
}
channel->default_authority = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_AUTHORITY,
grpc_slice_intern(
grpc_slice_from_static_string(args->args[i].value.string)));
for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) {
if (args->args[i].type != GRPC_ARG_STRING) {
gpr_log(GPR_ERROR, "%s ignored: it must be a string",
GRPC_ARG_DEFAULT_AUTHORITY);
} else {
if (!GRPC_MDISNULL(channel->default_authority)) {
/* setting this takes precedence over anything else */
GRPC_MDELEM_UNREF(exec_ctx, channel->default_authority);
}
} else if (0 ==
strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) {
if (args->args[i].type != GRPC_ARG_STRING) {
gpr_log(GPR_ERROR, "%s ignored: it must be a string",
channel->default_authority = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_AUTHORITY,
grpc_slice_from_copied_string(args->args[i].value.string));
}
} else if (0 ==
strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) {
if (args->args[i].type != GRPC_ARG_STRING) {
gpr_log(GPR_ERROR, "%s ignored: it must be a string",
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG);
} else {
if (GRPC_MDISNULL(channel->default_authority)) {
/* other ways of setting this (notably ssl) take precedence */
gpr_log(GPR_ERROR,
"%s ignored: default host already set some other way",
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG);
} else {
if (!GRPC_MDISNULL(channel->default_authority)) {
/* other ways of setting this (notably ssl) take precedence */
gpr_log(GPR_ERROR,
"%s ignored: default host already set some other way",
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG);
} else {
channel->default_authority = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_AUTHORITY,
grpc_slice_intern(
grpc_slice_from_static_string(args->args[i].value.string)));
}
channel->default_authority = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_AUTHORITY,
grpc_slice_from_copied_string(args->args[i].value.string));
}
} else if (0 == strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) {
channel->compression_options.default_level.is_set = true;
GPR_ASSERT(args->args[i].value.integer >= 0 &&
args->args[i].value.integer < GRPC_COMPRESS_LEVEL_COUNT);
channel->compression_options.default_level.level =
(grpc_compression_level)args->args[i].value.integer;
} else if (0 == strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
channel->compression_options.default_algorithm.is_set = true;
GPR_ASSERT(args->args[i].value.integer >= 0 &&
args->args[i].value.integer <
GRPC_COMPRESS_ALGORITHMS_COUNT);
channel->compression_options.default_algorithm.algorithm =
(grpc_compression_algorithm)args->args[i].value.integer;
} else if (0 ==
strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
channel->compression_options.enabled_algorithms_bitset =
(uint32_t)args->args[i].value.integer |
0x1; /* always support no compression */
}
} else if (0 == strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) {
channel->compression_options.default_level.is_set = true;
GPR_ASSERT(args->args[i].value.integer >= 0 &&
args->args[i].value.integer < GRPC_COMPRESS_LEVEL_COUNT);
channel->compression_options.default_level.level =
(grpc_compression_level)args->args[i].value.integer;
} else if (0 == strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
channel->compression_options.default_algorithm.is_set = true;
GPR_ASSERT(args->args[i].value.integer >= 0 &&
args->args[i].value.integer < GRPC_COMPRESS_ALGORITHMS_COUNT);
channel->compression_options.default_algorithm.algorithm =
(grpc_compression_algorithm)args->args[i].value.integer;
} else if (0 ==
strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
channel->compression_options.enabled_algorithms_bitset =
(uint32_t)args->args[i].value.integer |
0x1; /* always support no compression */
}
grpc_channel_args_destroy(exec_ctx, args);
}
done:
grpc_channel_args_destroy(exec_ctx, args);
return channel;
}

@ -134,11 +134,12 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
gpr_free(and_free_memory);
}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
GPR_ASSERT(args->is_first);
GPR_ASSERT(args->is_last);
return GRPC_ERROR_NONE;
}
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,

@ -908,9 +908,9 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_unref(exec_ctx, chand->server);
}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
channel_data *chand = elem->channel_data;
GPR_ASSERT(args->is_first);
GPR_ASSERT(!args->is_last);
@ -921,6 +921,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->connectivity_state = GRPC_CHANNEL_IDLE;
grpc_closure_init(&chand->channel_connectivity_changed,
channel_connectivity_changed, chand);
return GRPC_ERROR_NONE;
}
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,

@ -217,12 +217,13 @@ class TransportStreamOp {
/// Represents channel data.
class ChannelData {
public:
virtual ~ChannelData() {
if (peer_) gpr_free((void *)peer_);
}
virtual ~ChannelData() {}
/// Caller does NOT take ownership of result.
const char *peer() const { return peer_; }
/// Initializes the call data.
virtual grpc_error *Init(grpc_exec_ctx *exec_ctx,
grpc_channel_element_args *args) {
return GRPC_ERROR_NONE;
}
// TODO(roth): Find a way to avoid passing elem into these methods.
@ -233,11 +234,7 @@ class ChannelData {
const grpc_channel_info *channel_info);
protected:
/// Takes ownership of \a peer.
ChannelData(const grpc_channel_args &args, const char *peer) : peer_(peer) {}
private:
const char *peer_;
ChannelData() {}
};
/// Represents call data.
@ -246,7 +243,10 @@ class CallData {
virtual ~CallData() {}
/// Initializes the call data.
virtual grpc_error *Init() { return GRPC_ERROR_NONE; }
virtual grpc_error *Init(grpc_exec_ctx *exec_ctx, ChannelData *channel_data,
grpc_call_element_args *args) {
return GRPC_ERROR_NONE;
}
// TODO(roth): Find a way to avoid passing elem into these methods.
@ -264,7 +264,7 @@ class CallData {
virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
protected:
explicit CallData(const ChannelData &) {}
CallData() {}
};
namespace internal {
@ -277,15 +277,11 @@ class ChannelFilter final {
public:
static const size_t channel_data_size = sizeof(ChannelDataType);
static void InitChannelElement(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
const char *peer =
args->optional_transport
? grpc_transport_get_peer(exec_ctx, args->optional_transport)
: nullptr;
// Construct the object in the already-allocated memory.
new (elem->channel_data) ChannelDataType(*args->channel_args, peer);
static grpc_error *InitChannelElement(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
ChannelDataType *channel_data = new (elem->channel_data) ChannelDataType();
return channel_data->Init(exec_ctx, args);
}
static void DestroyChannelElement(grpc_exec_ctx *exec_ctx,
@ -313,11 +309,10 @@ class ChannelFilter final {
static grpc_error *InitCallElement(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
const ChannelDataType &channel_data =
*(ChannelDataType *)elem->channel_data;
ChannelDataType *channel_data = (ChannelDataType *)elem->channel_data;
// Construct the object in the already-allocated memory.
CallDataType *call_data = new (elem->call_data) CallDataType(channel_data);
return call_data->Init();
CallDataType *call_data = new (elem->call_data) CallDataType();
return call_data->Init(exec_ctx, channel_data, args);
}
static void DestroyCallElement(grpc_exec_ctx *exec_ctx,

@ -99,7 +99,18 @@ function ClientWritableStream(call, serialize) {
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
var message = this.serialize(chunk);
var message;
try {
message = this.serialize(chunk);
} catch (e) {
/* Sending this error to the server and emitting it immediately on the
client may put the call in a slightly weird state on the client side,
but passing an object that causes a serialization failure is a misuse
of the API anyway, so that's OK. The primary purpose here is to give the
programmer a useful error and to stop the stream properly */
this.call.cancelWithStatus(grpc.status.INTERNAL, "Serialization failure");
callback(e);
}
if (_.isFinite(encoding)) {
/* Attach the encoding if it is a finite number. This is the closest we
* can get to checking that it is valid flags */
@ -184,14 +195,15 @@ function _emitStatusIfDone() {
} else {
status = this.received_status;
}
this.emit('status', status);
if (status.code !== grpc.status.OK) {
if (status.code === grpc.status.OK) {
this.push(null);
} else {
var error = new Error(status.details);
error.code = status.code;
error.metadata = status.metadata;
this.emit('error', error);
return;
}
this.emit('status', status);
}
}
@ -224,9 +236,11 @@ function _read(size) {
} catch (e) {
self._readsDone({code: grpc.status.INTERNAL,
details: 'Failed to parse server response'});
return;
}
if (data === null) {
self._readsDone();
return;
}
if (self.push(deserialized) && data !== null) {
var read_batch = {};
@ -396,6 +410,8 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
var status = response.status;
var error;
var deserialized;
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
if (status.code === grpc.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
@ -423,8 +439,6 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
args.callback(null, deserialized);
}
emitter.emit('status', status);
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
return emitter;
}

@ -127,7 +127,14 @@ function sendUnaryResponse(call, value, serialize, metadata, flags) {
(new Metadata())._getCoreRepresentation();
call.metadataSent = true;
}
var message = serialize(value);
var message;
try {
message = serialize(value);
} catch (e) {
e.code = grpc.status.INTERNAL;
handleError(e);
return;
}
message.grpcWriteFlags = flags;
end_batch[grpc.opType.SEND_MESSAGE] = message;
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
@ -278,7 +285,14 @@ function _write(chunk, encoding, callback) {
(new Metadata())._getCoreRepresentation();
this.call.metadataSent = true;
}
var message = this.serialize(chunk);
var message;
try {
message = this.serialize(chunk);
} catch (e) {
e.code = grpc.status.INTERNAL;
callback(e);
return;
}
if (_.isFinite(encoding)) {
/* Attach the encoding if it is a finite number. This is the closest we
* can get to checking that it is valid flags */

@ -179,8 +179,8 @@ describe('Server.prototype.addProtoService', function() {
call.on('data', function(value) {
assert.fail('No messages expected');
});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.UNIMPLEMENTED);
call.on('error', function(err) {
assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED);
done();
});
});
@ -189,8 +189,8 @@ describe('Server.prototype.addProtoService', function() {
call.on('data', function(value) {
assert.fail('No messages expected');
});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.UNIMPLEMENTED);
call.on('error', function(err) {
assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED);
done();
});
call.end();

@ -36,7 +36,7 @@ Pod::Spec.new do |s|
# exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed
# before them.
s.name = '!ProtoCompiler-gRPCPlugin'
v = '1.0.1'
v = '1.0.2'
s.version = v
s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.'
s.description = <<-DESC
@ -84,7 +84,10 @@ Pod::Spec.new do |s|
repo = 'grpc/grpc'
file = "grpc_objective_c_plugin-#{v}-macos-x86_64.zip"
s.source = {
:http => "https://github.com/#{repo}/releases/download/v#{v}/#{file}",
# TODO(mxyan): Change back to "https://github.com/#{repo}/releases/download/v#{v}/#{file}" for
# next release
# :http => "https://github.com/#{repo}/releases/download/v#{v}/#{file}",
:http => "https://github.com/#{repo}/releases/download/objective-c-v#{v}/#{file}",
# TODO(jcanizales): Add sha1 or sha256
# :sha1 => '??',
}

@ -50,7 +50,7 @@ NS_ASSUME_NONNULL_BEGIN
// TODO(jcanizales): Generate the version in a standalone header, from templates. Like
// templates/src/core/surface/version.c.template .
#define GRPC_OBJC_VERSION_STRING @"1.0.1"
#define GRPC_OBJC_VERSION_STRING @"1.0.2"
static NSMutableDictionary *kHostCache;

@ -768,8 +768,8 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
gRPC runtime to determine the status code of the RPC.
Args:
code: The integer status code of the RPC to be transmitted to the
invocation side of the RPC.
code: A StatusCode value to be transmitted to the invocation side of the
RPC as the status code of the RPC.
"""
raise NotImplementedError()
@ -781,8 +781,8 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
details to transmit.
Args:
details: The details string of the RPC to be transmitted to
the invocation side of the RPC.
details: A string to be transmitted to the invocation side of the RPC as
the status details of the RPC.
"""
raise NotImplementedError()

@ -36,8 +36,8 @@ import time
import grpc
from grpc import _common
from grpc import _grpcio_metadata
from grpc.framework.foundation import callable_util
from grpc._cython import cygrpc
from grpc.framework.foundation import callable_util
_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
@ -99,6 +99,22 @@ def _wait_once_until(condition, until):
else:
condition.wait(timeout=remaining)
_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
'Internal gRPC call error %d. ' +
'Please report to https://github.com/grpc/grpc/issues')
def _check_call_error(call_error, metadata):
if call_error == cygrpc.CallError.invalid_metadata:
raise ValueError('metadata was invalid: %s' % metadata)
elif call_error != cygrpc.CallError.ok:
raise ValueError(_INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error)
def _call_error_set_RPCstate(state, call_error, metadata):
if call_error == cygrpc.CallError.invalid_metadata:
_abort(state, grpc.StatusCode.INTERNAL, 'metadata was invalid: %s' % metadata)
else:
_abort(state, grpc.StatusCode.INTERNAL,
_INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error)
class _RPCState(object):
@ -358,7 +374,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
if self._state.callbacks is None:
return False
else:
self._state.callbacks.append(lambda: callback())
self._state.callbacks.append(callback)
return True
def initial_metadata(self):
@ -435,10 +451,10 @@ def _end_unary_response_blocking(state, with_call, deadline):
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def __init__(
self, channel, create_managed_call, method, request_serializer,
self, channel, managed_call, method, request_serializer,
response_deserializer):
self._channel = channel
self._create_managed_call = create_managed_call
self._managed_call = managed_call
self._method = method
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
@ -472,7 +488,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
None, 0, completion_queue, self._method, None, deadline_timespec)
if credentials is not None:
call.set_credentials(credentials._credentials)
call.start_client_batch(cygrpc.Operations(operations), None)
call_error = call.start_client_batch(cygrpc.Operations(operations), None)
_check_call_error(call_error, metadata)
_handle_event(completion_queue.poll(), state, self._response_deserializer)
return state, deadline
@ -490,23 +507,28 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
if rendezvous:
return rendezvous
else:
call = self._create_managed_call(
call, drive_call = self._managed_call(
None, 0, self._method, None, deadline_timespec)
if credentials is not None:
call.set_credentials(credentials._credentials)
event_handler = _event_handler(state, call, self._response_deserializer)
with state.condition:
call.start_client_batch(cygrpc.Operations(operations), event_handler)
call_error = call.start_client_batch(cygrpc.Operations(operations),
event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
return _Rendezvous(state, None, None, deadline)
drive_call()
return _Rendezvous(state, call, self._response_deserializer, deadline)
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
def __init__(
self, channel, create_managed_call, method, request_serializer,
self, channel, managed_call, method, request_serializer,
response_deserializer):
self._channel = channel
self._create_managed_call = create_managed_call
self._managed_call = managed_call
self._method = method
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
@ -518,7 +540,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
raise rendezvous
else:
state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
call = self._create_managed_call(
call, drive_call = self._managed_call(
None, 0, self._method, None, deadline_timespec)
if credentials is not None:
call.set_credentials(credentials._credentials)
@ -535,17 +557,22 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
)
call.start_client_batch(cygrpc.Operations(operations), event_handler)
call_error = call.start_client_batch(cygrpc.Operations(operations),
event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
return _Rendezvous(state, None, None, deadline)
drive_call()
return _Rendezvous(state, call, self._response_deserializer, deadline)
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
def __init__(
self, channel, create_managed_call, method, request_serializer,
self, channel, managed_call, method, request_serializer,
response_deserializer):
self._channel = channel
self._create_managed_call = create_managed_call
self._managed_call = managed_call
self._method = method
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
@ -569,7 +596,8 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
)
call.start_client_batch(cygrpc.Operations(operations), None)
call_error = call.start_client_batch(cygrpc.Operations(operations), None)
_check_call_error(call_error, metadata)
_consume_request_iterator(
request_iterator, state, call, self._request_serializer)
while True:
@ -597,7 +625,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
self, request_iterator, timeout=None, metadata=None, credentials=None):
deadline, deadline_timespec = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
call = self._create_managed_call(
call, drive_call = self._managed_call(
None, 0, self._method, None, deadline_timespec)
if credentials is not None:
call.set_credentials(credentials._credentials)
@ -613,7 +641,12 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
)
call.start_client_batch(cygrpc.Operations(operations), event_handler)
call_error = call.start_client_batch(cygrpc.Operations(operations),
event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
return _Rendezvous(state, None, None, deadline)
drive_call()
_consume_request_iterator(
request_iterator, state, call, self._request_serializer)
return _Rendezvous(state, call, self._response_deserializer, deadline)
@ -622,10 +655,10 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
def __init__(
self, channel, create_managed_call, method, request_serializer,
self, channel, managed_call, method, request_serializer,
response_deserializer):
self._channel = channel
self._create_managed_call = create_managed_call
self._managed_call = managed_call
self._method = method
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
@ -634,7 +667,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
self, request_iterator, timeout=None, metadata=None, credentials=None):
deadline, deadline_timespec = _deadline(timeout)
state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
call = self._create_managed_call(
call, drive_call = self._managed_call(
None, 0, self._method, None, deadline_timespec)
if credentials is not None:
call.set_credentials(credentials._credentials)
@ -649,7 +682,12 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
_common.cygrpc_metadata(metadata), _EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
)
call.start_client_batch(cygrpc.Operations(operations), event_handler)
call_error = call.start_client_batch(cygrpc.Operations(operations),
event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
return _Rendezvous(state, None, None, deadline)
drive_call()
_consume_request_iterator(
request_iterator, state, call, self._request_serializer)
return _Rendezvous(state, call, self._response_deserializer, deadline)
@ -687,16 +725,13 @@ def _run_channel_spin_thread(state):
channel_spin_thread.start()
def _create_channel_managed_call(state):
def create_channel_managed_call(parent, flags, method, host, deadline):
"""Creates a managed cygrpc.Call.
def _channel_managed_call_management(state):
def create(parent, flags, method, host, deadline):
"""Creates a managed cygrpc.Call and a function to call to drive it.
Callers of this function must conduct at least one operation on the returned
call. The tags associated with operations conducted on the returned call
must be no-argument callables that return None to indicate that this channel
should continue polling for events associated with the call and return the
call itself to indicate that no more events associated with the call will be
generated.
If operations are successfully added to the returned cygrpc.Call, the
returned function must be called. If operations are not successfully added
to the returned cygrpc.Call, the returned function must not be called.
Args:
parent: A cygrpc.Call to be used as the parent of the created call.
@ -706,18 +741,22 @@ def _create_channel_managed_call(state):
deadline: A cygrpc.Timespec to be the deadline of the created call.
Returns:
A cygrpc.Call with which to conduct an RPC.
A cygrpc.Call with which to conduct an RPC and a function to call if
operations are successfully started on the call.
"""
with state.lock:
call = state.channel.create_call(
parent, flags, state.completion_queue, method, host, deadline)
if state.managed_calls is None:
state.managed_calls = set((call,))
_run_channel_spin_thread(state)
else:
state.managed_calls.add(call)
return call
return create_channel_managed_call
call = state.channel.create_call(
parent, flags, state.completion_queue, method, host, deadline)
def drive():
with state.lock:
if state.managed_calls is None:
state.managed_calls = set((call,))
_run_channel_spin_thread(state)
else:
state.managed_calls.add(call)
return call, drive
return create
class _ChannelConnectivityState(object):
@ -847,6 +886,7 @@ def _options(options):
class Channel(grpc.Channel):
"""A cygrpc.Channel-backed implementation of grpc.Channel."""
def __init__(self, target, options, credentials):
"""Constructor.
@ -871,25 +911,25 @@ class Channel(grpc.Channel):
def unary_unary(
self, method, request_serializer=None, response_deserializer=None):
return _UnaryUnaryMultiCallable(
self._channel, _create_channel_managed_call(self._call_state),
self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer)
def unary_stream(
self, method, request_serializer=None, response_deserializer=None):
return _UnaryStreamMultiCallable(
self._channel, _create_channel_managed_call(self._call_state),
self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer)
def stream_unary(
self, method, request_serializer=None, response_deserializer=None):
return _StreamUnaryMultiCallable(
self._channel, _create_channel_managed_call(self._call_state),
self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer)
def stream_stream(
self, method, request_serializer=None, response_deserializer=None):
return _StreamStreamMultiCallable(
self._channel, _create_channel_managed_call(self._call_state),
self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer)
def __del__(self):

@ -100,9 +100,15 @@ def diagnose_compile_error(build_ext, error):
.format(source)
)
def diagnose_attribute_error(build_ext, error):
if any('_needs_stub' in arg for arg in error.args):
raise commands.CommandError(
"We expect a missing `_needs_stub` attribute from older versions of "
"setuptools. Consider upgrading setuptools.")
_ERROR_DIAGNOSES = {
errors.CompileError: diagnose_compile_error
errors.CompileError: diagnose_compile_error,
AttributeError: diagnose_attribute_error
}
def diagnose_build_ext_error(build_ext, error, formatted):

@ -0,0 +1,153 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""The Python client used to test negative http2 conditions."""
import argparse
import grpc
from src.proto.grpc.testing import test_pb2
from src.proto.grpc.testing import messages_pb2
def _validate_payload_type_and_length(response, expected_type, expected_length):
if response.payload.type is not expected_type:
raise ValueError(
'expected payload type %s, got %s' %
(expected_type, type(response.payload.type)))
elif len(response.payload.body) != expected_length:
raise ValueError(
'expected payload body size %d, got %d' %
(expected_length, len(response.payload.body)))
def _expect_status_code(call, expected_code):
if call.code() != expected_code:
raise ValueError(
'expected code %s, got %s' % (expected_code, call.code()))
def _expect_status_details(call, expected_details):
if call.details() != expected_details:
raise ValueError(
'expected message %s, got %s' % (expected_details, call.details()))
def _validate_status_code_and_details(call, expected_code, expected_details):
_expect_status_code(call, expected_code)
_expect_status_details(call, expected_details)
# common requests
_REQUEST_SIZE = 314159
_RESPONSE_SIZE = 271828
_SIMPLE_REQUEST = messages_pb2.SimpleRequest(
response_type=messages_pb2.COMPRESSABLE,
response_size=_RESPONSE_SIZE,
payload=messages_pb2.Payload(body=b'\x00' * _REQUEST_SIZE))
def _goaway(stub):
first_response = stub.UnaryCall(_SIMPLE_REQUEST)
_validate_payload_type_and_length(first_response,
messages_pb2.COMPRESSABLE, _RESPONSE_SIZE)
second_response = stub.UnaryCall(_SIMPLE_REQUEST)
_validate_payload_type_and_length(second_response,
messages_pb2.COMPRESSABLE, _RESPONSE_SIZE)
def _rst_after_header(stub):
resp_future = stub.UnaryCall.future(_SIMPLE_REQUEST)
_validate_status_code_and_details(resp_future, grpc.StatusCode.UNAVAILABLE, "")
def _rst_during_data(stub):
resp_future = stub.UnaryCall.future(_SIMPLE_REQUEST)
_validate_status_code_and_details(resp_future, grpc.StatusCode.UNKNOWN, "")
def _rst_after_data(stub):
resp_future = stub.UnaryCall.future(_SIMPLE_REQUEST)
_validate_payload_type_and_length(next(resp_future),
messages_pb2.COMPRESSABLE, _RESPONSE_SIZE)
_validate_status_code_and_details(resp_future, grpc.StatusCode.UNKNOWN, "")
def _ping(stub):
response = stub.UnaryCall(_SIMPLE_REQUEST)
_validate_payload_type_and_length(response,
messages_pb2.COMPRESSABLE, _RESPONSE_SIZE)
def _max_streams(stub):
# send one req to ensure server sets MAX_STREAMS
response = stub.UnaryCall(_SIMPLE_REQUEST)
_validate_payload_type_and_length(response,
messages_pb2.COMPRESSABLE, _RESPONSE_SIZE)
# give the streams a workout
futures = []
for _ in range(15):
futures.append(stub.UnaryCall.future(_SIMPLE_REQUEST))
for future in futures:
_validate_payload_type_and_length(future.result(),
messages_pb2.COMPRESSABLE, _RESPONSE_SIZE)
def _run_test_case(test_case, stub):
if test_case == 'goaway':
_goaway(stub)
elif test_case == 'rst_after_header':
_rst_after_header(stub)
elif test_case == 'rst_during_data':
_rst_during_data(stub)
elif test_case == 'rst_after_data':
_rst_after_data(stub)
elif test_case =='ping':
_ping(stub)
elif test_case == 'max_streams':
_max_streams(stub)
else:
raise ValueError("Invalid test case: %s" % test_case)
def _args():
parser = argparse.ArgumentParser()
parser.add_argument(
'--server_host', help='the host to which to connect', type=str,
default="127.0.0.1")
parser.add_argument(
'--server_port', help='the port to which to connect', type=int,
default="8080")
parser.add_argument(
'--test_case', help='the test case to execute', type=str,
default="goaway")
return parser.parse_args()
def _stub(server_host, server_port):
target = '{}:{}'.format(server_host, server_port)
channel = grpc.insecure_channel(target)
return test_pb2.TestServiceStub(channel)
def main():
args = _args()
stub = _stub(args.server_host, args.server_port)
_run_test_case(args.test_case, stub)
if __name__ == '__main__':
main()

@ -33,7 +33,6 @@ import enum
import json
import os
import threading
import time
from oauth2client import client as oauth2client_client
@ -196,16 +195,6 @@ def _server_streaming(stub):
response, messages_pb2.COMPRESSABLE, sizes[index])
def _cancel_after_begin(stub):
sizes = (27182, 8, 1828, 45904,)
payloads = (messages_pb2.Payload(body=b'\x00' * size) for size in sizes)
requests = (messages_pb2.StreamingInputCallRequest(payload=payload)
for payload in payloads)
response_future = stub.StreamingInputCall.future(requests)
response_future.cancel()
if not response_future.cancelled():
raise ValueError('expected call to be cancelled')
class _Pipe(object):
@ -265,6 +254,16 @@ def _ping_pong(stub):
response, messages_pb2.COMPRESSABLE, response_size)
def _cancel_after_begin(stub):
with _Pipe() as pipe:
response_future = stub.StreamingInputCall.future(pipe)
response_future.cancel()
if not response_future.cancelled():
raise ValueError('expected cancelled method to return True')
if response_future.code() is not grpc.StatusCode.CANCELLED:
raise ValueError('expected status code CANCELLED')
def _cancel_after_first_response(stub):
request_response_sizes = (31415, 9, 2653, 58979,)
request_payload_sizes = (27182, 8, 1828, 45904,)
@ -302,7 +301,6 @@ def _timeout_on_sleeping_server(stub):
response_type=messages_pb2.COMPRESSABLE,
payload=messages_pb2.Payload(body=b'\x00' * request_payload_size))
pipe.add(request)
time.sleep(0.1)
try:
next(response_iterator)
except grpc.RpcError as rpc_error:

@ -27,6 +27,7 @@
"unit._cython.cygrpc_test.TypeSmokeTest",
"unit._empty_message_test.EmptyMessageTest",
"unit._exit_test.ExitTest",
"unit._invalid_metadata_test.InvalidMetadataTest",
"unit._metadata_code_details_test.MetadataCodeDetailsTest",
"unit._metadata_test.MetadataTest",
"unit._rpc_test.RPCTest",

@ -64,7 +64,7 @@ class ChannelReadyFutureTest(unittest.TestCase):
ready_future = grpc.channel_ready_future(channel)
ready_future.add_done_callback(callback.accept_value)
with self.assertRaises(grpc.FutureTimeoutError):
ready_future.result(test_constants.SHORT_TIMEOUT)
ready_future.result(timeout=test_constants.SHORT_TIMEOUT)
self.assertFalse(ready_future.cancelled())
self.assertFalse(ready_future.done())
self.assertTrue(ready_future.running())
@ -85,7 +85,7 @@ class ChannelReadyFutureTest(unittest.TestCase):
ready_future = grpc.channel_ready_future(channel)
ready_future.add_done_callback(callback.accept_value)
self.assertIsNone(ready_future.result(test_constants.SHORT_TIMEOUT))
self.assertIsNone(ready_future.result(timeout=test_constants.LONG_TIMEOUT))
value_passed_to_callback = callback.block_until_called()
self.assertIs(ready_future, value_passed_to_callback)
self.assertFalse(ready_future.cancelled())

@ -0,0 +1,175 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test of RPCs made against gRPC Python's application-layer API."""
import unittest
import grpc
from tests.unit.framework.common import test_constants
_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2
_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:]
_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3
_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3]
_UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'
def _unary_unary_multi_callable(channel):
return channel.unary_unary(_UNARY_UNARY)
def _unary_stream_multi_callable(channel):
return channel.unary_stream(
_UNARY_STREAM,
request_serializer=_SERIALIZE_REQUEST,
response_deserializer=_DESERIALIZE_RESPONSE)
def _stream_unary_multi_callable(channel):
return channel.stream_unary(
_STREAM_UNARY,
request_serializer=_SERIALIZE_REQUEST,
response_deserializer=_DESERIALIZE_RESPONSE)
def _stream_stream_multi_callable(channel):
return channel.stream_stream(_STREAM_STREAM)
class InvalidMetadataTest(unittest.TestCase):
def setUp(self):
self._channel = grpc.insecure_channel('localhost:8080')
self._unary_unary = _unary_unary_multi_callable(self._channel)
self._unary_stream = _unary_stream_multi_callable(self._channel)
self._stream_unary = _stream_unary_multi_callable(self._channel)
self._stream_stream = _stream_stream_multi_callable(self._channel)
def testUnaryRequestBlockingUnaryResponse(self):
request = b'\x07\x08'
metadata = (('InVaLiD', 'UnaryRequestBlockingUnaryResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
with self.assertRaises(ValueError) as exception_context:
self._unary_unary(request, metadata=metadata)
self.assertIn(expected_error_details, str(exception_context.exception))
def testUnaryRequestBlockingUnaryResponseWithCall(self):
request = b'\x07\x08'
metadata = (('InVaLiD', 'UnaryRequestBlockingUnaryResponseWithCall'),)
expected_error_details = "metadata was invalid: %s" % metadata
with self.assertRaises(ValueError) as exception_context:
self._unary_unary.with_call(request, metadata=metadata)
self.assertIn(expected_error_details, str(exception_context.exception))
def testUnaryRequestFutureUnaryResponse(self):
request = b'\x07\x08'
metadata = (('InVaLiD', 'UnaryRequestFutureUnaryResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
response_future = self._unary_unary.future(request, metadata=metadata)
with self.assertRaises(grpc.RpcError) as exception_context:
response_future.result()
self.assertEqual(
exception_context.exception.details(), expected_error_details)
self.assertEqual(
exception_context.exception.code(), grpc.StatusCode.INTERNAL)
self.assertEqual(response_future.details(), expected_error_details)
self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL)
def testUnaryRequestStreamResponse(self):
request = b'\x37\x58'
metadata = (('InVaLiD', 'UnaryRequestStreamResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
response_iterator = self._unary_stream(request, metadata=metadata)
with self.assertRaises(grpc.RpcError) as exception_context:
next(response_iterator)
self.assertEqual(
exception_context.exception.details(), expected_error_details)
self.assertEqual(
exception_context.exception.code(), grpc.StatusCode.INTERNAL)
self.assertEqual(response_iterator.details(), expected_error_details)
self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL)
def testStreamRequestBlockingUnaryResponse(self):
request_iterator = (b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
metadata = (('InVaLiD', 'StreamRequestBlockingUnaryResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
with self.assertRaises(ValueError) as exception_context:
self._stream_unary(request_iterator, metadata=metadata)
self.assertIn(expected_error_details, str(exception_context.exception))
def testStreamRequestBlockingUnaryResponseWithCall(self):
request_iterator = (
b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
metadata = (('InVaLiD', 'StreamRequestBlockingUnaryResponseWithCall'),)
expected_error_details = "metadata was invalid: %s" % metadata
multi_callable = _stream_unary_multi_callable(self._channel)
with self.assertRaises(ValueError) as exception_context:
multi_callable.with_call(request_iterator, metadata=metadata)
self.assertIn(expected_error_details, str(exception_context.exception))
def testStreamRequestFutureUnaryResponse(self):
request_iterator = (
b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
metadata = (('InVaLiD', 'StreamRequestFutureUnaryResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
response_future = self._stream_unary.future(
request_iterator, metadata=metadata)
with self.assertRaises(grpc.RpcError) as exception_context:
response_future.result()
self.assertEqual(
exception_context.exception.details(), expected_error_details)
self.assertEqual(
exception_context.exception.code(), grpc.StatusCode.INTERNAL)
self.assertEqual(response_future.details(), expected_error_details)
self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL)
def testStreamRequestStreamResponse(self):
request_iterator = (
b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
metadata = (('InVaLiD', 'StreamRequestStreamResponse'),)
expected_error_details = "metadata was invalid: %s" % metadata
response_iterator = self._stream_stream(request_iterator, metadata=metadata)
with self.assertRaises(grpc.RpcError) as exception_context:
next(response_iterator)
self.assertEqual(
exception_context.exception.details(), expected_error_details)
self.assertEqual(
exception_context.exception.code(), grpc.StatusCode.INTERNAL)
self.assertEqual(response_iterator.details(), expected_error_details)
self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -66,7 +66,7 @@ class ChannelConnectivityTest(unittest.TestCase):
ready_future = utilities.channel_ready_future(channel)
ready_future.add_done_callback(callback.accept_value)
with self.assertRaises(future.TimeoutError):
ready_future.result(test_constants.SHORT_TIMEOUT)
ready_future.result(timeout=test_constants.SHORT_TIMEOUT)
self.assertFalse(ready_future.cancelled())
self.assertFalse(ready_future.done())
self.assertTrue(ready_future.running())
@ -88,7 +88,7 @@ class ChannelConnectivityTest(unittest.TestCase):
ready_future = utilities.channel_ready_future(channel)
ready_future.add_done_callback(callback.accept_value)
self.assertIsNone(
ready_future.result(test_constants.SHORT_TIMEOUT))
ready_future.result(timeout=test_constants.LONG_TIMEOUT))
value_passed_to_callback = callback.block_until_called()
self.assertIs(ready_future, value_passed_to_callback)
self.assertFalse(ready_future.cancelled())

@ -65,6 +65,7 @@ VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) {
GRPC_SLICE_LENGTH(next));
grpc_slice_unref(next);
}
grpc_byte_buffer_reader_destroy(&reader);
return rb_string;
}

@ -37,6 +37,7 @@
#include "rb_server.h"
#include <grpc/grpc.h>
#include <grpc/support/atm.h>
#include <grpc/grpc_security.h>
#include <grpc/support/log.h>
#include "rb_call.h"
@ -60,22 +61,26 @@ typedef struct grpc_rb_server {
/* The actual server */
grpc_server *wrapped;
grpc_completion_queue *queue;
gpr_atm shutdown_started;
} grpc_rb_server;
static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
grpc_event ev;
if (server->wrapped != NULL) {
grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL);
ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_server_cancel_all_calls(server->wrapped);
rb_completion_queue_pluck(server->queue, NULL,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
// This can be started by app or implicitly by GC. Avoid a race between these.
if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) {
if (server->wrapped != NULL) {
grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL);
ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_server_cancel_all_calls(server->wrapped);
rb_completion_queue_pluck(server->queue, NULL,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
}
grpc_server_destroy(server->wrapped);
grpc_rb_completion_queue_destroy(server->queue);
server->wrapped = NULL;
server->queue = NULL;
}
grpc_server_destroy(server->wrapped);
grpc_rb_completion_queue_destroy(server->queue);
server->wrapped = NULL;
server->queue = NULL;
}
}
@ -116,6 +121,7 @@ static const rb_data_type_t grpc_rb_server_data_type = {
static VALUE grpc_rb_server_alloc(VALUE cls) {
grpc_rb_server *wrapper = ALLOC(grpc_rb_server);
wrapper->wrapped = NULL;
wrapper->shutdown_started = (gpr_atm)0;
return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
}

@ -35,9 +35,18 @@ module GRPC
# either end of a GRPC connection. When raised, it indicates that a status
# error should be returned to the other end of a GRPC connection; when
# caught it means that this end received a status error.
#
# There is also subclass of BadStatus in this module for each GRPC status.
# E.g., the GRPC::Cancelled class corresponds to status CANCELLED.
#
# See
# https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/status.h
# for detailed descriptions of each status code.
class BadStatus < StandardError
attr_reader :code, :details, :metadata
include GRPC::Core::StatusCodes
# @param code [Numeric] the status code
# @param details [String] the details of the exception
# @param metadata [Hash] the error's metadata
@ -55,9 +64,152 @@ module GRPC
def to_status
Struct::Status.new(code, details, @metadata)
end
def self.new_status_exception(code, details = 'unkown cause', metadata = {})
codes = {}
codes[OK] = Ok
codes[CANCELLED] = Cancelled
codes[UNKNOWN] = Unknown
codes[INVALID_ARGUMENT] = InvalidArgument
codes[DEADLINE_EXCEEDED] = DeadlineExceeded
codes[NOT_FOUND] = NotFound
codes[ALREADY_EXISTS] = AlreadyExists
codes[PERMISSION_DENIED] = PermissionDenied
codes[UNAUTHENTICATED] = Unauthenticated
codes[RESOURCE_EXHAUSTED] = ResourceExhausted
codes[FAILED_PRECONDITION] = FailedPrecondition
codes[ABORTED] = Aborted
codes[OUT_OF_RANGE] = OutOfRange
codes[UNIMPLEMENTED] = Unimplemented
codes[INTERNAL] = Internal
codes[UNIMPLEMENTED] = Unimplemented
codes[UNAVAILABLE] = Unavailable
codes[DATA_LOSS] = DataLoss
if codes[code].nil?
BadStatus.new(code, details, metadata)
else
codes[code].new(details, metadata)
end
end
end
# GRPC status code corresponding to status OK
class Ok < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::OK, details, metadata)
end
end
# Cancelled is an exception class that indicates that an rpc was cancelled.
class Cancelled < StandardError
# GRPC status code corresponding to status CANCELLED
class Cancelled < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::CANCELLED, details, metadata)
end
end
# GRPC status code corresponding to status UNKNOWN
class Unknown < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::UNKNOWN, details, metadata)
end
end
# GRPC status code corresponding to status INVALID_ARGUMENT
class InvalidArgument < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::INVALID_ARGUMENT, details, metadata)
end
end
# GRPC status code corresponding to status DEADLINE_EXCEEDED
class DeadlineExceeded < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::DEADLINE_EXCEEDED, details, metadata)
end
end
# GRPC status code corresponding to status NOT_FOUND
class NotFound < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::NOT_FOUND, details, metadata)
end
end
# GRPC status code corresponding to status ALREADY_EXISTS
class AlreadyExists < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::ALREADY_EXISTS, details, metadata)
end
end
# GRPC status code corresponding to status PERMISSION_DENIED
class PermissionDenied < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::PERMISSION_DENIED, details, metadata)
end
end
# GRPC status code corresponding to status UNAUTHENTICATED
class Unauthenticated < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::UNAUTHENTICATED, details, metadata)
end
end
# GRPC status code corresponding to status RESOURCE_EXHAUSTED
class ResourceExhausted < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::RESOURCE_EXHAUSTED, details, metadata)
end
end
# GRPC status code corresponding to status FAILED_PRECONDITION
class FailedPrecondition < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::FAILED_PRECONDITION, details, metadata)
end
end
# GRPC status code corresponding to status ABORTED
class Aborted < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::ABORTED, details, metadata)
end
end
# GRPC status code corresponding to status OUT_OF_RANGE
class OutOfRange < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::OUT_OF_RANGE, details, metadata)
end
end
# GRPC status code corresponding to status UNIMPLEMENTED
class Unimplemented < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::UNIMPLEMENTED, details, metadata)
end
end
# GRPC status code corresponding to status INTERNAL
class Internal < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::INTERNAL, details, metadata)
end
end
# GRPC status code corresponding to status UNAVAILABLE
class Unavailable < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::UNAVAILABLE, details, metadata)
end
end
# GRPC status code corresponding to status DATA_LOSS
class DataLoss < BadStatus
def initialize(details = 'unknown cause', metadata = {})
super(Core::StatusCodes::DATA_LOSS, details, metadata)
end
end
end

@ -43,8 +43,8 @@ class Struct
GRPC.logger.debug("Failing with status #{status}")
# raise BadStatus, propagating the metadata if present.
md = status.metadata
fail GRPC::BadStatus.new(status.code, status.details, md),
"status code: #{status.code}, details: #{status.details}"
fail GRPC::BadStatus.new_status_exception(
status.code, status.details, md)
end
status
end

@ -219,6 +219,10 @@ module GRPC
GRPC.logger.debug('bidi-read-loop: finished')
@reads_complete = true
finished
# Make sure that the write loop is done done before finishing the call.
# Note that blocking is ok at this point because we've already received
# a status
@enq_th.join if is_client
end
end
end

@ -119,7 +119,7 @@ module GRPC
# Send back a UNKNOWN status to the client
GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
GRPC.logger.warn(e)
send_status(active_call, UNKNOWN, 'unkown error handling call on server')
send_status(active_call, UNKNOWN, "#{e.class}: #{e.message}")
end
def assert_arity_matches(mth)

@ -110,8 +110,9 @@ module GRPC
rpc_descs[name] = RpcDesc.new(name, input, output,
marshal_class_method,
unmarshal_class_method)
define_method(name) do
fail GRPC::BadStatus, GRPC::Core::StatusCodes::UNIMPLEMENTED
define_method(GenericService.underscore(name.to_s).to_sym) do
fail GRPC::BadStatus.new_status_exception(
GRPC::Core::StatusCodes::UNIMPLEMENTED)
end
end

@ -52,7 +52,9 @@ module Grpc
@status_mutex.synchronize do
status = @statuses["#{req.service}"]
end
fail GRPC::BadStatus, StatusCodes::NOT_FOUND if status.nil?
if status.nil?
fail GRPC::BadStatus.new_status_exception(StatusCodes::NOT_FOUND)
end
HealthCheckResponse.new(status: status)
end

@ -459,11 +459,8 @@ class NamedTests
deadline = GRPC::Core::TimeConsts::from_relative_time(1)
resps = @stub.full_duplex_call(enum.each_item, deadline: deadline)
resps.each { } # wait to receive each request (or timeout)
fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)'
rescue GRPC::BadStatus => e
assert("#{__callee__}: status was wrong") do
e.code == GRPC::Core::StatusCodes::DEADLINE_EXCEEDED
end
fail 'Should have raised GRPC::DeadlineExceeded'
rescue GRPC::DeadlineExceeded
end
def empty_stream

@ -134,6 +134,7 @@ class BenchmarkClient
resp = stub.streaming_call(q.each_item)
start = Time.now
q.push(req)
pushed_sentinal = false
resp.each do |r|
@histogram.add((Time.now-start)*1e9)
if !@done
@ -141,8 +142,9 @@ class BenchmarkClient
start = Time.now
q.push(req)
else
q.push(self)
break
q.push(self) unless pushed_sentinal
# Continue polling on the responses to consume and release resources
pushed_sentinal = true
end
end
end

@ -0,0 +1,64 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
StatusCodes = GRPC::Core::StatusCodes
describe StatusCodes do
# convert upper snake-case to camel case.
# e.g., DEADLINE_EXCEEDED -> DeadlineExceeded
def upper_snake_to_camel(name)
name.to_s.split('_').map(&:downcase).map(&:capitalize).join('')
end
StatusCodes.constants.each do |status_name|
it 'there is a subclass of BadStatus corresponding to StatusCode: ' \
"#{status_name} that has code: #{StatusCodes.const_get(status_name)}" do
camel_case = upper_snake_to_camel(status_name)
error_class = GRPC.const_get(camel_case)
# expect the error class to be a subclass of BadStatus
expect(error_class < GRPC::BadStatus)
error_object = error_class.new
# check that the code matches the int value of the error's constant
status_code = StatusCodes.const_get(status_name)
expect(error_object.code).to eq(status_code)
# check default parameters
expect(error_object.details).to eq('unknown cause')
expect(error_object.metadata).to eq({})
# check that the BadStatus factory for creates the correct
# exception too
from_factory = GRPC::BadStatus.new_status_exception(status_code)
expect(from_factory.is_a?(error_class)).to be(true)
end
end
end

@ -190,15 +190,14 @@ describe 'ClientStub' do
end
creds = GRPC::Core::CallCredentials.new(failing_auth)
error_occured = false
unauth_error_occured = false
begin
get_response(stub, credentials: creds)
rescue GRPC::BadStatus => e
error_occured = true
expect(e.code).to eq(GRPC::Core::StatusCodes::UNAUTHENTICATED)
rescue GRPC::Unauthenticated => e
unauth_error_occured = true
expect(e.details.include?(error_message)).to be true
end
expect(error_occured).to eq(true)
expect(unauth_error_occured).to eq(true)
# Kill the server thread so tests can complete
th.kill

@ -48,7 +48,6 @@ describe GRPC::RpcDesc do
@bidi_streamer = RpcDesc.new('ss', Stream.new(Object.new),
Stream.new(Object.new), 'encode', 'decode')
@bs_code = INTERNAL
@no_reason = 'unkown error handling call on server'
@ok_response = Object.new
end
@ -62,8 +61,9 @@ describe GRPC::RpcDesc do
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:remote_read).once.and_return(Object.new)
expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
false, metadata: {})
expect(@call).to receive(:send_status).once.with(UNKNOWN,
arg_error_msg,
false, metadata: {})
this_desc.run_server_method(@call, method(:other_error))
end
@ -112,7 +112,7 @@ describe GRPC::RpcDesc do
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason,
expect(@call).to receive(:send_status).once.with(UNKNOWN, arg_error_msg,
false, metadata: {})
@client_streamer.run_server_method(@call, method(:other_error_alt))
end
@ -174,8 +174,9 @@ describe GRPC::RpcDesc do
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
error_msg = arg_error_msg(StandardError.new)
expect(@call).to receive(:run_server_bidi).and_raise(StandardError)
expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason,
expect(@call).to receive(:send_status).once.with(UNKNOWN, error_msg,
false, metadata: {})
@bidi_streamer.run_server_method(@call, method(:other_error_alt))
end
@ -342,4 +343,9 @@ describe GRPC::RpcDesc do
def other_error_alt(_call)
fail(ArgumentError, 'other error')
end
def arg_error_msg(error = nil)
error ||= ArgumentError.new('other error')
"#{error.class}: #{error.message}"
end
end

@ -408,21 +408,21 @@ describe GRPC::RpcServer do
req = EchoMsg.new
n = 20 # arbitrary, use as many to ensure the server pool is exceeded
threads = []
bad_status_code = nil
one_failed_as_unavailable = false
n.times do
threads << Thread.new do
stub = SlowStub.new(alt_host, :this_channel_is_insecure)
begin
stub.an_rpc(req)
rescue GRPC::BadStatus => e
bad_status_code = e.code
rescue GRPC::ResourceExhausted
one_failed_as_unavailable = true
end
end
end
threads.each(&:join)
alt_srv.stop
t.join
expect(bad_status_code).to be(StatusCodes::RESOURCE_EXHAUSTED)
expect(one_failed_as_unavailable).to be(true)
end
end

@ -122,7 +122,7 @@ describe Grpc::Health::Checker do
checker.check(HCReq.new(service: t[:service]), nil)
end
expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg
expect(&blk).to raise_error GRPC::NotFound, expected_msg
end
end
end
@ -141,7 +141,7 @@ describe Grpc::Health::Checker do
checker.check(HCReq.new(service: t[:service]), nil)
end
expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg
expect(&blk).to raise_error GRPC::NotFound, expected_msg
end
end
end
@ -163,7 +163,7 @@ describe Grpc::Health::Checker do
checker.check(HCReq.new(service: t[:service]), nil)
end
expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg
expect(&blk).to raise_error GRPC::NotFound, expected_msg
end
end
end
@ -214,7 +214,7 @@ describe Grpc::Health::Checker do
stub.check(HCReq.new(service: 'unknown'))
end
expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg
expect(&blk).to raise_error GRPC::NotFound, expected_msg
@srv.stop
t.join
end

@ -67,3 +67,5 @@ RSpec.configure do |config|
end
RSpec::Expectations.configuration.warn_about_potential_false_positives = false
Thread.abort_on_exception = true

@ -62,7 +62,7 @@
%>
Pod::Spec.new do |s|
s.name = 'gRPC-Core'
version = '1.0.1'
version = '1.0.2'
s.version = version
s.summary = 'Core cross-platform gRPC library, written in C'
s.homepage = 'http://www.grpc.io'
@ -71,7 +71,9 @@
s.source = {
:git => 'https://github.com/grpc/grpc.git',
:tag => "v#{version}",
# TODO(mxyan): Change back to "v#{version}" for next release
#:tag => "v#{version}",
:tag => "objective-c-v#{version}",
# TODO(jcanizales): Depend explicitly on the nanopb pod, and disable submodules.
:submodules => true,
}

@ -29,7 +29,7 @@
s.require_paths = %w( src/ruby/bin src/ruby/lib src/ruby/pb )
s.platform = Gem::Platform::RUBY
s.add_dependency 'google-protobuf', '~> 3.0.2'
s.add_dependency 'google-protobuf', '~> 3.1.0'
s.add_dependency 'googleauth', '~> 0.5.1'
s.add_development_dependency 'bundler', '~> 1.9'

@ -0,0 +1,5 @@
RUN apt-get update && apt-get -y install wget
RUN echo deb http://llvm.org/apt/wily/ llvm-toolchain-wily-3.8 main >> /etc/apt/sources.list
RUN echo deb-src http://llvm.org/apt/wily/ llvm-toolchain-wily-3.8 main >> /etc/apt/sources.list
RUN wget -O - http://llvm.org/apt/llvm-snapshot.gpg.key| apt-key add -
RUN apt-get update && apt-get -y install clang-format-3.8

@ -0,0 +1,37 @@
%YAML 1.2
--- |
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
FROM ubuntu:15.10
<%include file="../clang_format.include"/>
ADD clang_format_all_the_things.sh /
CMD ["echo 'Run with tools/distrib/clang_format_code.sh'"]

@ -52,18 +52,12 @@
# ./compile.sh without a local protoc dependency
# TODO(mattkwong): install dependencies to support latest Bazel version if newer
# version is needed
RUN git clone https://github.com/bazelbuild/bazel.git /bazel && \
RUN git clone https://github.com/bazelbuild/bazel.git /bazel && ${"\\"}
cd /bazel && git checkout tags/0.4.1 && ./compile.sh
RUN ln -s /bazel/output/bazel /bin/
#===================
# Docker "inception"
# Note this is quite the ugly hack.
# This makes sure that the docker binary we inject has its dependencies.
RUN curl https://get.docker.com/ | sh
RUN apt-get remove --purge -y docker-engine
RUN mkdir /var/local/jenkins
<%include file="../../clang_format.include"/>
<%include file="../../run_tests_addons.include"/>
# Define the default command.
CMD ["bash"]

@ -42,9 +42,9 @@
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/util/test_config.h"
static void channel_init_func(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
static grpc_error *channel_init_func(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
GPR_ASSERT(args->channel_args->num_args == 1);
GPR_ASSERT(args->channel_args->args[0].type == GRPC_ARG_INTEGER);
GPR_ASSERT(0 == strcmp(args->channel_args->args[0].key, "test_key"));
@ -52,6 +52,7 @@ static void channel_init_func(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->is_first);
GPR_ASSERT(args->is_last);
*(int *)(elem->channel_data) = 0;
return GRPC_ERROR_NONE;
}
static grpc_error *call_init_func(grpc_exec_ctx *exec_ctx,

@ -240,6 +240,8 @@ static request_sequences request_sequences_create(size_t n) {
res.n = n;
res.connections = gpr_malloc(sizeof(*res.connections) * n);
res.connectivity_states = gpr_malloc(sizeof(*res.connectivity_states) * n);
memset(res.connections, 0, sizeof(*res.connections) * n);
memset(res.connectivity_states, 0, sizeof(*res.connectivity_states) * n);
return res;
}
@ -785,17 +787,15 @@ static void verify_total_carnage_round_robin(const servers_fixture *f,
}
}
/* no server is ever available. The persistent state is TRANSIENT_FAILURE. May
* also be CONNECTING if, under load, this check took too long to run and some
* subchannel already transitioned to retrying. */
/* No server is ever available. There should be no READY states (or SHUTDOWN).
* Note that all other states (IDLE, CONNECTING, TRANSIENT_FAILURE) are still
* possible, as the policy transitions while attempting to reconnect. */
for (size_t i = 0; i < sequences->n; i++) {
const grpc_connectivity_state actual = sequences->connectivity_states[i];
if (actual != GRPC_CHANNEL_TRANSIENT_FAILURE &&
actual != GRPC_CHANNEL_CONNECTING) {
if (actual == GRPC_CHANNEL_READY || actual == GRPC_CHANNEL_SHUTDOWN) {
gpr_log(GPR_ERROR,
"CONNECTIVITY STATUS SEQUENCE FAILURE: expected "
"GRPC_CHANNEL_TRANSIENT_FAILURE or GRPC_CHANNEL_CONNECTING, got "
"'%s' at iteration #%d",
"CONNECTIVITY STATUS SEQUENCE FAILURE: got unexpected state "
"'%s' at iteration #%d.",
grpc_connectivity_state_name(actual), (int)i);
abort();
}
@ -844,17 +844,15 @@ static void verify_partial_carnage_round_robin(
abort();
}
/* ... and that the last one should be TRANSIENT_FAILURE, after all servers
* are gone. May also be CONNECTING if, under load, this check took too long
* to run and the subchannel already transitioned to retrying. */
/* ... and that the last one shouldn't be READY (or SHUTDOWN): all servers are
* gone. It may be all other states (IDLE, CONNECTING, TRANSIENT_FAILURE), as
* the policy transitions while attempting to reconnect. */
actual = sequences->connectivity_states[num_iters - 1];
for (i = 0; i < sequences->n; i++) {
if (actual != GRPC_CHANNEL_TRANSIENT_FAILURE &&
actual != GRPC_CHANNEL_CONNECTING) {
if (actual == GRPC_CHANNEL_READY || actual == GRPC_CHANNEL_SHUTDOWN) {
gpr_log(GPR_ERROR,
"CONNECTIVITY STATUS SEQUENCE FAILURE: expected "
"GRPC_CHANNEL_TRANSIENT_FAILURE or GRPC_CHANNEL_CONNECTING, got "
"'%s' at iteration #%d",
"CONNECTIVITY STATUS SEQUENCE FAILURE: got unexpected state "
"'%s' at iteration #%d.",
grpc_connectivity_state_name(actual), (int)i);
abort();
}
@ -951,8 +949,8 @@ int main(int argc, char **argv) {
const size_t NUM_ITERS = 10;
const size_t NUM_SERVERS = 4;
grpc_test_init(argc, argv);
grpc_init();
grpc_test_init(argc, argv);
grpc_tracer_set_enabled("round_robin", 1);
GPR_ASSERT(grpc_lb_policy_create(&exec_ctx, "this-lb-policy-does-not-exist",

@ -63,8 +63,8 @@ static grpc_error *my_resolve_address(const char *name, const char *addr,
}
}
static grpc_resolver *create_resolver(const char *name) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
static grpc_resolver *create_resolver(grpc_exec_ctx *exec_ctx,
const char *name) {
grpc_resolver_factory *factory = grpc_resolver_factory_lookup("dns");
grpc_uri *uri = grpc_uri_parse(name, 0);
GPR_ASSERT(uri);
@ -72,10 +72,9 @@ static grpc_resolver *create_resolver(const char *name) {
memset(&args, 0, sizeof(args));
args.uri = uri;
grpc_resolver *resolver =
grpc_resolver_factory_create_resolver(&exec_ctx, factory, &args);
grpc_resolver_factory_create_resolver(exec_ctx, factory, &args);
grpc_resolver_factory_unref(factory);
grpc_uri_destroy(uri);
grpc_exec_ctx_finish(&exec_ctx);
return resolver;
}
@ -103,12 +102,10 @@ int main(int argc, char **argv) {
grpc_init();
gpr_mu_init(&g_mu);
grpc_blocking_resolve_address = my_resolve_address;
grpc_resolver *resolver = create_resolver("dns:test");
grpc_channel_args *result = (grpc_channel_args *)1;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolver *resolver = create_resolver(&exec_ctx, "dns:test");
gpr_event ev1;
gpr_event_init(&ev1);
grpc_resolver_next(&exec_ctx, resolver, &result,

@ -49,11 +49,6 @@ typedef struct on_resolution_arg {
void on_resolution_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
on_resolution_arg *res = arg;
const grpc_arg *channel_arg =
grpc_channel_args_find(res->resolver_result, GRPC_ARG_SERVER_NAME);
GPR_ASSERT(channel_arg != NULL);
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
GPR_ASSERT(strcmp(res->expected_server_name, channel_arg->value.string) == 0);
grpc_channel_args_destroy(exec_ctx, res->resolver_result);
}

@ -183,12 +183,7 @@ static grpc_resolver* fake_resolver_create(grpc_exec_ctx* exec_ctx,
// Instantiate resolver.
fake_resolver* r = gpr_malloc(sizeof(fake_resolver));
memset(r, 0, sizeof(*r));
grpc_arg server_name_arg;
server_name_arg.type = GRPC_ARG_STRING;
server_name_arg.key = GRPC_ARG_SERVER_NAME;
server_name_arg.value.string = args->uri->path;
r->channel_args =
grpc_channel_args_copy_and_add(args->args, &server_name_arg, 1);
r->channel_args = grpc_channel_args_copy(args->args);
r->addresses = addresses;
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &fake_resolver_vtable);

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save