Merge branch 'master' into metrics_typo

pull/6089/head
Sree Kuchibhotla 9 years ago
commit 9fe691cf32
  1. 12
      BUILD
  2. 4
      Makefile
  3. 2
      binding.gyp
  4. 4
      build.yaml
  5. 2
      config.m4
  6. 6
      gRPC.podspec
  7. 4
      grpc.gemspec
  8. 36
      include/grpc++/impl/codegen/async_stream.h
  9. 12
      include/grpc++/impl/codegen/async_unary_call.h
  10. 7
      include/grpc++/impl/codegen/call.h
  11. 13
      include/grpc++/impl/codegen/client_context.h
  12. 3
      include/grpc++/impl/codegen/client_unary_call.h
  13. 15
      include/grpc++/impl/codegen/method_handler_impl.h
  14. 2
      include/grpc++/impl/codegen/server_context.h
  15. 24
      include/grpc++/impl/codegen/sync_stream.h
  16. 6
      include/grpc/impl/codegen/grpc_types.h
  17. 4
      package.json
  18. 4
      package.xml
  19. 54
      src/core/ext/lb_policy/pick_first/pick_first.c
  20. 46
      src/core/ext/lb_policy/round_robin/round_robin.c
  21. 10
      src/core/ext/resolver/dns/native/dns_resolver.c
  22. 10
      src/core/ext/resolver/sockaddr/sockaddr_resolver.c
  23. 11
      src/core/ext/resolver/zookeeper/zookeeper_resolver.c
  24. 89
      src/core/ext/transport/chttp2/client/insecure/channel_create.c
  25. 101
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
  26. 40
      src/core/lib/channel/client_channel.c
  27. 10
      src/core/lib/channel/http_client_filter.c
  28. 5
      src/core/lib/channel/subchannel_call_holder.c
  29. 1
      src/core/lib/channel/subchannel_call_holder.h
  30. 2
      src/core/lib/client_config/README.md
  31. 20
      src/core/lib/client_config/client_channel_factory.c
  32. 51
      src/core/lib/client_config/client_channel_factory.h
  33. 11
      src/core/lib/client_config/lb_policy.c
  34. 13
      src/core/lib/client_config/lb_policy.h
  35. 4
      src/core/lib/client_config/lb_policy_factory.h
  36. 4
      src/core/lib/client_config/resolver_factory.h
  37. 4
      src/core/lib/client_config/resolver_registry.c
  38. 2
      src/core/lib/client_config/resolver_registry.h
  39. 2
      src/core/lib/client_config/subchannel.c
  40. 10
      src/core/lib/surface/call.c
  41. 2
      src/core/lib/surface/channel.h
  42. 6
      src/core/lib/transport/transport.h
  43. 2
      src/cpp/client/client_context.cc
  44. 2
      src/python/grpcio/grpc_core_dependencies.py
  45. 4
      src/ruby/lib/grpc/generic/rpc_server.rb
  46. 5
      src/ruby/spec/generic/rpc_server_spec.rb
  47. 2
      test/core/bad_ssl/bad_ssl_test.c
  48. 28
      test/core/client_config/resolvers/dns_resolver_connectivity_test.c
  49. 28
      test/core/client_config/resolvers/dns_resolver_test.c
  50. 28
      test/core/client_config/resolvers/sockaddr_resolver_test.c
  51. 4
      test/core/end2end/dualstack_socket_test.c
  52. 2
      test/core/end2end/tests/simple_delayed_request.c
  53. 2
      test/cpp/end2end/client_crash_test.cc
  54. 1
      test/cpp/end2end/server_crash_test_client.cc
  55. 4
      tools/doxygen/Doxyfile.core.internal
  56. 3
      tools/run_tests/run_node.sh
  57. 12
      tools/run_tests/sources_and_headers.json
  58. 6
      vsprojects/vcxproj/grpc/grpc.vcxproj
  59. 12
      vsprojects/vcxproj/grpc/grpc.vcxproj.filters
  60. 6
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
  61. 12
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters

12
BUILD

