Implement client-side load reporting for grpclb.

pull/10821/head
Mark D. Roth 8 years ago
parent d8696e1023
commit 09e458c6cd
  1. 8
      BUILD
  2. 4
      CMakeLists.txt
  3. 4
      Makefile
  4. 2
      binding.gyp
  5. 8
      build.yaml
  6. 2
      config.m4
  7. 6
      gRPC-Core.podspec
  8. 4
      grpc.gemspec
  9. 4
      package.xml
  10. 38
      src/core/ext/filters/client_channel/client_channel.c
  11. 3
      src/core/ext/filters/client_channel/lb_policy.c
  12. 9
      src/core/ext/filters/client_channel/lb_policy.h
  13. 153
      src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
  14. 42
      src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
  15. 354
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
  16. 133
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
  17. 65
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
  18. 50
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
  19. 6
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
  20. 3
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
  21. 3
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
  22. 2
      src/core/ext/filters/client_channel/subchannel.c
  23. 1
      src/core/ext/filters/client_channel/subchannel.h
  24. 3
      src/core/lib/channel/context.h
  25. 2
      src/python/grpcio/grpc_core_dependencies.py
  26. 1
      test/cpp/end2end/BUILD
  27. 129
      test/cpp/end2end/grpclb_end2end_test.cc
  28. 4
      tools/doxygen/Doxyfile.core.internal
  29. 12
      tools/run_tests/generated/sources_and_headers.json
  30. 6
      vsprojects/vcxproj/grpc/grpc.vcxproj
  31. 12
      vsprojects/vcxproj/grpc/grpc.vcxproj.filters
  32. 6
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
  33. 12
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -829,14 +829,18 @@ grpc_cc_library(
grpc_cc_library(
name = "grpc_lb_policy_grpclb",
srcs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
],
@ -853,14 +857,18 @@ grpc_cc_library(
grpc_cc_library(
name = "grpc_lb_policy_grpclb_secure",
srcs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h",
],

@ -1121,8 +1121,10 @@ add_library(grpc
src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
src/core/ext/transport/chttp2/client/insecure/channel_create.c
src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
third_party/nanopb/pb_common.c
@ -1991,8 +1993,10 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
src/core/ext/filters/load_reporting/load_reporting.c
src/core/ext/filters/load_reporting/load_reporting_filter.c
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
third_party/nanopb/pb_common.c

@ -3104,8 +3104,10 @@ LIBGRPC_SRC = \
src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c \
src/core/ext/transport/chttp2/client/insecure/channel_create.c \
src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \
third_party/nanopb/pb_common.c \
@ -3943,8 +3945,10 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \
src/core/ext/filters/load_reporting/load_reporting.c \
src/core/ext/filters/load_reporting/load_reporting_filter.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \
third_party/nanopb/pb_common.c \

@ -855,8 +855,10 @@
'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'third_party/nanopb/pb_common.c',

@ -488,13 +488,17 @@ filegroups:
- grpc_base
- name: grpc_lb_policy_grpclb
headers:
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
src:
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
- src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
plugin: grpc_lb_policy_grpclb
@ -504,13 +508,17 @@ filegroups:
- nanopb
- name: grpc_lb_policy_grpclb_secure
headers:
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
src:
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
- src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
plugin: grpc_lb_policy_grpclb

@ -291,8 +291,10 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c \
src/core/ext/transport/chttp2/client/insecure/channel_create.c \
src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \
third_party/nanopb/pb_common.c \

@ -434,8 +434,10 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/uri_parser.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/transport/chttp2/client/chttp2_connector.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h',
'third_party/nanopb/pb.h',
@ -668,8 +670,10 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'third_party/nanopb/pb_common.c',
@ -895,8 +899,10 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/uri_parser.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/transport/chttp2/client/chttp2_connector.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h',
'third_party/nanopb/pb.h',

@ -350,8 +350,10 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/uri_parser.h )
s.files += %w( src/core/ext/filters/deadline/deadline_filter.h )
s.files += %w( src/core/ext/transport/chttp2/client/chttp2_connector.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h )
s.files += %w( third_party/nanopb/pb.h )
@ -584,8 +586,10 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c )
s.files += %w( src/core/ext/transport/chttp2/client/insecure/channel_create.c )
s.files += %w( src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c )
s.files += %w( third_party/nanopb/pb_common.c )

@ -359,8 +359,10 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/uri_parser.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/deadline/deadline_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/client/chttp2_connector.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" role="src" />
<file baseinstalldir="/" name="third_party/nanopb/pb.h" role="src" />
@ -593,8 +595,10 @@
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/client/insecure/channel_create.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c" role="src" />
<file baseinstalldir="/" name="third_party/nanopb/pb_common.c" role="src" />

@ -795,6 +795,7 @@ typedef struct client_channel_call_data {
subchannel_creation_phase creation_phase;
grpc_connected_subchannel *connected_subchannel;
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
grpc_polling_entity *pollent;
grpc_transport_stream_op_batch **waiting_ops;
@ -944,7 +945,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
.path = calld->path,
.start_time = calld->call_start_time,
.deadline = calld->deadline,
.arena = calld->arena};
.arena = calld->arena,
.context = calld->subchannel_call_context};
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
gpr_atm_rel_store(&calld->subchannel_call,
@ -973,6 +975,7 @@ typedef struct {
grpc_metadata_batch *initial_metadata;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **connected_subchannel;
grpc_call_context_element *subchannel_call_context;
grpc_closure *on_ready;
grpc_call_element *elem;
grpc_closure closure;
@ -984,7 +987,8 @@ typedef struct {
static bool pick_subchannel_locked(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready,
grpc_connected_subchannel **connected_subchannel,
grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready,
grpc_error *error);
static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
@ -995,10 +999,10 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
} else if (error != GRPC_ERROR_NONE) {
grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
} else {
if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready,
GRPC_ERROR_NONE)) {
if (pick_subchannel_locked(
exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags, cpa->connected_subchannel,
cpa->subchannel_call_context, cpa->on_ready, GRPC_ERROR_NONE)) {
grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
}
}
@ -1008,7 +1012,8 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
static bool pick_subchannel_locked(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready,
grpc_connected_subchannel **connected_subchannel,
grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready,
grpc_error *error) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
@ -1076,8 +1081,8 @@ static bool pick_subchannel_locked(
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
w_on_pick_arg->lb_policy = lb_policy;
const bool pick_done = grpc_lb_policy_pick_locked(
exec_ctx, lb_policy, &inputs, connected_subchannel, NULL,
&w_on_pick_arg->wrapper_closure);
exec_ctx, lb_policy, &inputs, connected_subchannel,
subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
@ -1100,6 +1105,7 @@ static bool pick_subchannel_locked(
cpa->initial_metadata = initial_metadata;
cpa->initial_metadata_flags = initial_metadata_flags;
cpa->connected_subchannel = connected_subchannel;
cpa->subchannel_call_context = subchannel_call_context;
cpa->on_ready = on_ready;
cpa->elem = elem;
grpc_closure_init(&cpa->closure, continue_picking_locked, cpa,
@ -1158,7 +1164,7 @@ static void start_transport_stream_op_batch_locked_inner(
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
pick_subchannel_locked(
exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL,
exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL, NULL,
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
break;
}
@ -1183,7 +1189,8 @@ static void start_transport_stream_op_batch_locked_inner(
exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata,
op->payload->send_initial_metadata.send_initial_metadata_flags,
&calld->connected_subchannel, &calld->next_step, GRPC_ERROR_NONE)) {
&calld->connected_subchannel, calld->subchannel_call_context,
&calld->next_step, GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
} else {
@ -1200,7 +1207,8 @@ static void start_transport_stream_op_batch_locked_inner(
.path = calld->path,
.start_time = calld->call_start_time,
.deadline = calld->deadline,
.arena = calld->arena};
.arena = calld->arena,
.context = calld->subchannel_call_context};
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
gpr_atm_rel_store(&calld->subchannel_call,
@ -1355,6 +1363,12 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
"picked");
}
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
if (calld->subchannel_call_context[i].value != NULL) {
calld->subchannel_call_context[i].destroy(
calld->subchannel_call_context[i].value);
}
}
gpr_free(calld->waiting_ops);
grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}

@ -119,9 +119,10 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_call_context_element *context,
void **user_data, grpc_closure *on_complete) {
return policy->vtable->pick_locked(exec_ctx, policy, pick_args, target,
user_data, on_complete);
context, user_data, on_complete);
}
void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx,

@ -43,9 +43,6 @@
typedef struct grpc_lb_policy grpc_lb_policy;
typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable;
typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
grpc_status_code status, const char *errmsg);
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
gpr_atm ref_pair;
@ -76,7 +73,8 @@ struct grpc_lb_policy_vtable {
/** \see grpc_lb_policy_pick */
int (*pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_connected_subchannel **target,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete);
/** \see grpc_lb_policy_cancel_pick */
@ -156,6 +154,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
\a target will be set to the selected subchannel, or NULL on failure.
Upon success, \a user_data will be set to whatever opaque information
may need to be propagated from the LB policy, or NULL if not needed.
\a context will be populated with context to pass to the subchannel
call, if needed.
If the pick succeeds and a result is known immediately, a non-zero
value will be returned. Otherwise, \a on_complete will be invoked
@ -167,6 +167,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_call_context_element *context,
void **user_data, grpc_closure *on_complete);
/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)

@ -0,0 +1,153 @@
/*
*
* Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/profiling/timers.h"
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
return GRPC_ERROR_NONE;
}
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
typedef struct {
// Stats object to update.
grpc_grpclb_client_stats *client_stats;
// State for intercepting send_initial_metadata.
grpc_closure on_complete_for_send;
grpc_closure *original_on_complete_for_send;
bool send_initial_metadata_succeeded;
// State for intercepting recv_initial_metadata.
grpc_closure recv_initial_metadata_ready;
grpc_closure *original_recv_initial_metadata_ready;
bool recv_initial_metadata_succeeded;
} call_data;
static void on_complete_for_send(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
call_data *calld = arg;
if (error == GRPC_ERROR_NONE) {
calld->send_initial_metadata_succeeded = true;
}
grpc_closure_run(exec_ctx, calld->original_on_complete_for_send,
GRPC_ERROR_REF(error));
}
static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
call_data *calld = arg;
if (error == GRPC_ERROR_NONE) {
calld->recv_initial_metadata_succeeded = true;
}
grpc_closure_run(exec_ctx, calld->original_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
// Get stats object from context and take a ref.
GPR_ASSERT(args->context != NULL);
GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL);
calld->client_stats = grpc_grpclb_client_stats_ref(
args->context[GRPC_GRPCLB_CLIENT_STATS].value);
// Record call started.
grpc_grpclb_client_stats_add_call_started(calld->client_stats);
return GRPC_ERROR_NONE;
}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
grpc_closure *ignored) {
call_data *calld = elem->call_data;
// Record call finished, optionally setting client_failed_to_send and
// received.
grpc_grpclb_client_stats_add_call_finished(
false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */,
!calld->send_initial_metadata_succeeded /* client_failed_to_send */,
calld->recv_initial_metadata_succeeded /* known_received */,
calld->client_stats);
// All done, so unref the stats object.
grpc_grpclb_client_stats_unref(calld->client_stats);
}
static void start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0);
// Intercept send_initial_metadata.
if (batch->send_initial_metadata) {
calld->original_on_complete_for_send = batch->on_complete;
grpc_closure_init(&calld->on_complete_for_send, on_complete_for_send, calld,
grpc_schedule_on_exec_ctx);
batch->on_complete = &calld->on_complete_for_send;
}
// Intercept recv_initial_metadata.
if (batch->recv_initial_metadata) {
calld->original_recv_initial_metadata_ready =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
grpc_closure_init(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, calld,
grpc_schedule_on_exec_ctx);
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready;
}
// Chain to next filter.
grpc_call_next_op(exec_ctx, elem, batch);
GPR_TIMER_END("clr_start_transport_stream_op_batch", 0);
}
const grpc_channel_filter grpc_client_load_reporting_filter = {
start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
grpc_channel_next_get_info,
"client_load_reporting"};