@ -198,6 +198,7 @@ cc_library(
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
"src/core/lib/channel/subchannel_call_holder.h",
"src/core/lib/client_config/client_channel_factory.h",
"src/core/lib/client_config/client_config.h",
"src/core/lib/client_config/connector.h",
"src/core/lib/client_config/initial_connect_string.h",
@ -208,7 +209,6 @@ cc_library(
"src/core/lib/client_config/resolver_factory.h",
"src/core/lib/client_config/resolver_registry.h",
"src/core/lib/client_config/subchannel.h",
"src/core/lib/client_config/subchannel_factory.h",
"src/core/lib/client_config/subchannel_index.h",
"src/core/lib/client_config/uri_parser.h",
"src/core/lib/compression/algorithm_metadata.h",
@ -339,6 +339,7 @@ cc_library(
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
"src/core/lib/channel/subchannel_call_holder.c",
"src/core/lib/client_config/client_channel_factory.c",
"src/core/lib/client_config/client_config.c",
"src/core/lib/client_config/connector.c",
"src/core/lib/client_config/default_initial_connect_string.c",
@ -350,7 +351,6 @@ cc_library(
"src/core/lib/client_config/resolver_factory.c",
"src/core/lib/client_config/resolver_registry.c",
"src/core/lib/client_config/subchannel.c",
"src/core/lib/client_config/subchannel_factory.c",
"src/core/lib/client_config/subchannel_index.c",
"src/core/lib/client_config/uri_parser.c",
"src/core/lib/compression/compression_algorithm.c",
@ -561,6 +561,7 @@ cc_library(
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
"src/core/lib/channel/subchannel_call_holder.h",
"src/core/lib/client_config/client_channel_factory.h",
"src/core/lib/client_config/client_config.h",
"src/core/lib/client_config/connector.h",
"src/core/lib/client_config/initial_connect_string.h",
@ -571,7 +572,6 @@ cc_library(
"src/core/lib/client_config/resolver_factory.h",
"src/core/lib/client_config/resolver_registry.h",
"src/core/lib/client_config/subchannel.h",
"src/core/lib/client_config/subchannel_factory.h",
"src/core/lib/client_config/subchannel_index.h",
"src/core/lib/client_config/uri_parser.h",
"src/core/lib/compression/algorithm_metadata.h",
@ -686,6 +686,7 @@ cc_library(
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
"src/core/lib/channel/subchannel_call_holder.c",
"src/core/lib/client_config/client_channel_factory.c",
"src/core/lib/client_config/client_config.c",
"src/core/lib/client_config/connector.c",
"src/core/lib/client_config/default_initial_connect_string.c",
@ -697,7 +698,6 @@ cc_library(
"src/core/lib/client_config/resolver_factory.c",
"src/core/lib/client_config/resolver_registry.c",
"src/core/lib/client_config/subchannel.c",
"src/core/lib/client_config/subchannel_factory.c",
"src/core/lib/client_config/subchannel_index.c",
"src/core/lib/client_config/uri_parser.c",
"src/core/lib/compression/compression_algorithm.c",
@ -1393,6 +1393,7 @@ objc_library(
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
"src/core/lib/channel/subchannel_call_holder.c",
"src/core/lib/client_config/client_channel_factory.c",
"src/core/lib/client_config/client_config.c",
"src/core/lib/client_config/connector.c",
"src/core/lib/client_config/default_initial_connect_string.c",
@ -1404,7 +1405,6 @@ objc_library(
"src/core/lib/client_config/resolver_factory.c",
"src/core/lib/client_config/resolver_registry.c",
"src/core/lib/client_config/subchannel.c",
"src/core/lib/client_config/subchannel_factory.c",
"src/core/lib/client_config/subchannel_index.c",
"src/core/lib/client_config/uri_parser.c",
"src/core/lib/compression/compression_algorithm.c",
@ -1557,6 +1557,7 @@ objc_library(
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
"src/core/lib/channel/subchannel_call_holder.h",
"src/core/lib/client_config/client_channel_factory.h",
"src/core/lib/client_config/client_config.h",
"src/core/lib/client_config/connector.h",
"src/core/lib/client_config/initial_connect_string.h",
@ -1567,7 +1568,6 @@ objc_library(
"src/core/lib/client_config/resolver_factory.h",
"src/core/lib/client_config/resolver_registry.h",
"src/core/lib/client_config/subchannel.h",
"src/core/lib/client_config/subchannel_factory.h",
"src/core/lib/client_config/subchannel_index.h",
"src/core/lib/client_config/uri_parser.h",
"src/core/lib/compression/algorithm_metadata.h",

@ -2487,6 +2487,7 @@ LIBGRPC_SRC = \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
src/core/lib/channel/subchannel_call_holder.c \
src/core/lib/client_config/client_channel_factory.c \
src/core/lib/client_config/client_config.c \
src/core/lib/client_config/connector.c \
src/core/lib/client_config/default_initial_connect_string.c \
@ -2498,7 +2499,6 @@ LIBGRPC_SRC = \
src/core/lib/client_config/resolver_factory.c \
src/core/lib/client_config/resolver_registry.c \
src/core/lib/client_config/subchannel.c \
src/core/lib/client_config/subchannel_factory.c \
src/core/lib/client_config/subchannel_index.c \
src/core/lib/client_config/uri_parser.c \
src/core/lib/compression/compression_algorithm.c \
@ -2844,6 +2844,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
src/core/lib/channel/subchannel_call_holder.c \
src/core/lib/client_config/client_channel_factory.c \
src/core/lib/client_config/client_config.c \
src/core/lib/client_config/connector.c \
src/core/lib/client_config/default_initial_connect_string.c \
@ -2855,7 +2856,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/client_config/resolver_factory.c \
src/core/lib/client_config/resolver_registry.c \
src/core/lib/client_config/subchannel.c \
src/core/lib/client_config/subchannel_factory.c \
src/core/lib/client_config/subchannel_index.c \
src/core/lib/client_config/uri_parser.c \
src/core/lib/compression/compression_algorithm.c \

@ -607,6 +607,7 @@
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',
'src/core/lib/channel/subchannel_call_holder.c',
'src/core/lib/client_config/client_channel_factory.c',
'src/core/lib/client_config/client_config.c',
'src/core/lib/client_config/connector.c',
'src/core/lib/client_config/default_initial_connect_string.c',
@ -618,7 +619,6 @@
'src/core/lib/client_config/resolver_factory.c',
'src/core/lib/client_config/resolver_registry.c',
'src/core/lib/client_config/subchannel.c',
'src/core/lib/client_config/subchannel_factory.c',
'src/core/lib/client_config/subchannel_index.c',
'src/core/lib/client_config/uri_parser.c',
'src/core/lib/compression/compression_algorithm.c',

@ -264,6 +264,7 @@ filegroups:
- src/core/lib/channel/http_client_filter.h
- src/core/lib/channel/http_server_filter.h
- src/core/lib/channel/subchannel_call_holder.h
- src/core/lib/client_config/client_channel_factory.h
- src/core/lib/client_config/client_config.h
- src/core/lib/client_config/connector.h
- src/core/lib/client_config/initial_connect_string.h
@ -274,7 +275,6 @@ filegroups:
- src/core/lib/client_config/resolver_factory.h
- src/core/lib/client_config/resolver_registry.h
- src/core/lib/client_config/subchannel.h
- src/core/lib/client_config/subchannel_factory.h
- src/core/lib/client_config/subchannel_index.h
- src/core/lib/client_config/uri_parser.h
- src/core/lib/compression/algorithm_metadata.h
@ -352,6 +352,7 @@ filegroups:
- src/core/lib/channel/http_client_filter.c
- src/core/lib/channel/http_server_filter.c
- src/core/lib/channel/subchannel_call_holder.c
- src/core/lib/client_config/client_channel_factory.c
- src/core/lib/client_config/client_config.c
- src/core/lib/client_config/connector.c
- src/core/lib/client_config/default_initial_connect_string.c
@ -363,7 +364,6 @@ filegroups:
- src/core/lib/client_config/resolver_factory.c
- src/core/lib/client_config/resolver_registry.c
- src/core/lib/client_config/subchannel.c
- src/core/lib/client_config/subchannel_factory.c
- src/core/lib/client_config/subchannel_index.c
- src/core/lib/client_config/uri_parser.c
- src/core/lib/compression/compression_algorithm.c

@ -129,6 +129,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
src/core/lib/channel/subchannel_call_holder.c \
src/core/lib/client_config/client_channel_factory.c \
src/core/lib/client_config/client_config.c \
src/core/lib/client_config/connector.c \
src/core/lib/client_config/default_initial_connect_string.c \
@ -140,7 +141,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/client_config/resolver_factory.c \
src/core/lib/client_config/resolver_registry.c \
src/core/lib/client_config/subchannel.c \
src/core/lib/client_config/subchannel_factory.c \
src/core/lib/client_config/subchannel_index.c \
src/core/lib/client_config/uri_parser.c \
src/core/lib/compression/compression_algorithm.c \

@ -200,6 +200,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/http_client_filter.h',
'src/core/lib/channel/http_server_filter.h',
'src/core/lib/channel/subchannel_call_holder.h',
'src/core/lib/client_config/client_channel_factory.h',
'src/core/lib/client_config/client_config.h',
'src/core/lib/client_config/connector.h',
'src/core/lib/client_config/initial_connect_string.h',
@ -210,7 +211,6 @@ Pod::Spec.new do |s|
'src/core/lib/client_config/resolver_factory.h',
'src/core/lib/client_config/resolver_registry.h',
'src/core/lib/client_config/subchannel.h',
'src/core/lib/client_config/subchannel_factory.h',
'src/core/lib/client_config/subchannel_index.h',
'src/core/lib/client_config/uri_parser.h',
'src/core/lib/compression/algorithm_metadata.h',
@ -358,6 +358,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',
'src/core/lib/channel/subchannel_call_holder.c',
'src/core/lib/client_config/client_channel_factory.c',
'src/core/lib/client_config/client_config.c',
'src/core/lib/client_config/connector.c',
'src/core/lib/client_config/default_initial_connect_string.c',
@ -369,7 +370,6 @@ Pod::Spec.new do |s|
'src/core/lib/client_config/resolver_factory.c',
'src/core/lib/client_config/resolver_registry.c',
'src/core/lib/client_config/subchannel.c',
'src/core/lib/client_config/subchannel_factory.c',
'src/core/lib/client_config/subchannel_index.c',
'src/core/lib/client_config/uri_parser.c',
'src/core/lib/compression/compression_algorithm.c',
@ -523,6 +523,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/http_client_filter.h',
'src/core/lib/channel/http_server_filter.h',
'src/core/lib/channel/subchannel_call_holder.h',
'src/core/lib/client_config/client_channel_factory.h',
'src/core/lib/client_config/client_config.h',
'src/core/lib/client_config/connector.h',
'src/core/lib/client_config/initial_connect_string.h',
@ -533,7 +534,6 @@ Pod::Spec.new do |s|
'src/core/lib/client_config/resolver_factory.h',
'src/core/lib/client_config/resolver_registry.h',
'src/core/lib/client_config/subchannel.h',
'src/core/lib/client_config/subchannel_factory.h',
'src/core/lib/client_config/subchannel_index.h',
'src/core/lib/client_config/uri_parser.h',
'src/core/lib/compression/algorithm_metadata.h',

@ -196,6 +196,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/http_client_filter.h )
s.files += %w( src/core/lib/channel/http_server_filter.h )
s.files += %w( src/core/lib/channel/subchannel_call_holder.h )
s.files += %w( src/core/lib/client_config/client_channel_factory.h )
s.files += %w( src/core/lib/client_config/client_config.h )
s.files += %w( src/core/lib/client_config/connector.h )
s.files += %w( src/core/lib/client_config/initial_connect_string.h )
@ -206,7 +207,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/client_config/resolver_factory.h )
s.files += %w( src/core/lib/client_config/resolver_registry.h )
s.files += %w( src/core/lib/client_config/subchannel.h )
s.files += %w( src/core/lib/client_config/subchannel_factory.h )
s.files += %w( src/core/lib/client_config/subchannel_index.h )
s.files += %w( src/core/lib/client_config/uri_parser.h )
s.files += %w( src/core/lib/compression/algorithm_metadata.h )
@ -341,6 +341,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/http_client_filter.c )
s.files += %w( src/core/lib/channel/http_server_filter.c )
s.files += %w( src/core/lib/channel/subchannel_call_holder.c )
s.files += %w( src/core/lib/client_config/client_channel_factory.c )
s.files += %w( src/core/lib/client_config/client_config.c )
s.files += %w( src/core/lib/client_config/connector.c )
s.files += %w( src/core/lib/client_config/default_initial_connect_string.c )
@ -352,7 +353,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/client_config/resolver_factory.c )
s.files += %w( src/core/lib/client_config/resolver_registry.c )
s.files += %w( src/core/lib/client_config/subchannel.c )
s.files += %w( src/core/lib/client_config/subchannel_factory.c )
s.files += %w( src/core/lib/client_config/subchannel_index.c )
s.files += %w( src/core/lib/client_config/uri_parser.c )
s.files += %w( src/core/lib/compression/compression_algorithm.c )

@ -108,7 +108,8 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
init_ops_.ClientSendClose();
@ -173,7 +174,8 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
finish_ops_.RecvMessage(response);
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&init_ops_);
}
@ -240,7 +242,8 @@ class ClientAsyncReaderWriter GRPC_FINAL
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
init_ops_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&init_ops_);
}
@ -305,7 +308,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@ -319,7 +323,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const W& msg, const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
@ -336,7 +341,8 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!status.ok());
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@ -366,7 +372,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@ -374,7 +381,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
@ -385,7 +393,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
@ -415,7 +424,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_ops_);
}
@ -429,7 +439,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
write_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
@ -440,7 +451,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
void Finish(const Status& status, void* tag) {
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);

@ -67,7 +67,8 @@ class ClientAsyncResponseReader GRPC_FINAL
call_(channel->CreateCall(method, context, cq)),
collection_(new CallOpSetCollection) {
collection_->init_buf_.SetCollection(collection_);
collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_);
collection_->init_buf_.SendInitialMetadata(
context->send_initial_metadata_, context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(collection_->init_buf_.SendMessage(request).ok());
collection_->init_buf_.ClientSendClose();
@ -122,7 +123,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.set_output_tag(tag);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
meta_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
@ -130,7 +132,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
@ -147,7 +150,8 @@ class ServerAsyncResponseWriter GRPC_FINAL
GPR_CODEGEN_ASSERT(!status.ok());
finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);

@ -181,8 +181,10 @@ class CallOpSendInitialMetadata {
CallOpSendInitialMetadata() : send_(false) {}
void SendInitialMetadata(
const std::multimap<grpc::string, grpc::string>& metadata) {
const std::multimap<grpc::string, grpc::string>& metadata,
uint32_t flags) {
send_ = true;
flags_ = flags;
initial_metadata_count_ = metadata.size();
initial_metadata_ = FillMetadataArray(metadata);
}
@ -192,7 +194,7 @@ class CallOpSendInitialMetadata {
if (!send_) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->flags = 0;
op->flags = flags_;
op->reserved = NULL;
op->data.send_initial_metadata.count = initial_metadata_count_;
op->data.send_initial_metadata.metadata = initial_metadata_;
@ -204,6 +206,7 @@ class CallOpSendInitialMetadata {
}
bool send_;
uint32_t flags_;
size_t initial_metadata_count_;
grpc_metadata* initial_metadata_;
};

@ -221,6 +221,12 @@ class ClientContext {
deadline_ = deadline_tp.raw_time();
}
/// EXPERIMENTAL: Set this request to be idempotent
void set_idempotent(bool idempotent) { idempotent_ = idempotent; }
/// EXPERIMENTAL: Trigger fail-fast or not on this request
void set_fail_fast(bool fail_fast) { fail_fast_ = fail_fast; }
#ifndef GRPC_CXX0X_NO_CHRONO
/// Return the deadline for the client call.
std::chrono::system_clock::time_point deadline() {
@ -328,9 +334,16 @@ class ClientContext {
grpc_call* call() { return call_; }
void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel);
uint32_t initial_metadata_flags() const {
return (idempotent_ ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST : 0) |
(fail_fast_ ? 0 : GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY);
}
grpc::string authority() { return authority_; }
bool initial_metadata_received_;
bool fail_fast_;
bool idempotent_;
std::shared_ptr<Channel> channel_;
grpc::mutex mu_;
grpc_call* call_;

@ -62,7 +62,8 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
if (!status.ok()) {
return status;
}
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
ops.RecvInitialMetadata(context);
ops.RecvMessage(result);
ops.ClientSendClose();

@ -63,7 +63,8 @@ class RpcMethodHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus>
ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (status.ok()) {
status = ops.SendMessage(rsp);
}
@ -100,7 +101,8 @@ class ClientStreamingHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpServerSendStatus>
ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (status.ok()) {
status = ops.SendMessage(rsp);
}
@ -138,7 +140,8 @@ class ServerStreamingHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@ -170,7 +173,8 @@ class BidiStreamingHandler : public MethodHandler {
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
ops.SendInitialMetadata(param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@ -191,7 +195,8 @@ class UnknownMethodHandler : public MethodHandler {
static void FillOps(ServerContext* context, T* ops) {
Status status(StatusCode::UNIMPLEMENTED, "");
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_);
ops->SendInitialMetadata(context->initial_metadata_,
context->initial_metadata_flags());
context->sent_initial_metadata_ = true;
}
ops->ServerSendStatus(context->trailing_metadata_, status);

@ -195,6 +195,8 @@ class ServerContext {
void set_call(grpc_call* call);
uint32_t initial_metadata_flags() const { return 0; }
CompletionOp* completion_op_;
bool has_notify_when_done_tag_;
void* async_notify_when_done_tag_;

@ -125,7 +125,8 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose>
ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
ops.ClientSendClose();
@ -190,7 +191,8 @@ class ClientWriter : public ClientWriterInterface<W> {
finish_ops_.RecvMessage(response);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
@ -268,7 +270,8 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
@ -334,7 +337,8 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@ -361,7 +365,8 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@ -374,7 +379,8 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
@ -397,7 +403,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
@ -417,7 +424,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);

@ -202,8 +202,12 @@ typedef enum grpc_call_error {
/* Initial metadata flags */
/** Signal that the call is idempotent */
#define GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST (0x00000010u)
/** Signal that the call should not return UNAVAILABLE before it has started */
#define GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY (0x00000020u)
/** Mask of all valid flags */
#define GRPC_INITIAL_METADATA_USED_MASK GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
#define GRPC_INITIAL_METADATA_USED_MASK \
(GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY)
/** A single metadata element */
typedef struct grpc_metadata {

@ -139,6 +139,7 @@
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
"src/core/lib/channel/subchannel_call_holder.h",
"src/core/lib/client_config/client_channel_factory.h",
"src/core/lib/client_config/client_config.h",
"src/core/lib/client_config/connector.h",
"src/core/lib/client_config/initial_connect_string.h",
@ -149,7 +150,6 @@
"src/core/lib/client_config/resolver_factory.h",
"src/core/lib/client_config/resolver_registry.h",
"src/core/lib/client_config/subchannel.h",
"src/core/lib/client_config/subchannel_factory.h",
"src/core/lib/client_config/subchannel_index.h",
"src/core/lib/client_config/uri_parser.h",
"src/core/lib/compression/algorithm_metadata.h",
@ -284,6 +284,7 @@
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
"src/core/lib/channel/subchannel_call_holder.c",
"src/core/lib/client_config/client_channel_factory.c",
"src/core/lib/client_config/client_config.c",
"src/core/lib/client_config/connector.c",
"src/core/lib/client_config/default_initial_connect_string.c",
@ -295,7 +296,6 @@
"src/core/lib/client_config/resolver_factory.c",
"src/core/lib/client_config/resolver_registry.c",
"src/core/lib/client_config/subchannel.c",
"src/core/lib/client_config/subchannel_factory.c",
"src/core/lib/client_config/subchannel_index.c",
"src/core/lib/client_config/uri_parser.c",
"src/core/lib/compression/compression_algorithm.c",

@ -200,6 +200,7 @@
<file baseinstalldir="/" name="src/core/lib/channel/http_client_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_server_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/subchannel_call_holder.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/client_channel_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/client_config.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/connector.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/initial_connect_string.h" role="src" />
@ -210,7 +211,6 @@
<file baseinstalldir="/" name="src/core/lib/client_config/resolver_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/resolver_registry.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/subchannel.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/subchannel_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/subchannel_index.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/uri_parser.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/compression/algorithm_metadata.h" role="src" />
@ -345,6 +345,7 @@
<file baseinstalldir="/" name="src/core/lib/channel/http_client_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_server_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/subchannel_call_holder.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/client_channel_factory.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/client_config.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/connector.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/default_initial_connect_string.c" role="src" />
@ -356,7 +357,6 @@
<file baseinstalldir="/" name="src/core/lib/client_config/resolver_factory.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/resolver_registry.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/subchannel.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/subchannel_factory.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/subchannel_index.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/client_config/uri_parser.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/compression/compression_algorithm.c" role="src" />

@ -40,6 +40,7 @@
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@ -149,6 +150,31 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
}
pp = next;
}
gpr_mu_unlock(&p->mu);
}
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
@ -171,6 +197,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@ -199,6 +226,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->next = p->pending_picks;
pp->pollset = pollset;
pp->target = target;
pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@ -286,11 +314,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure");
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
if (p->checking_subchannel == 0) {
/* only trigger transient failure when we've tried all alternatives */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure");
}
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@ -378,14 +409,9 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown,
pf_pick,
pf_cancel_pick,
pf_ping_one,
pf_exit_idle,
pf_check_connectivity,
pf_notify_on_state_change};
pf_destroy, pf_shutdown, pf_pick,
pf_cancel_pick, pf_cancel_picks, pf_ping_one,
pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
@ -395,7 +421,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->subchannel_factory != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
if (args->addresses->naddrs == 0) return NULL;
@ -412,8 +438,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
exec_ctx, args->subchannel_factory, &sc_args);
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
if (subchannel != NULL) {
p->subchannels[subchannel_idx++] = subchannel;

@ -49,6 +49,7 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@ -275,6 +276,32 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
*pp->target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
}
pp = next;
}
gpr_mu_unlock(&p->mu);
}
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
@ -303,6 +330,7 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@ -330,6 +358,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->pollset = pollset;
pp->target = target;
pp->on_complete = on_complete;
pp->initial_metadata_flags = initial_metadata_flags;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@ -485,14 +514,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy,
rr_shutdown,
rr_pick,
rr_cancel_pick,
rr_ping_one,
rr_exit_idle,
rr_check_connectivity,
rr_notify_on_state_change};
rr_destroy, rr_shutdown, rr_pick,
rr_cancel_pick, rr_cancel_picks, rr_ping_one,
rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
@ -502,7 +526,7 @@ static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->subchannel_factory != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
@ -518,8 +542,8 @@ static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
exec_ctx, args->subchannel_factory, &sc_args);
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
if (subchannel != NULL) {
subchannel_data *sd = gpr_malloc(sizeof(*sd));

@ -59,7 +59,7 @@ typedef struct {
/** default port to use */
char *default_port;
/** subchannel factory */
grpc_subchannel_factory *subchannel_factory;
grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@ -170,7 +170,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
config = grpc_client_config_create();
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.addresses = addresses;
lb_policy_args.subchannel_factory = r->subchannel_factory;
lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
@ -228,7 +228,7 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_config) {
grpc_client_config_unref(exec_ctx, r->resolved_config);
}
grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
@ -255,10 +255,10 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->name = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
r->subchannel_factory = args->subchannel_factory;
r->client_channel_factory = args->client_channel_factory;
gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
grpc_subchannel_factory_ref(r->subchannel_factory);
grpc_client_channel_factory_ref(r->client_channel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}

@ -52,7 +52,7 @@ typedef struct {
/** refcount */
gpr_refcount refs;
/** subchannel factory */
grpc_subchannel_factory *subchannel_factory;
grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@ -125,7 +125,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.addresses = r->addresses;
lb_policy_args.subchannel_factory = r->subchannel_factory;
lb_policy_args.client_channel_factory = r->client_channel_factory;
grpc_lb_policy *lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(cfg, lb_policy);
@ -140,7 +140,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
grpc_resolved_addresses_destroy(r->addresses);
gpr_free(r->lb_policy_name);
gpr_free(r);
@ -318,8 +318,8 @@ static grpc_resolver *sockaddr_create(
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
r->subchannel_factory = args->subchannel_factory;
grpc_subchannel_factory_ref(r->subchannel_factory);
r->client_channel_factory = args->client_channel_factory;
grpc_client_channel_factory_ref(r->client_channel_factory);
return &r->base;
}

@ -57,7 +57,7 @@ typedef struct {
/** name to resolve */
char *name;
/** subchannel factory */
grpc_subchannel_factory *subchannel_factory;
grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@ -187,9 +187,8 @@ static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
lb_policy_args.addresses = addresses;
lb_policy_args.subchannel_factory = r->subchannel_factory;
lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
@ -424,7 +423,7 @@ static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_config != NULL) {
grpc_client_config_unref(exec_ctx, r->resolved_config);
}
grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
gpr_free(r->name);
gpr_free(r->lb_policy_name);
gpr_free(r);
@ -454,8 +453,8 @@ static grpc_resolver *zookeeper_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
r->name = gpr_strdup(path);
r->subchannel_factory = args->subchannel_factory;
grpc_subchannel_factory_ref(r->subchannel_factory);
r->client_channel_factory = args->client_channel_factory;
grpc_client_channel_factory_ref(r->client_channel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name);

@ -136,31 +136,35 @@ static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
grpc_subchannel_factory base;
grpc_client_channel_factory base;
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel *master;
} subchannel_factory;
} client_channel_factory;
static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
subchannel_factory *f = (subchannel_factory *)scf;
static void client_channel_factory_ref(
grpc_client_channel_factory *cc_factory) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_factory *scf) {
subchannel_factory *f = (subchannel_factory *)scf;
static void client_channel_factory_unref(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
if (f->master != NULL) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
"client_channel_factory");
}
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
static grpc_subchannel *subchannel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *scf,
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) {
subchannel_factory *f = (subchannel_factory *)scf;
client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
@ -175,9 +179,33 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
return s;
}
static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
subchannel_factory_ref, subchannel_factory_unref,
subchannel_factory_create_subchannel};
static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
grpc_channel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(final_args);
grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
if (!resolver) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
"client_channel_factory_create_channel");
return NULL;
}
grpc_client_channel_set_resolver(
exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
return channel;
}
static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
{client_channel_factory_ref, client_channel_factory_unref,
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
/* Create a client channel:
Asynchronously: - resolve target
@ -186,38 +214,27 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args,
void *reserved) {
grpc_channel *channel = NULL;
grpc_resolver *resolver;
subchannel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
"grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3,
(target, args, reserved));
GPR_ASSERT(!reserved);
channel =
grpc_channel_create(&exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
client_channel_factory *f = gpr_malloc(sizeof(*f));
memset(f, 0, sizeof(*f));
f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
f->merge_args = grpc_channel_args_copy(args);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
resolver = grpc_resolver_create(target, &f->base);
if (!resolver) {
GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, f->master, "subchannel_factory");
grpc_subchannel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
return NULL;
}
grpc_client_channel_set_resolver(
&exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
grpc_subchannel_factory_unref(&exec_ctx, &f->base);
grpc_channel *channel = client_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
if (channel != NULL) {
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create");
}
grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
return channel;
return channel; /* may be NULL */
}