@ -0,0 +1,42 @@
/*
*
* Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H
#include "src/core/lib/channel/channel_stack.h"
extern const grpc_channel_filter grpc_client_load_reporting_filter;
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H \
*/

@ -108,13 +108,16 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@ -126,6 +129,7 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/static_metadata.h"
#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
@ -147,6 +151,10 @@ static grpc_error *initial_metadata_add_lb_token(
lb_token_mdelem_storage, lb_token);
}
static void destroy_client_stats(void *arg) {
grpc_grpclb_client_stats_unref(arg);
}
typedef struct wrapped_rr_closure_arg {
/* the closure instance using this struct as argument */
grpc_closure wrapper_closure;
@ -163,6 +171,13 @@ typedef struct wrapped_rr_closure_arg {
* initial metadata */
grpc_connected_subchannel **target;
/* the context to be populated for the subchannel call */
grpc_call_context_element *context;
/* Stats for client-side load reporting. Note that this holds a
* reference, which must be either passed on via context or unreffed. */
grpc_grpclb_client_stats *client_stats;
/* the LB token associated with the pick */
grpc_mdelem lb_token;
@ -202,6 +217,12 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
(void *)*wc_arg->target, (void *)wc_arg->rr_policy);
abort();
}
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT(wc_arg->client_stats != NULL);
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
} else {
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
}
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
@ -237,6 +258,7 @@ typedef struct pending_pick {
static void add_pending_pick(pending_pick **root,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_call_context_element *context,
grpc_closure *on_complete) {
pending_pick *pp = gpr_zalloc(sizeof(*pp));
pp->next = *root;
@ -244,6 +266,7 @@ static void add_pending_pick(pending_pick **root,
pp->target = target;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
pp->wrapped_on_complete_arg.target = target;
pp->wrapped_on_complete_arg.context = context;
pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
pick_args->lb_token_mdelem_storage;
@ -316,6 +339,10 @@ typedef struct glb_lb_policy {
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
/* Finished sending initial request. */
grpc_closure lb_on_sent_initial_request;
/* Status from the LB server has been received. This signals the end of the LB
* call. */
grpc_closure lb_on_server_status_received;
@ -348,6 +375,23 @@ typedef struct glb_lb_policy {
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
bool initial_request_sent;
bool seen_initial_response;
/* Stats for client-side load reporting. Should be unreffed and
* recreated whenever lb_call is replaced. */
grpc_grpclb_client_stats *client_stats;
/* Interval and timer for next client load report. */
gpr_timespec client_stats_report_interval;
grpc_timer client_load_report_timer;
bool client_load_report_timer_pending;
bool last_client_load_report_counters_were_zero;
/* Closure used for either the load report timer or the callback for
* completion of sending the load report. */
grpc_closure client_load_report_closure;
/* Client load report message payload. */
grpc_byte_buffer *client_load_report_payload;
} glb_lb_policy;
/* Keeps track and reacts to changes in connectivity of the RR instance */
@ -552,8 +596,8 @@ static bool pick_from_internal_rr_locked(
grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
GPR_ASSERT(rr_policy != NULL);
const bool pick_done = grpc_lb_policy_pick_locked(
exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token,
&wc_arg->wrapper_closure);
exec_ctx, rr_policy, pick_args, target, wc_arg->context,
(void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) {
@ -567,7 +611,12 @@ static bool pick_from_internal_rr_locked(
pick_args->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
gpr_free(wc_arg);
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT(wc_arg->client_stats != NULL);
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
gpr_free(wc_arg->free_when_done);
}
/* else, the pending pick will be registered and taken care of by the
* pending pick list inside the RR policy (glb_policy->rr_policy).
@ -690,6 +739,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
glb_policy->pending_picks = pp->next;
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
pp->wrapped_on_complete_arg.client_stats =
grpc_grpclb_client_stats_ref(glb_policy->client_stats);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
@ -864,9 +915,18 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_uri_destroy(uri);
glb_policy->cc_factory = args->client_channel_factory;
glb_policy->args = grpc_channel_args_copy(args->args);
GPR_ASSERT(glb_policy->cc_factory != NULL);
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
grpc_arg new_arg;
new_arg.key = GRPC_ARG_LB_POLICY_NAME;
new_arg.type = GRPC_ARG_STRING;
new_arg.value.string = "grpclb";
static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
grpc_slice_hash_table *targets_info = NULL;
/* Create a client channel over them to communicate with a LB service */
char *lb_service_target_addresses =
@ -880,6 +940,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
gpr_free(lb_service_target_addresses);
if (glb_policy->lb_channel == NULL) {
gpr_free((void *)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
gpr_free(glb_policy);
return NULL;
}
@ -895,6 +957,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GPR_ASSERT(glb_policy->pending_pings == NULL);
gpr_free((void *)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
if (glb_policy->client_stats != NULL) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
}
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = NULL;
grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
@ -1011,7 +1076,8 @@ static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_connected_subchannel **target,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
@ -1039,6 +1105,10 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_schedule_on_exec_ctx);
wc_arg->rr_policy = glb_policy->rr_policy;
wc_arg->target = target;
wc_arg->context = context;
GPR_ASSERT(glb_policy->client_stats != NULL);
wc_arg->client_stats =
grpc_grpclb_client_stats_ref(glb_policy->client_stats);
wc_arg->wrapped_closure = on_complete;
wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
wc_arg->initial_metadata = pick_args->initial_metadata;
@ -1052,7 +1122,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
"picks",
(void *)(glb_policy));
}
add_pending_pick(&glb_policy->pending_picks, pick_args, target,
add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
on_complete);
if (!glb_policy->started_picking) {
@ -1093,6 +1163,104 @@ static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
exec_ctx, &glb_policy->state_tracker, current, notify);
}
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
const gpr_timespec next_client_load_report_time =
gpr_time_add(now, glb_policy->client_stats_report_interval);
grpc_closure_init(&glb_policy->client_load_report_closure,
send_client_load_report_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner, false));
grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer,
next_client_load_report_time,
&glb_policy->client_load_report_closure, now);
}
static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
glb_lb_policy *glb_policy = arg;
grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
glb_policy->client_load_report_payload = NULL;
if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) {
glb_policy->client_load_report_timer_pending = false;
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"client_load_report");
return;
}
schedule_next_client_load_report(exec_ctx, glb_policy);
}
static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = glb_policy->client_load_report_payload;
grpc_closure_init(&glb_policy->client_load_report_closure,
client_load_report_done_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner, false));
grpc_call_error call_error = grpc_call_start_batch_and_execute(
exec_ctx, glb_policy->lb_call, &op, 1,
&glb_policy->client_load_report_closure);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
request->client_stats.num_calls_finished_with_drop_for_rate_limiting ==
0 &&
request->client_stats
.num_calls_finished_with_drop_for_load_balancing == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send ==
0 &&
request->client_stats.num_calls_finished_known_received == 0;
}
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
glb_lb_policy *glb_policy = arg;
if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) {
glb_policy->client_load_report_timer_pending = false;
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"client_load_report");
return;
}
// Construct message payload.
GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
grpc_grpclb_request *request =
grpc_grpclb_load_report_request_create(glb_policy->client_stats);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (load_report_counters_are_zero(request)) {
if (glb_policy->last_client_load_report_counters_were_zero) {
grpc_grpclb_request_destroy(request);
schedule_next_client_load_report(exec_ctx, glb_policy);
return;
}
glb_policy->last_client_load_report_counters_were_zero = true;
} else {
glb_policy->last_client_load_report_counters_were_zero = false;
}
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
glb_policy->client_load_report_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(exec_ctx, request_payload_slice);
grpc_grpclb_request_destroy(request);
// If we've already sent the initial request, then we can go ahead and
// sent the load report. Otherwise, we need to wait until the initial
// request has been sent to send this
// (see lb_on_sent_initial_request_locked() below).
if (glb_policy->initial_request_sent) {
do_send_client_load_report_locked(exec_ctx, glb_policy);
}
}
static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error);
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error);
static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
@ -1114,6 +1282,11 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
&host, glb_policy->deadline, NULL);
grpc_slice_unref_internal(exec_ctx, host);
if (glb_policy->client_stats != NULL) {
grpc_grpclb_client_stats_unref(glb_policy->client_stats);
}
glb_policy->client_stats = grpc_grpclb_client_stats_create();
grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
@ -1125,6 +1298,9 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_slice_unref_internal(exec_ctx, request_payload_slice);
grpc_grpclb_request_destroy(request);
grpc_closure_init(&glb_policy->lb_on_sent_initial_request,
lb_on_sent_initial_request_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner, false));
grpc_closure_init(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner, false));
@ -1138,6 +1314,10 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
GRPC_GRPCLB_RECONNECT_JITTER,
GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
glb_policy->initial_request_sent = false;
glb_policy->seen_initial_response = false;
glb_policy->last_client_load_report_counters_were_zero = false;
}
static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
@ -1151,6 +1331,10 @@ static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details);
if (!glb_policy->client_load_report_timer_pending) {
grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer);
}
}
/*
@ -1179,21 +1363,27 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&glb_policy->lb_initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
GPR_ASSERT(glb_policy->lb_request_payload != NULL);
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = glb_policy->lb_request_payload;
op->flags = 0;
op->reserved = NULL;
op++;
/* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
* count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received");
call_error = grpc_call_start_batch_and_execute(
exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_sent_initial_request);
GPR_ASSERT(GRPC_CALL_OK == call_error);
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata =
&glb_policy->lb_trailing_metadata_recv;
@ -1225,6 +1415,19 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = arg;
glb_policy->initial_request_sent = true;
// If we attempted to send a client load report before the initial
// request was sent, send the load report now.
if (glb_policy->client_load_report_payload != NULL) {
do_send_client_load_report_locked(exec_ctx, glb_policy);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"lb_on_response_received_locked");
}
static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
glb_lb_policy *glb_policy = arg;
@ -1240,58 +1443,91 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
grpc_grpclb_serverlist *serverlist =
grpc_grpclb_response_parse_serverlist(response_slice);
if (serverlist != NULL) {
GPR_ASSERT(glb_policy->lb_call != NULL);
grpc_slice_unref_internal(exec_ctx, response_slice);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Serverlist with %lu servers received",
(unsigned long)serverlist->num_servers);
for (size_t i = 0; i < serverlist->num_servers; ++i) {
grpc_resolved_address addr;
parse_server(serverlist->servers[i], &addr);
char *ipport;
grpc_sockaddr_to_string(&ipport, &addr, false);
gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
gpr_free(ipport);
grpc_grpclb_initial_response *response = NULL;
if (!glb_policy->seen_initial_response &&
(response = grpc_grpclb_initial_response_parse(response_slice)) !=
NULL) {
if (response->has_client_stats_report_interval) {
glb_policy->client_stats_report_interval =
gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN),
grpc_grpclb_duration_to_timespec(
&response->client_stats_report_interval));
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
"received initial LB response message; "
"client load reporting interval = %" PRId64 ".%09d sec",
glb_policy->client_stats_report_interval.tv_sec,
glb_policy->client_stats_report_interval.tv_nsec);
}
/* take a weak ref (won't prevent calling of \a glb_shutdown() if the
* strong ref count goes to zero) to be unref'd in
* send_client_load_report() */
glb_policy->client_load_report_timer_pending = true;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
schedule_next_client_load_report(exec_ctx, glb_policy);
} else if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
"received initial LB response message; "
"client load reporting NOT enabled");
}
grpc_grpclb_initial_response_destroy(response);
glb_policy->seen_initial_response = true;
} else {
grpc_grpclb_serverlist *serverlist =
grpc_grpclb_response_parse_serverlist(response_slice);
if (serverlist != NULL) {
GPR_ASSERT(glb_policy->lb_call != NULL);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Serverlist with %lu servers received",
(unsigned long)serverlist->num_servers);
for (size_t i = 0; i < serverlist->num_servers; ++i) {
grpc_resolved_address addr;
parse_server(serverlist->servers[i], &addr);
char *ipport;
grpc_sockaddr_to_string(&ipport, &addr, false);
gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
gpr_free(ipport);
}
}
/* update serverlist */
if (serverlist->num_servers > 0) {
if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
/* update serverlist */
if (serverlist->num_servers > 0) {
if (grpc_grpclb_serverlist_equals(glb_policy->serverlist,
serverlist)) {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
"Incoming server list identical to current, ignoring.");
}
grpc_grpclb_destroy_serverlist(serverlist);
} else { /* new serverlist */
if (glb_policy->serverlist != NULL) {
/* dispose of the old serverlist */
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
/* and update the copy in the glb_lb_policy instance. This
* serverlist instance will be destroyed either upon the next
* update or in glb_destroy() */
glb_policy->serverlist = serverlist;
rr_handover_locked(exec_ctx, glb_policy);
}
} else {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
"Incoming server list identical to current, ignoring.");
"Received empty server list. Picks will stay pending until "
"a response with > 0 servers is received");
}
grpc_grpclb_destroy_serverlist(serverlist);
} else { /* new serverlist */
if (glb_policy->serverlist != NULL) {
/* dispose of the old serverlist */
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
/* and update the copy in the glb_lb_policy instance. This serverlist
* instance will be destroyed either upon the next update or in
* glb_destroy() */
glb_policy->serverlist = serverlist;
rr_handover_locked(exec_ctx, glb_policy);
}
} else {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
"Received empty server list. Picks will stay pending until a "
"response with > 0 servers is received");
}
grpc_grpclb_destroy_serverlist(serverlist);
} else { /* serverlist == NULL */
gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
}
} else { /* serverlist == NULL */
gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
grpc_slice_unref_internal(exec_ctx, response_slice);
}
grpc_slice_unref_internal(exec_ctx, response_slice);
if (!glb_policy->shutting_down) {
/* keep listening for serverlist updates */
op->op = GRPC_OP_RECV_MESSAGE;
@ -1403,9 +1639,29 @@ grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
}
/* Plugin registration */
// Only add client_load_reporting filter if the grpclb LB policy is used.
static bool maybe_add_client_load_reporting_filter(
grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) {
const grpc_channel_args *args =
grpc_channel_stack_builder_get_channel_arguments(builder);
const grpc_arg *channel_arg =
grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != NULL && channel_arg->type == GRPC_ARG_STRING &&
strcmp(channel_arg->value.string, "grpclb") == 0) {
return grpc_channel_stack_builder_append_filter(
builder, (const grpc_channel_filter *)arg, NULL, NULL);
}
return true;
}
void grpc_lb_policy_grpclb_init() {
grpc_register_lb_policy(grpc_glb_lb_factory_create());
grpc_register_tracer("glb", &grpc_lb_glb_trace);
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_client_load_reporting_filter,
(void *)&grpc_client_load_reporting_filter);
}
void grpc_lb_policy_grpclb_shutdown() {}