@ -192,34 +192,38 @@ static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
grpc_subchannel_factory base;
grpc_client_channel_factory base;
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector;
grpc_channel *master;
} subchannel_factory;
} client_channel_factory;
static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
subchannel_factory *f = (subchannel_factory *)scf;
static void client_channel_factory_ref(
grpc_client_channel_factory *cc_factory) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_factory *scf) {
subchannel_factory *f = (subchannel_factory *)scf;
static void client_channel_factory_unref(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"subchannel_factory");
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
"client_channel_factory");
if (f->master != NULL) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
"client_channel_factory");
}
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
static grpc_subchannel *subchannel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *scf,
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) {
subchannel_factory *f = (subchannel_factory *)scf;
client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
@ -236,9 +240,37 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
return s;
}
static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
subchannel_factory_ref, subchannel_factory_unref,
subchannel_factory_create_subchannel};
static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
grpc_channel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(final_args);
grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
if (resolver != NULL) {
grpc_client_channel_set_resolver(
exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create");
} else {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
"client_channel_factory_create_channel");
channel = NULL;
}
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"client_channel_factory_create_channel");
return channel;
}
static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
{client_channel_factory_ref, client_channel_factory_unref,
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
/* Create a secure client channel:
Asynchronously: - resolve target
@ -248,13 +280,11 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
const char *target,
const grpc_channel_args *args,
void *reserved) {
grpc_channel *channel;
grpc_arg connector_arg;
grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *security_connector;
grpc_resolver *resolver;
subchannel_factory *f;
client_channel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
@ -284,35 +314,30 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1);
channel = grpc_channel_create(&exec_ctx, target, args_copy,
GRPC_CLIENT_CHANNEL, NULL);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
memset(f, 0, sizeof(*f));
f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, "subchannel_factory");
f->security_connector = security_connector;
f->merge_args = grpc_channel_args_copy(args_copy);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
resolver = grpc_resolver_create(target, &f->base);
if (resolver) {
grpc_client_channel_set_resolver(
&exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
}
grpc_subchannel_factory_unref(&exec_ctx, &f->base);
GRPC_SECURITY_CONNECTOR_UNREF(&security_connector->base, "channel_create");
grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(new_args_from_connector);
}
if (!resolver) {
GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "subchannel_factory");
channel = NULL;
GRPC_SECURITY_CONNECTOR_REF(&security_connector->base,
"grpc_secure_channel_create");
f->security_connector = security_connector;
grpc_channel *channel = client_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
if (channel != NULL) {
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_secure_channel_create");
}
grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
return channel;
return channel; /* may be NULL */
}

@ -114,6 +114,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand,
grpc_connectivity_state state,
const char *reason) {
if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_FATAL_FAILURE) &&
chand->lb_policy != NULL) {
/* cancel fail-fast picks */
grpc_lb_policy_cancel_picks(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
/* check= */ 0);
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason);
}
static void on_lb_policy_state_changed_locked(
grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
grpc_connectivity_state publish_state = w->state;
@ -127,8 +143,8 @@ static void on_lb_policy_state_changed_locked(
GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
w->chand->lb_policy = NULL;
}
grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state,
"lb_changed");
set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
"lb_changed");
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
}
@ -200,8 +216,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (iomgr_success && chand->resolver) {
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
"new_lb+resolver");
set_channel_connectivity_state_locked(exec_ctx, chand, state,
"new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
@ -216,8 +232,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
set_channel_connectivity_state_locked(
exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
}
@ -272,8 +288,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
set_channel_connectivity_state_locked(
exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
@ -290,6 +306,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
typedef struct {
grpc_metadata_batch *initial_metadata;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **connected_subchannel;
grpc_closure *on_ready;
grpc_call_element *elem;
@ -298,6 +315,7 @@ typedef struct {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
@ -308,6 +326,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
} else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
}
@ -316,6 +335,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
@ -349,7 +369,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
initial_metadata, connected_subchannel, on_ready);
initial_metadata, initial_metadata_flags,
connected_subchannel, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
return r;
}
@ -362,6 +383,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
cpa->initial_metadata_flags = initial_metadata_flags;
cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;