@ -0,0 +1,133 @@
/*
*
* Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
#include "src/core/lib/channel/channel_args.h"
#define GRPC_ARG_GRPCLB_CLIENT_STATS "grpc.grpclb_client_stats"
struct grpc_grpclb_client_stats {
gpr_refcount refs;
gpr_atm num_calls_started;
gpr_atm num_calls_finished;
gpr_atm num_calls_finished_with_drop_for_rate_limiting;
gpr_atm num_calls_finished_with_drop_for_load_balancing;
gpr_atm num_calls_finished_with_client_failed_to_send;
gpr_atm num_calls_finished_known_received;
};
grpc_grpclb_client_stats* grpc_grpclb_client_stats_create() {
grpc_grpclb_client_stats* client_stats = gpr_zalloc(sizeof(*client_stats));
gpr_ref_init(&client_stats->refs, 1);
return client_stats;
}
grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
grpc_grpclb_client_stats* client_stats) {
gpr_ref(&client_stats->refs);
return client_stats;
}
void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) {
if (gpr_unref(&client_stats->refs)) {
gpr_free(client_stats);
}
}
void grpc_grpclb_client_stats_add_call_started(
grpc_grpclb_client_stats* client_stats) {
gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
}
void grpc_grpclb_client_stats_add_call_finished(
bool finished_with_drop_for_rate_limiting,
bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats) {
gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
if (finished_with_drop_for_rate_limiting) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_drop_for_rate_limiting,
(gpr_atm)1);
}
if (finished_with_drop_for_load_balancing) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_drop_for_load_balancing,
(gpr_atm)1);
}
if (finished_with_client_failed_to_send) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_client_failed_to_send,
(gpr_atm)1);
}
if (finished_known_received) {
gpr_atm_full_fetch_add(&client_stats->num_calls_finished_known_received,
(gpr_atm)1);
}
}
static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) {
*value = (int64_t)gpr_atm_acq_load(counter);
gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value));
}
void grpc_grpclb_client_stats_get(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
int64_t* num_calls_finished_with_drop_for_rate_limiting,
int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received) {
atomic_get_and_reset_counter(num_calls_started,
&client_stats->num_calls_started);
atomic_get_and_reset_counter(num_calls_finished,
&client_stats->num_calls_finished);
atomic_get_and_reset_counter(
num_calls_finished_with_drop_for_rate_limiting,
&client_stats->num_calls_finished_with_drop_for_rate_limiting);
atomic_get_and_reset_counter(
num_calls_finished_with_drop_for_load_balancing,
&client_stats->num_calls_finished_with_drop_for_load_balancing);
atomic_get_and_reset_counter(
num_calls_finished_with_client_failed_to_send,
&client_stats->num_calls_finished_with_client_failed_to_send);
atomic_get_and_reset_counter(
num_calls_finished_known_received,
&client_stats->num_calls_finished_known_received);
}

@ -0,0 +1,65 @@
/*
*
* Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H
#include <stdbool.h>
#include <grpc/impl/codegen/grpc_types.h>
typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats;
grpc_grpclb_client_stats* grpc_grpclb_client_stats_create();
grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_started(
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_finished(
bool finished_with_drop_for_rate_limiting,
bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_get(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
int64_t* num_calls_finished_with_drop_for_rate_limiting,
int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \
*/

@ -80,15 +80,45 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name) {
grpc_grpclb_request *req = gpr_malloc(sizeof(grpc_grpclb_request));
req->has_client_stats = 0; /* TODO(dgq): add support for stats once defined */
req->has_initial_request = 1;
req->initial_request.has_name = 1;
req->has_client_stats = false;
req->has_initial_request = true;
req->initial_request.has_name = true;
strncpy(req->initial_request.name, lb_service_name,
GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH);
return req;
}
static void populate_timestamp(gpr_timespec timestamp,
struct _grpc_lb_v1_Timestamp *timestamp_pb) {
timestamp_pb->has_seconds = true;
timestamp_pb->seconds = timestamp.tv_sec;
timestamp_pb->has_nanos = true;
timestamp_pb->nanos = timestamp.tv_nsec;
}
grpc_grpclb_request *grpc_grpclb_load_report_request_create(
grpc_grpclb_client_stats *client_stats) {
grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request));
req->has_client_stats = true;
req->client_stats.has_timestamp = true;
populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
req->client_stats.has_num_calls_started = true;
req->client_stats.has_num_calls_finished = true;
req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true;
req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_known_received = true;
grpc_grpclb_client_stats_get(
client_stats, &req->client_stats.num_calls_started,
&req->client_stats.num_calls_finished,
&req->client_stats.num_calls_finished_with_drop_for_rate_limiting,
&req->client_stats.num_calls_finished_with_drop_for_load_balancing,
&req->client_stats.num_calls_finished_with_client_failed_to_send,
&req->client_stats.num_calls_finished_known_received);
return req;
}
grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
size_t encoded_length;
pb_ostream_t sizestream;
@ -122,6 +152,9 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
if (!res.has_initial_response) return NULL;
grpc_grpclb_initial_response *initial_res =
gpr_malloc(sizeof(grpc_grpclb_initial_response));
memcpy(initial_res, &res.initial_response,
@ -243,6 +276,15 @@ int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs,
return 0;
}
gpr_timespec grpc_grpclb_duration_to_timespec(
grpc_grpclb_duration *duration_pb) {
gpr_timespec duration;
duration.tv_sec = duration_pb->has_seconds ? duration_pb->seconds : 0;
duration.tv_nsec = duration_pb->has_nanos ? duration_pb->nanos : 0;
duration.clock_type = GPR_TIMESPAN;
return duration;
}
void grpc_grpclb_initial_response_destroy(
grpc_grpclb_initial_response *response) {
gpr_free(response);

@ -36,6 +36,7 @@
#include <grpc/slice_buffer.h>
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
@ -58,6 +59,8 @@ typedef struct grpc_grpclb_serverlist {
/** Create a request for a gRPC LB service under \a lb_service_name */
grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name);
grpc_grpclb_request *grpc_grpclb_load_report_request_create(
grpc_grpclb_client_stats *client_stats);
/** Protocol Buffers v3-encode \a request */
grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request);
@ -93,6 +96,9 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist);
int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs,
const grpc_grpclb_duration *rhs);
gpr_timespec grpc_grpclb_duration_to_timespec(
grpc_grpclb_duration *duration_pb);
/** Destroy \a initial_response */
void grpc_grpclb_initial_response_destroy(
grpc_grpclb_initial_response *response);