@ -111,10 +111,12 @@ static void hc_mutate_op(grpc_call_element *elem,
elem);
/* Send : prefixed headers, which have to be before any application
layer headers. */
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
op->send_idempotent_request
? GRPC_MDELEM_METHOD_PUT
: GRPC_MDELEM_METHOD_POST);
grpc_metadata_batch_add_head(
op->send_initial_metadata, &calld->method,
op->send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
? GRPC_MDELEM_METHOD_PUT
: GRPC_MDELEM_METHOD_POST);
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
channeld->static_scheme);
grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,

@ -127,7 +127,7 @@ retry:
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
&holder->connected_subchannel, NULL);
0, &holder->connected_subchannel, NULL);
break;
}
gpr_mu_unlock(&holder->mu);
@ -145,7 +145,8 @@ retry:
GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
if (holder->pick_subchannel(
exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
&holder->connected_subchannel, &holder->next_step)) {
op->send_initial_metadata_flags, &holder->connected_subchannel,
&holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
}

@ -42,6 +42,7 @@
called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready);
typedef enum {

@ -40,7 +40,7 @@ decisions (for example, by avoiding disconnected backends).
Configured sub-channels are fully setup to participate in the grpc data plane.
Their behavior is specified by a set of grpc channel filters defined at their
construction. To customize this behavior, resolvers build
grpc_subchannel_factory objects, which use the decorator pattern to customize
grpc_client_channel_factory objects, which use the decorator pattern to customize
construction arguments for concrete grpc_subchannel instances.

@ -31,19 +31,27 @@
*
*/
#include "src/core/lib/client_config/subchannel_factory.h"
#include "src/core/lib/client_config/client_channel_factory.h"
void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) {
void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory) {
factory->vtable->ref(factory);
}
void grpc_subchannel_factory_unref(grpc_exec_ctx* exec_ctx,
grpc_subchannel_factory* factory) {
void grpc_client_channel_factory_unref(grpc_exec_ctx* exec_ctx,
grpc_client_channel_factory* factory) {
factory->vtable->unref(exec_ctx, factory);
}
grpc_subchannel* grpc_subchannel_factory_create_subchannel(
grpc_exec_ctx* exec_ctx, grpc_subchannel_factory* factory,
grpc_subchannel* grpc_client_channel_factory_create_subchannel(
grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
grpc_subchannel_args* args) {
return factory->vtable->create_subchannel(exec_ctx, factory, args);
}
grpc_channel* grpc_client_channel_factory_create_channel(
grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
const char* target, grpc_client_channel_type type,
grpc_channel_args* args) {
return factory->vtable->create_client_channel(exec_ctx, factory, target, type,
args);
}

@ -31,36 +31,55 @@
*
*/
#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
#define GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H
#define GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/client_config/subchannel.h"
typedef struct grpc_subchannel_factory grpc_subchannel_factory;
typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable;
typedef struct grpc_client_channel_factory grpc_client_channel_factory;
typedef struct grpc_client_channel_factory_vtable
grpc_client_channel_factory_vtable;
typedef enum {
GRPC_CLIENT_CHANNEL_TYPE_REGULAR, /** for the user-level regular calls */
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, /** for communication with a load
balancing service */
} grpc_client_channel_type;
/** Constructor for new configured channels.
Creating decorators around this type is encouraged to adapt behavior. */
struct grpc_subchannel_factory {
const grpc_subchannel_factory_vtable *vtable;
struct grpc_client_channel_factory {
const grpc_client_channel_factory_vtable *vtable;
};
struct grpc_subchannel_factory_vtable {
void (*ref)(grpc_subchannel_factory *factory);
void (*unref)(grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory);
struct grpc_client_channel_factory_vtable {
void (*ref)(grpc_client_channel_factory *factory);
void (*unref)(grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory);
grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx,
grpc_subchannel_factory *factory,
grpc_client_channel_factory *factory,
grpc_subchannel_args *args);
grpc_channel *(*create_client_channel)(grpc_exec_ctx *exec_ctx,
grpc_client_channel_factory *factory,
const char *target,
grpc_client_channel_type type,
grpc_channel_args *args);
};
void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory);
void grpc_subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_factory *factory);
void grpc_client_channel_factory_ref(grpc_client_channel_factory *factory);
void grpc_client_channel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_client_channel_factory *factory);
/** Create a new grpc_subchannel */
grpc_subchannel *grpc_subchannel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
grpc_subchannel *grpc_client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
grpc_subchannel_args *args);
#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */
/** Create a new grpc_channel */
grpc_channel *grpc_client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
const char *target, grpc_client_channel_type type, grpc_channel_args *args);
#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H */

@ -101,10 +101,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
target, on_complete);
initial_metadata_flags, target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@ -112,6 +113,14 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) {
policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
initial_metadata_flags_eq);
}
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}

@ -60,9 +60,13 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@ -122,6 +126,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete);
@ -131,6 +136,14 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
/** Cancel all pending picks which have:
(initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq */
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,

@ -34,8 +34,8 @@
#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H
#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H
#include "src/core/lib/client_config/client_channel_factory.h"
#include "src/core/lib/client_config/lb_policy.h"
#include "src/core/lib/client_config/subchannel_factory.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -51,7 +51,7 @@ struct grpc_lb_policy_factory {
typedef struct grpc_lb_policy_args {
grpc_resolved_addresses *addresses;
grpc_subchannel_factory *subchannel_factory;
grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {

@ -34,8 +34,8 @@
#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_FACTORY_H
#define GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_FACTORY_H
#include "src/core/lib/client_config/client_channel_factory.h"
#include "src/core/lib/client_config/resolver.h"
#include "src/core/lib/client_config/subchannel_factory.h"
#include "src/core/lib/client_config/uri_parser.h"
typedef struct grpc_resolver_factory grpc_resolver_factory;
@ -49,7 +49,7 @@ struct grpc_resolver_factory {
typedef struct grpc_resolver_args {
grpc_uri *uri;
grpc_subchannel_factory *subchannel_factory;
grpc_client_channel_factory *client_channel_factory;
} grpc_resolver_args;
struct grpc_resolver_factory_vtable {

@ -123,14 +123,14 @@ static grpc_resolver_factory *resolve_factory(const char *target,
}
grpc_resolver *grpc_resolver_create(
const char *target, grpc_subchannel_factory *subchannel_factory) {
const char *target, grpc_client_channel_factory *client_channel_factory) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver;
grpc_resolver_args args;
memset(&args, 0, sizeof(args));
args.uri = uri;
args.subchannel_factory = subchannel_factory;
args.client_channel_factory = client_channel_factory;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
grpc_uri_destroy(uri);
return resolver;

@ -56,7 +56,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
return it.
If a resolver factory was not found, return NULL. */
grpc_resolver *grpc_resolver_create(
const char *target, grpc_subchannel_factory *subchannel_factory);
const char *target, grpc_client_channel_factory *client_channel_factory);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */

@ -54,7 +54,7 @@
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 2
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2

@ -81,11 +81,11 @@ typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
/* Status was created by some internal channel stack operation */
STATUS_FROM_CORE,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
/* Status was created by some internal channel stack operation */
STATUS_FROM_CORE,
/* Status came from the server sending status */
STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
@ -1110,6 +1110,9 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
gpr_mu_lock(&call->mu);
if (bctl->send_initial_metadata) {
if (!success) {
set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
}
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
@ -1232,8 +1235,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->metadata_batch[0][0].deadline = call->send_deadline;
stream_op.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op.send_idempotent_request =
(op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0;
stream_op.send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {

@ -35,7 +35,7 @@
#define GRPC_CORE_LIB_SURFACE_CHANNEL_H
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/client_config/subchannel_factory.h"
#include "src/core/lib/client_config/client_channel_factory.h"
#include "src/core/lib/surface/channel_stack_type.h"
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,

@ -101,9 +101,9 @@ typedef struct grpc_transport_stream_op {
/** Send initial metadata to the peer, from the provided metadata batch.
idempotent_request MUST be set if this is non-null */
grpc_metadata_batch *send_initial_metadata;
/** Iff send_initial_metadata != NULL, flags if this is an idempotent request
or not */
bool send_idempotent_request;
/** Iff send_initial_metadata != NULL, flags associated with
send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
uint32_t send_initial_metadata_flags;
/** Send trailing metadata to the peer, from the provided metadata batch. */
grpc_metadata_batch *send_trailing_metadata;

@ -60,6 +60,8 @@ static ClientContext::GlobalCallbacks* g_client_callbacks =
ClientContext::ClientContext()
: initial_metadata_received_(false),
fail_fast_(true),
idempotent_(false),
call_(nullptr),
call_canceled_(false),
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),

@ -123,6 +123,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',
'src/core/lib/channel/subchannel_call_holder.c',
'src/core/lib/client_config/client_channel_factory.c',
'src/core/lib/client_config/client_config.c',
'src/core/lib/client_config/connector.c',
'src/core/lib/client_config/default_initial_connect_string.c',
@ -134,7 +135,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/client_config/resolver_factory.c',
'src/core/lib/client_config/resolver_registry.c',
'src/core/lib/client_config/subchannel.c',
'src/core/lib/client_config/subchannel_factory.c',
'src/core/lib/client_config/subchannel_index.c',
'src/core/lib/client_config/uri_parser.c',
'src/core/lib/compression/compression_algorithm.c',

@ -403,7 +403,7 @@ module GRPC
loop_handle_server_calls
end
# Sends UNAVAILABLE if there are too many unprocessed jobs
# Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
@ -411,7 +411,7 @@ module GRPC
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::UNAVAILABLE, '')
c.send_status(StatusCodes::RESOURCE_EXHAUSTED, '')
nil
end

@ -426,7 +426,7 @@ describe GRPC::RpcServer do
threads.each(&:join)
end
it 'should return UNAVAILABLE on too many jobs', server: true do
it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do
opts = {
a_channel_arg: 'an_arg',
server_override: @server,
@ -449,7 +449,8 @@ describe GRPC::RpcServer do
begin
stub.an_rpc(req)
rescue GRPC::BadStatus => e
one_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE
one_failed_as_unavailable =
e.code == StatusCodes::RESOURCE_EXHAUSTED
end
end
end

@ -87,7 +87,7 @@ static void run_test(const char *target, size_t nops) {
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->flags = GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;

@ -41,20 +41,28 @@
#include "src/core/lib/iomgr/timer.h"
#include "test/core/util/test_config.h"
static void subchannel_factory_ref(grpc_subchannel_factory *scv) {}
static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_factory *scv) {}
static grpc_subchannel *subchannel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
static void client_channel_factory_ref(grpc_client_channel_factory *scv) {}
static void client_channel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_client_channel_factory *scv) {}
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
grpc_subchannel_args *args) {
return NULL;
}
static const grpc_subchannel_factory_vtable sc_vtable = {
subchannel_factory_ref, subchannel_factory_unref,
subchannel_factory_create_subchannel};
static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
grpc_channel_args *args) {
GPR_UNREACHABLE_CODE(return NULL);
}
static const grpc_client_channel_factory_vtable sc_vtable = {
client_channel_factory_ref, client_channel_factory_unref,
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
static grpc_subchannel_factory sc_factory = {&sc_vtable};
static grpc_client_channel_factory cc_factory = {&sc_vtable};
static gpr_mu g_mu;
static bool g_fail_resolution = true;
@ -84,7 +92,7 @@ static grpc_resolver *create_resolver(const char *name) {
grpc_resolver_args args;
memset(&args, 0, sizeof(args));
args.uri = uri;
args.subchannel_factory = &sc_factory;
args.client_channel_factory = &cc_factory;
grpc_resolver *resolver =
grpc_resolver_factory_create_resolver(factory, &args);
grpc_resolver_factory_unref(factory);

@ -38,20 +38,28 @@
#include "src/core/lib/client_config/resolver_registry.h"
#include "test/core/util/test_config.h"
static void subchannel_factory_ref(grpc_subchannel_factory *scv) {}
static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_factory *scv) {}
static grpc_subchannel *subchannel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
static void client_channel_factory_ref(grpc_client_channel_factory *scv) {}
static void client_channel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_client_channel_factory *scv) {}
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
grpc_subchannel_args *args) {
GPR_UNREACHABLE_CODE(return NULL);
}
static const grpc_subchannel_factory_vtable sc_vtable = {
subchannel_factory_ref, subchannel_factory_unref,
subchannel_factory_create_subchannel};
static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
grpc_channel_args *args) {
GPR_UNREACHABLE_CODE(return NULL);
}
static const grpc_client_channel_factory_vtable sc_vtable = {
client_channel_factory_ref, client_channel_factory_unref,
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
static grpc_subchannel_factory sc_factory = {&sc_vtable};
static grpc_client_channel_factory cc_factory = {&sc_vtable};
static void test_succeeds(grpc_resolver_factory *factory, const char *string) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -63,7 +71,7 @@ static void test_succeeds(grpc_resolver_factory *factory, const char *string) {
GPR_ASSERT(uri);
memset(&args, 0, sizeof(args));
args.uri = uri;
args.subchannel_factory = &sc_factory;
args.client_channel_factory = &cc_factory;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
GPR_ASSERT(resolver != NULL);
GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test_succeeds");

@ -38,20 +38,28 @@
#include "src/core/lib/client_config/resolver_registry.h"
#include "test/core/util/test_config.h"
static void subchannel_factory_ref(grpc_subchannel_factory *scv) {}
static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_factory *scv) {}
static grpc_subchannel *subchannel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
static void client_channel_factory_ref(grpc_client_channel_factory *scv) {}
static void client_channel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_client_channel_factory *scv) {}
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
grpc_subchannel_args *args) {
GPR_UNREACHABLE_CODE(return NULL);
}
static const grpc_subchannel_factory_vtable sc_vtable = {
subchannel_factory_ref, subchannel_factory_unref,
subchannel_factory_create_subchannel};
static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
grpc_channel_args *args) {
GPR_UNREACHABLE_CODE(return NULL);
}
static const grpc_client_channel_factory_vtable sc_vtable = {
client_channel_factory_ref, client_channel_factory_unref,
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
static grpc_subchannel_factory sc_factory = {&sc_vtable};
static grpc_client_channel_factory cc_factory = {&sc_vtable};
static void test_succeeds(grpc_resolver_factory *factory, const char *string) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -63,7 +71,7 @@ static void test_succeeds(grpc_resolver_factory *factory, const char *string) {
GPR_ASSERT(uri);
memset(&args, 0, sizeof(args));
args.uri = uri;
args.subchannel_factory = &sc_factory;
args.client_channel_factory = &cc_factory;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
GPR_ASSERT(resolver != NULL);
GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test_succeeds");

@ -168,7 +168,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->flags = expect_ok ? GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY : 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
@ -237,7 +237,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_DEADLINE_EXCEEDED);
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
}
grpc_call_destroy(c);