@ -189,7 +189,8 @@ static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_connected_subchannel **target,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;

@ -414,7 +414,8 @@ static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_connected_subchannel **target,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;

@ -781,7 +781,7 @@ grpc_error *grpc_connected_subchannel_create_call(
(*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
const grpc_call_element_args call_args = {.call_stack = callstk,
.server_transport_data = NULL,
.context = NULL,
.context = args->context,
.path = args->path,
.start_time = args->start_time,
.deadline = args->deadline,

@ -119,6 +119,7 @@ typedef struct {
gpr_timespec start_time;
gpr_timespec deadline;
gpr_arena *arena;
grpc_call_context_element *context;
} grpc_connected_subchannel_call_args;
grpc_error *grpc_connected_subchannel_create_call(

@ -50,6 +50,9 @@ typedef enum {
/// Reserved for traffic_class_context.
GRPC_CONTEXT_TRAFFIC,
/// Value is a \a grpc_grpclb_client_stats.
GRPC_GRPCLB_CLIENT_STATS,
GRPC_CONTEXT_COUNT
} grpc_context_index;

@ -280,8 +280,10 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c',
'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'third_party/nanopb/pb_common.c',

@ -198,6 +198,7 @@ cc_test(
],
)
cc_test(
name = "grpclb_end2end_test",
srcs = ["grpclb_end2end_test.cc"],

@ -147,12 +147,38 @@ grpc::string Ip4ToPackedString(const char* ip_str) {
return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
}
struct ClientStats {
size_t num_calls_started = 0;
size_t num_calls_finished = 0;
size_t num_calls_finished_with_drop_for_rate_limiting = 0;
size_t num_calls_finished_with_drop_for_load_balancing = 0;
size_t num_calls_finished_with_client_failed_to_send = 0;
size_t num_calls_finished_known_received = 0;
ClientStats& operator+=(const ClientStats& other) {
num_calls_started += other.num_calls_started;
num_calls_finished += other.num_calls_finished;
num_calls_finished_with_drop_for_rate_limiting +=
other.num_calls_finished_with_drop_for_rate_limiting;
num_calls_finished_with_drop_for_load_balancing +=
other.num_calls_finished_with_drop_for_load_balancing;
num_calls_finished_with_client_failed_to_send +=
other.num_calls_finished_with_client_failed_to_send;
num_calls_finished_known_received +=
other.num_calls_finished_known_received;
return *this;
}
};
class BalancerServiceImpl : public BalancerService {
public:
using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
using ResponseDelayPair = std::pair<LoadBalanceResponse, int>;
BalancerServiceImpl() : shutdown_(false) {}
explicit BalancerServiceImpl(int client_load_reporting_interval_seconds)
: client_load_reporting_interval_seconds_(
client_load_reporting_interval_seconds),
shutdown_(false) {}
Status BalanceLoad(ServerContext* context, Stream* stream) override {
LoadBalanceRequest request;
@ -160,16 +186,49 @@ class BalancerServiceImpl : public BalancerService {
IncreaseRequestCount();
gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str());
if (client_load_reporting_interval_seconds_ > 0) {
LoadBalanceResponse initial_response;
initial_response.mutable_initial_response()
->mutable_client_stats_report_interval()
->set_seconds(client_load_reporting_interval_seconds_);
stream->Write(initial_response);
}
std::vector<ResponseDelayPair> responses_and_delays;
{
std::unique_lock<std::mutex> lock(mu_);
responses_and_delays = responses_and_delays_;
}
for (const auto& response_and_delay : responses_and_delays) {
if (shutdown_) break;
SendResponse(stream, response_and_delay.first, response_and_delay.second);
}
if (client_load_reporting_interval_seconds_ > 0) {
request.Clear();
stream->Read(&request);
gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'",
request.DebugString().c_str());
GPR_ASSERT(request.has_client_stats());
client_stats_.num_calls_started +=
request.client_stats().num_calls_started();
client_stats_.num_calls_finished +=
request.client_stats().num_calls_finished();
client_stats_.num_calls_finished_with_drop_for_rate_limiting +=
request.client_stats()
.num_calls_finished_with_drop_for_rate_limiting();
client_stats_.num_calls_finished_with_drop_for_load_balancing +=
request.client_stats()
.num_calls_finished_with_drop_for_load_balancing();
client_stats_.num_calls_finished_with_client_failed_to_send +=
request.client_stats()
.num_calls_finished_with_client_failed_to_send();
client_stats_.num_calls_finished_known_received +=
request.client_stats().num_calls_finished_known_received();
std::lock_guard<std::mutex> lock(mu_);
cond_.notify_one();
}
return Status::OK;
}
@ -194,6 +253,12 @@ class BalancerServiceImpl : public BalancerService {
return response;
}
const ClientStats& WaitForLoadReport() {
std::unique_lock<std::mutex> lock(mu_);
cond_.wait(lock);
return client_stats_;
}
private:
void SendResponse(Stream* stream, const LoadBalanceResponse& response,
int delay_ms) {
@ -206,16 +271,23 @@ class BalancerServiceImpl : public BalancerService {
IncreaseResponseCount();
}
const int client_load_reporting_interval_seconds_;
std::vector<ResponseDelayPair> responses_and_delays_;
std::mutex mu_;
std::condition_variable cond_;
ClientStats client_stats_;
bool shutdown_;
};
class GrpclbEnd2endTest : public ::testing::Test {
protected:
GrpclbEnd2endTest(int num_backends, int num_balancers)
GrpclbEnd2endTest(int num_backends, int num_balancers,
int client_load_reporting_interval_seconds)
: server_host_("localhost"),
num_backends_(num_backends),
num_balancers_(num_balancers) {}
num_balancers_(num_balancers),
client_load_reporting_interval_seconds_(
client_load_reporting_interval_seconds) {}
void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create();
@ -227,7 +299,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
}
// Start the load balancers.
for (size_t i = 0; i < num_balancers_; ++i) {
balancers_.emplace_back(new BalancerServiceImpl());
balancers_.emplace_back(
new BalancerServiceImpl(client_load_reporting_interval_seconds_));
balancer_servers_.emplace_back(ServerThread<BalancerService>(
"balancer", server_host_, balancers_.back().get()));
}
@ -261,6 +334,14 @@ class GrpclbEnd2endTest : public ::testing::Test {
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
ClientStats WaitForLoadReports() {
ClientStats client_stats;
for (const auto& balancer : balancers_) {
client_stats += balancer->WaitForLoadReport();
}
return client_stats;
}
struct AddressData {
int port;
bool is_balancer;
@ -367,6 +448,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
const grpc::string server_host_;
const size_t num_backends_;
const size_t num_balancers_;
const int client_load_reporting_interval_seconds_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
@ -381,7 +463,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
class SingleBalancerTest : public GrpclbEnd2endTest {
public:
SingleBalancerTest() : GrpclbEnd2endTest(4, 1) {}
SingleBalancerTest() : GrpclbEnd2endTest(4, 1, 0) {}
};
TEST_F(SingleBalancerTest, Vanilla) {
@ -505,6 +587,41 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
public:
SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {}
};
TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0);
// Start servers and send 100 RPCs per server.
const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_);
for (const auto& status_and_response : statuses_and_responses) {
EXPECT_TRUE(status_and_response.first.ok());
EXPECT_EQ(status_and_response.second.message(), kMessage_);
}
// Each backend should have gotten 100 requests.
for (size_t i = 0; i < backends_.size(); ++i) {
EXPECT_EQ(100, backend_servers_[i].service_->request_count());
}
// The balancer got a single request.
EXPECT_EQ(1, balancer_servers_[0].service_->request_count());
// and sent a single response.
EXPECT_EQ(1, balancer_servers_[0].service_->response_count());
const ClientStats client_stats = WaitForLoadReports();
EXPECT_EQ(100 * num_backends_, client_stats.num_calls_started);
EXPECT_EQ(100 * num_backends_, client_stats.num_calls_finished);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
EXPECT_EQ(100 * num_backends_,
client_stats.num_calls_finished_known_received);
}
} // namespace
} // namespace testing
} // namespace grpc

@ -910,10 +910,14 @@ src/core/ext/filters/client_channel/http_proxy.c \
src/core/ext/filters/client_channel/http_proxy.h \
src/core/ext/filters/client_channel/lb_policy.c \
src/core/ext/filters/client_channel/lb_policy.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c \
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \

@ -8234,8 +8234,10 @@
"nanopb"
],
"headers": [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
],
@ -8243,10 +8245,14 @@
"language": "c",
"name": "grpc_lb_policy_grpclb",
"src": [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",
@ -8264,8 +8270,10 @@
"nanopb"
],
"headers": [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
],
@ -8273,10 +8281,14 @@
"language": "c",
"name": "grpc_lb_policy_grpclb_secure",
"src": [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c",

@ -475,8 +475,10 @@
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\uri_parser.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\deadline\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\client\chttp2_connector.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\proto\grpc\lb\v1\load_balancer.pb.h" />
<ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb.h" />
@ -915,10 +917,14 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\client\insecure\channel_create_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel_secure.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\proto\grpc\lb\v1\load_balancer.pb.c">

@ -613,12 +613,18 @@
<ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\client\insecure\channel_create_posix.c">
<Filter>src\core\ext\transport\chttp2\client\insecure</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel_secure.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClCompile>
@ -1334,12 +1340,18 @@
<ClInclude Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\client\chttp2_connector.h">
<Filter>src\core\ext\transport\chttp2\client</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.h">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.h">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.h">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.h">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.h">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClInclude>

@ -444,8 +444,10 @@
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_wrapper.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\proto\grpc\lb\v1\load_balancer.pb.h" />
<ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb.h" />
@ -836,10 +838,14 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\proto\grpc\lb\v1\load_balancer.pb.c">

@ -547,12 +547,18 @@
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.c">
<Filter>src\core\ext\filters\load_reporting</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.c">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClCompile>
@ -1181,12 +1187,18 @@
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.h">
<Filter>src\core\ext\filters\load_reporting</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.h">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.h">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.h">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.h">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.h">
<Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter>
</ClInclude>

Loading…
Cancel
Save