@ -120,7 +120,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->flags = GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;

@ -88,6 +88,7 @@ TEST_F(CrashTest, KillBeforeWrite) {
EchoRequest request;
EchoResponse response;
ClientContext context;
context.set_fail_fast(false);
auto stream = stub->BidiStream(&context);
@ -113,6 +114,7 @@ TEST_F(CrashTest, KillAfterWrite) {
EchoRequest request;
EchoResponse response;
ClientContext context;
context.set_fail_fast(false);
auto stream = stub->BidiStream(&context);

@ -63,6 +63,7 @@ int main(int argc, char** argv) {
EchoRequest request;
EchoResponse response;
grpc::ClientContext context;
context.set_fail_fast(false);
if (FLAGS_mode == "bidi") {
auto stream = stub->BidiStream(&context);

@ -812,6 +812,7 @@ src/core/lib/channel/context.h \
src/core/lib/channel/http_client_filter.h \
src/core/lib/channel/http_server_filter.h \
src/core/lib/channel/subchannel_call_holder.h \
src/core/lib/client_config/client_channel_factory.h \
src/core/lib/client_config/client_config.h \
src/core/lib/client_config/connector.h \
src/core/lib/client_config/initial_connect_string.h \
@ -822,7 +823,6 @@ src/core/lib/client_config/resolver.h \
src/core/lib/client_config/resolver_factory.h \
src/core/lib/client_config/resolver_registry.h \
src/core/lib/client_config/subchannel.h \
src/core/lib/client_config/subchannel_factory.h \
src/core/lib/client_config/subchannel_index.h \
src/core/lib/client_config/uri_parser.h \
src/core/lib/compression/algorithm_metadata.h \
@ -957,6 +957,7 @@ src/core/lib/channel/connected_channel.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
src/core/lib/channel/subchannel_call_holder.c \
src/core/lib/client_config/client_channel_factory.c \
src/core/lib/client_config/client_config.c \
src/core/lib/client_config/connector.c \
src/core/lib/client_config/default_initial_connect_string.c \
@ -968,7 +969,6 @@ src/core/lib/client_config/resolver.c \
src/core/lib/client_config/resolver_factory.c \
src/core/lib/client_config/resolver_registry.c \
src/core/lib/client_config/subchannel.c \
src/core/lib/client_config/subchannel_factory.c \
src/core/lib/client_config/subchannel_index.c \
src/core/lib/client_config/uri_parser.c \
src/core/lib/compression/compression_algorithm.c \

@ -48,6 +48,7 @@ if [ "$CONFIG" = "gcov" ]
then
./node_modules/.bin/istanbul cover --dir reports/node_coverage \
-x **/interop/* ./node_modules/.bin/_mocha -- --timeout $timeout $test_directory
cp -r reports/node_coverage/lcov-report/* reports/node_coverage/
cd build
gcov Release/obj.target/grpc/ext/*.o
lcov --base-directory . --directory . -c -o coverage.info
@ -55,8 +56,6 @@ then
genhtml -o ../reports/node_ext_coverage --num-spaces 2 \
-t 'Node gRPC test coverage' coverage.info --rc genhtml_hi_limit=95 \
--rc genhtml_med_limit=80 --no-prefix
echo '<html><head><meta http-equiv="refresh" content="0;URL=lcov-report/index.html"></head></html>' > \
../reports/node_coverage/index.html
else
JUNIT_REPORT_PATH=src/node/report.xml JUNIT_REPORT_STACK=1 \
./node_modules/.bin/mocha --timeout $timeout \

@ -4040,6 +4040,7 @@
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
"src/core/lib/channel/subchannel_call_holder.h",
"src/core/lib/client_config/client_channel_factory.h",
"src/core/lib/client_config/client_config.h",
"src/core/lib/client_config/connector.h",
"src/core/lib/client_config/initial_connect_string.h",
@ -4050,7 +4051,6 @@
"src/core/lib/client_config/resolver_factory.h",
"src/core/lib/client_config/resolver_registry.h",
"src/core/lib/client_config/subchannel.h",
"src/core/lib/client_config/subchannel_factory.h",
"src/core/lib/client_config/subchannel_index.h",
"src/core/lib/client_config/uri_parser.h",
"src/core/lib/compression/algorithm_metadata.h",
@ -4241,6 +4241,8 @@
"src/core/lib/channel/http_server_filter.h",
"src/core/lib/channel/subchannel_call_holder.c",
"src/core/lib/channel/subchannel_call_holder.h",
"src/core/lib/client_config/client_channel_factory.c",
"src/core/lib/client_config/client_channel_factory.h",
"src/core/lib/client_config/client_config.c",
"src/core/lib/client_config/client_config.h",
"src/core/lib/client_config/connector.c",
@ -4262,8 +4264,6 @@
"src/core/lib/client_config/resolver_registry.h",
"src/core/lib/client_config/subchannel.c",
"src/core/lib/client_config/subchannel.h",
"src/core/lib/client_config/subchannel_factory.c",
"src/core/lib/client_config/subchannel_factory.h",
"src/core/lib/client_config/subchannel_index.c",
"src/core/lib/client_config/subchannel_index.h",
"src/core/lib/client_config/uri_parser.c",
@ -4654,6 +4654,7 @@
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
"src/core/lib/channel/subchannel_call_holder.h",
"src/core/lib/client_config/client_channel_factory.h",
"src/core/lib/client_config/client_config.h",
"src/core/lib/client_config/connector.h",
"src/core/lib/client_config/initial_connect_string.h",
@ -4664,7 +4665,6 @@
"src/core/lib/client_config/resolver_factory.h",
"src/core/lib/client_config/resolver_registry.h",
"src/core/lib/client_config/subchannel.h",
"src/core/lib/client_config/subchannel_factory.h",
"src/core/lib/client_config/subchannel_index.h",
"src/core/lib/client_config/uri_parser.h",
"src/core/lib/compression/algorithm_metadata.h",
@ -4838,6 +4838,8 @@
"src/core/lib/channel/http_server_filter.h",
"src/core/lib/channel/subchannel_call_holder.c",
"src/core/lib/channel/subchannel_call_holder.h",
"src/core/lib/client_config/client_channel_factory.c",
"src/core/lib/client_config/client_channel_factory.h",
"src/core/lib/client_config/client_config.c",
"src/core/lib/client_config/client_config.h",
"src/core/lib/client_config/connector.c",
@ -4859,8 +4861,6 @@
"src/core/lib/client_config/resolver_registry.h",
"src/core/lib/client_config/subchannel.c",
"src/core/lib/client_config/subchannel.h",
"src/core/lib/client_config/subchannel_factory.c",
"src/core/lib/client_config/subchannel_factory.h",
"src/core/lib/client_config/subchannel_index.c",
"src/core/lib/client_config/subchannel_index.h",
"src/core/lib/client_config/uri_parser.c",

@ -321,6 +321,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\subchannel_call_holder.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\client_channel_factory.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\client_config.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\connector.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\initial_connect_string.h" />
@ -331,7 +332,6 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\resolver_factory.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\resolver_registry.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_factory.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_index.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\uri_parser.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\compression\algorithm_metadata.h" />
@ -517,6 +517,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\subchannel_call_holder.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\client_channel_factory.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\client_config.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\connector.c">
@ -539,8 +541,6 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_factory.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_index.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\uri_parser.c">

@ -148,6 +148,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\subchannel_call_holder.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\client_channel_factory.c">
<Filter>src\core\lib\client_config</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\client_config.c">
<Filter>src\core\lib\client_config</Filter>
</ClCompile>
@ -181,9 +184,6 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel.c">
<Filter>src\core\lib\client_config</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_factory.c">
<Filter>src\core\lib\client_config</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_index.c">
<Filter>src\core\lib\client_config</Filter>
</ClCompile>
@ -647,6 +647,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\subchannel_call_holder.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\client_channel_factory.h">
<Filter>src\core\lib\client_config</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\client_config.h">
<Filter>src\core\lib\client_config</Filter>
</ClInclude>
@ -677,9 +680,6 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel.h">
<Filter>src\core\lib\client_config</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_factory.h">
<Filter>src\core\lib\client_config</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_index.h">
<Filter>src\core\lib\client_config</Filter>
</ClInclude>

@ -311,6 +311,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\subchannel_call_holder.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\client_channel_factory.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\client_config.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\connector.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\initial_connect_string.h" />
@ -321,7 +322,6 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\resolver_factory.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\resolver_registry.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_factory.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_index.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\uri_parser.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\compression\algorithm_metadata.h" />
@ -489,6 +489,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\subchannel_call_holder.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\client_channel_factory.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\client_config.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\connector.c">
@ -511,8 +513,6 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_factory.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_index.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\uri_parser.c">

@ -142,6 +142,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\subchannel_call_holder.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\client_channel_factory.c">
<Filter>src\core\lib\client_config</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\client_config.c">
<Filter>src\core\lib\client_config</Filter>
</ClCompile>
@ -175,9 +178,6 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel.c">
<Filter>src\core\lib\client_config</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_factory.c">
<Filter>src\core\lib\client_config</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_index.c">
<Filter>src\core\lib\client_config</Filter>
</ClCompile>
@ -584,6 +584,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\subchannel_call_holder.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\client_channel_factory.h">
<Filter>src\core\lib\client_config</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\client_config.h">
<Filter>src\core\lib\client_config</Filter>
</ClInclude>
@ -614,9 +617,6 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel.h">
<Filter>src\core\lib\client_config</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_factory.h">
<Filter>src\core\lib\client_config</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\client_config\subchannel_index.h">
<Filter>src\core\lib\client_config</Filter>
</ClInclude>

Loading…
Cancel
Save