Merge branch 'master' into rq-threads

pull/16081/head
Sree Kuchibhotla 7 years ago
commit 7b8be4d6fd
  1. 20
      BUILD
  2. 116
      CMakeLists.txt
  3. 147
      Makefile
  4. 43
      build.yaml
  5. 19
      doc/interop-test-descriptions.md
  6. 2
      examples/csharp/route_guide/RouteGuide/route_guide_db.json
  7. 2
      grpc.def
  8. 13
      grpc.gyp
  9. 23
      include/grpc/grpc.h
  10. 41
      include/grpcpp/ext/channelz_service_plugin.h
  11. 2
      include/grpcpp/impl/codegen/completion_queue.h
  12. 5
      src/android/test/interop/app/src/main/cpp/grpc-interop.cc
  13. 4
      src/core/ext/filters/client_channel/client_channel_channelz.cc
  14. 26
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  15. 26
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  16. 59
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  17. 13
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  18. 21
      src/core/lib/channel/channelz_registry.cc
  19. 220
      src/core/lib/iomgr/executor.cc
  20. 45
      src/core/lib/iomgr/executor.h
  21. 6
      src/core/lib/iomgr/lockfree_event.cc
  22. 5
      src/core/lib/iomgr/resolve_address_posix.cc
  23. 5
      src/core/lib/iomgr/resolve_address_windows.cc
  24. 5
      src/core/lib/security/credentials/alts/check_gcp_environment_linux.cc
  25. 9
      src/core/lib/security/security_connector/security_connector.cc
  26. 57
      src/cpp/server/channelz/channelz_service.cc
  27. 43
      src/cpp/server/channelz/channelz_service.h
  28. 79
      src/cpp/server/channelz/channelz_service_plugin.cc
  29. 14
      src/csharp/Grpc.Core/Grpc.Core.csproj
  30. 24
      src/csharp/Grpc.Core/Internal/NativeExtension.cs
  31. 19
      src/csharp/Grpc.Core/Internal/NativeLogRedirector.cs
  32. 33
      src/csharp/Grpc.Core/Internal/PlatformApis.cs
  33. 21
      src/csharp/Grpc.Core/build/MonoAndroid/Grpc.Core.targets
  34. 0
      src/csharp/Grpc.Core/build/net45/Grpc.Core.targets
  35. 2
      src/csharp/doc/docfx.json
  36. 24
      src/csharp/experimental/build_native_ext_for_android.sh
  37. 1
      src/proto/grpc/testing/echo_messages.proto
  38. 4
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  39. 6
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  40. 10
      test/core/channel/channelz_test.cc
  41. 1
      test/core/security/check_gcp_environment_linux_test.cc
  42. 2
      test/core/surface/public_headers_must_be_c89.c
  43. 21
      test/cpp/end2end/BUILD
  44. 352
      test/cpp/end2end/channelz_service_test.cc
  45. 16
      test/cpp/interop/client.cc
  46. 64
      test/cpp/interop/interop_client.cc
  47. 20
      test/cpp/interop/interop_client.h
  48. 6
      test/cpp/interop/stress_interop_client.cc
  49. 4
      test/cpp/interop/stress_interop_client.h
  50. 11
      test/cpp/interop/stress_test.cc
  51. 4
      test/cpp/server/load_reporter/load_reporter_test.cc
  52. 5
      test/cpp/util/channel_trace_proto_helper.cc
  53. 1
      test/cpp/util/channel_trace_proto_helper.h
  54. 31
      tools/dockerfile/grpc_artifact_android_ndk/Dockerfile
  55. 26
      tools/internal_ci/linux/grpc_publish_packages.cfg
  56. 144
      tools/internal_ci/linux/grpc_publish_packages.sh
  57. 2
      tools/profiling/ios_bin/binary_size.py
  58. 13
      tools/run_tests/artifacts/artifact_targets.py
  59. 1
      tools/run_tests/artifacts/build_artifact_csharp_android.sh
  60. 79
      tools/run_tests/generated/sources_and_headers.json
  61. 24
      tools/run_tests/generated/tests.json

20
BUILD

@ -2053,6 +2053,26 @@ grpc_cc_library(
alwayslink = 1,
)
grpc_cc_library(
name = "grpcpp_channelz",
srcs = [
"src/cpp/server/channelz/channelz_service.cc",
"src/cpp/server/channelz/channelz_service_plugin.cc",
],
hdrs = [
"src/cpp/server/channelz/channelz_service.h",
],
language = "c++",
public_hdrs = [
"include/grpcpp/ext/channelz_service_plugin.h",
],
deps = [
":grpc++",
"//src/proto/grpc/channelz:channelz_proto",
],
alwayslink = 1,
)
grpc_cc_library(
name = "grpc++_test",
public_hdrs = [

@ -551,6 +551,7 @@ add_dependencies(buildtests_cxx channel_arguments_test)
add_dependencies(buildtests_cxx channel_filter_test)
add_dependencies(buildtests_cxx channel_trace_test)
add_dependencies(buildtests_cxx channelz_registry_test)
add_dependencies(buildtests_cxx channelz_service_test)
add_dependencies(buildtests_cxx channelz_test)
add_dependencies(buildtests_cxx check_gcp_environment_linux_test)
add_dependencies(buildtests_cxx check_gcp_environment_windows_test)
@ -4690,6 +4691,73 @@ if (gRPC_INSTALL)
)
endif()
if (gRPC_BUILD_CODEGEN)
add_library(grpcpp_channelz
src/cpp/server/channelz/channelz_service.cc
src/cpp/server/channelz/channelz_service_plugin.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.grpc.pb.h
)
if(WIN32 AND MSVC)
set_target_properties(grpcpp_channelz PROPERTIES COMPILE_PDB_NAME "grpcpp_channelz"
COMPILE_PDB_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}"
)
if (gRPC_INSTALL)
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/grpcpp_channelz.pdb
DESTINATION ${gRPC_INSTALL_LIBDIR} OPTIONAL
)
endif()
endif()
protobuf_generate_grpc_cpp(
src/proto/grpc/channelz/channelz.proto
)
target_include_directories(grpcpp_channelz
PUBLIC $<INSTALL_INTERFACE:${gRPC_INSTALL_INCLUDEDIR}> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(grpcpp_channelz
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++
grpc
)
foreach(_hdr
include/grpcpp/ext/channelz_service_plugin.h
)
string(REPLACE "include/" "" _path ${_hdr})
get_filename_component(_path ${_path} PATH)
install(FILES ${_hdr}
DESTINATION "${gRPC_INSTALL_INCLUDEDIR}/${_path}"
)
endforeach()
endif (gRPC_BUILD_CODEGEN)
if (gRPC_INSTALL)
install(TARGETS grpcpp_channelz EXPORT gRPCTargets
RUNTIME DESTINATION ${gRPC_INSTALL_BINDIR}
LIBRARY DESTINATION ${gRPC_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${gRPC_INSTALL_LIBDIR}
)
endif()
if (gRPC_BUILD_TESTS)
if (gRPC_BUILD_CODEGEN)
@ -10873,6 +10941,54 @@ target_link_libraries(channelz_registry_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(channelz_service_test
test/cpp/end2end/channelz_service_test.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.grpc.pb.h
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
protobuf_generate_grpc_cpp(
src/proto/grpc/channelz/channelz.proto
)
target_include_directories(channelz_service_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(channelz_service_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpcpp_channelz
grpc++_test_util
grpc_test_util
grpc++
grpc
gpr_test_util
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(channelz_test
test/core/channel/channelz_test.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.pb.cc

@ -1142,6 +1142,7 @@ channel_arguments_test: $(BINDIR)/$(CONFIG)/channel_arguments_test
channel_filter_test: $(BINDIR)/$(CONFIG)/channel_filter_test
channel_trace_test: $(BINDIR)/$(CONFIG)/channel_trace_test
channelz_registry_test: $(BINDIR)/$(CONFIG)/channelz_registry_test
channelz_service_test: $(BINDIR)/$(CONFIG)/channelz_service_test
channelz_test: $(BINDIR)/$(CONFIG)/channelz_test
check_gcp_environment_linux_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_linux_test
check_gcp_environment_windows_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test
@ -1378,12 +1379,12 @@ static: static_c static_cxx
static_c: pc_c pc_c_unsecure cache.mk $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgrpc_cronet.a $(LIBDIR)/$(CONFIG)/libgrpc_unsecure.a
static_cxx: pc_cxx pc_cxx_unsecure cache.mk $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_cronet.a $(LIBDIR)/$(CONFIG)/libgrpc++_error_details.a $(LIBDIR)/$(CONFIG)/libgrpc++_reflection.a $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure.a
static_cxx: pc_cxx pc_cxx_unsecure cache.mk $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_cronet.a $(LIBDIR)/$(CONFIG)/libgrpc++_error_details.a $(LIBDIR)/$(CONFIG)/libgrpc++_reflection.a $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure.a $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a
shared: shared_c shared_cxx
shared_c: pc_c pc_c_unsecure cache.mk $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)address_sorting$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc_cronet$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE)
shared_cxx: pc_cxx pc_cxx_unsecure cache.mk $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)
shared_cxx: pc_cxx pc_cxx_unsecure cache.mk $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++_cronet$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++_error_details$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)
shared_csharp: shared_c $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc_csharp_ext$(SHARED_VERSION_CSHARP).$(SHARED_EXT_CSHARP)
grpc_csharp_ext: shared_csharp
@ -1640,6 +1641,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/channel_filter_test \
$(BINDIR)/$(CONFIG)/channel_trace_test \
$(BINDIR)/$(CONFIG)/channelz_registry_test \
$(BINDIR)/$(CONFIG)/channelz_service_test \
$(BINDIR)/$(CONFIG)/channelz_test \
$(BINDIR)/$(CONFIG)/check_gcp_environment_linux_test \
$(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test \
@ -1818,6 +1820,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/channel_filter_test \
$(BINDIR)/$(CONFIG)/channel_trace_test \
$(BINDIR)/$(CONFIG)/channelz_registry_test \
$(BINDIR)/$(CONFIG)/channelz_service_test \
$(BINDIR)/$(CONFIG)/channelz_test \
$(BINDIR)/$(CONFIG)/check_gcp_environment_linux_test \
$(BINDIR)/$(CONFIG)/check_gcp_environment_windows_test \
@ -2261,6 +2264,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/channel_trace_test || ( echo test channel_trace_test failed ; exit 1 )
$(E) "[RUN] Testing channelz_registry_test"
$(Q) $(BINDIR)/$(CONFIG)/channelz_registry_test || ( echo test channelz_registry_test failed ; exit 1 )
$(E) "[RUN] Testing channelz_service_test"
$(Q) $(BINDIR)/$(CONFIG)/channelz_service_test || ( echo test channelz_service_test failed ; exit 1 )
$(E) "[RUN] Testing channelz_test"
$(Q) $(BINDIR)/$(CONFIG)/channelz_test || ( echo test channelz_test failed ; exit 1 )
$(E) "[RUN] Testing check_gcp_environment_linux_test"
@ -2459,6 +2464,8 @@ ifeq ($(CONFIG),opt)
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpc++_reflection.a
$(E) "[STRIP] Stripping libgrpc++_unsecure.a"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure.a
$(E) "[STRIP] Stripping libgrpcpp_channelz.a"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a
endif
strip-shared_c: shared_c
@ -2487,6 +2494,8 @@ ifeq ($(CONFIG),opt)
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++_reflection$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)
$(E) "[STRIP] Stripping $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)
$(E) "[STRIP] Stripping $(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
$(Q) $(STRIP) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)
endif
strip-shared_csharp: shared_csharp
@ -2946,6 +2955,9 @@ install-static_cxx: static_cxx strip-static_cxx install-pkg-config_cxx
$(E) "[INSTALL] Installing libgrpc++_unsecure.a"
$(Q) $(INSTALL) -d $(prefix)/lib
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpc++_unsecure.a $(prefix)/lib/libgrpc++_unsecure.a
$(E) "[INSTALL] Installing libgrpcpp_channelz.a"
$(Q) $(INSTALL) -d $(prefix)/lib
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a $(prefix)/lib/libgrpcpp_channelz.a
@ -3047,6 +3059,15 @@ ifeq ($(SYSTEM),MINGW32)
else ifneq ($(SYSTEM),Darwin)
$(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so.6
$(Q) ln -sf $(SHARED_PREFIX)grpc++_unsecure$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpc++_unsecure.so
endif
$(E) "[INSTALL] Installing $(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)"
$(Q) $(INSTALL) -d $(prefix)/lib
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/$(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP)
ifeq ($(SYSTEM),MINGW32)
$(Q) $(INSTALL) $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz$(SHARED_VERSION_CPP)-dll.a $(prefix)/lib/libgrpcpp_channelz.a
else ifneq ($(SYSTEM),Darwin)
$(Q) ln -sf $(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpcpp_channelz.so.6
$(Q) ln -sf $(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(prefix)/lib/libgrpcpp_channelz.so
endif
ifneq ($(SYSTEM),MINGW32)
ifneq ($(SYSTEM),Darwin)
@ -7021,6 +7042,79 @@ ifneq ($(NO_DEPS),true)
endif
LIBGRPCPP_CHANNELZ_SRC = \
src/cpp/server/channelz/channelz_service.cc \
src/cpp/server/channelz/channelz_service_plugin.cc \
$(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc \
PUBLIC_HEADERS_CXX += \
include/grpcpp/ext/channelz_service_plugin.h \
LIBGRPCPP_CHANNELZ_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPCPP_CHANNELZ_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure libraries if you don't have OpenSSL.
$(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a: openssl_dep_error
$(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP): openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build a C++ library if you don't have protobuf - a bit overreached, but still okay.
$(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a: protobuf_dep_error
$(LIBDIR)/$(CONFIG)/$(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP): protobuf_dep_error
else
$(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a: $(ZLIB_DEP) $(OPENSSL_DEP) $(CARES_DEP) $(ADDRESS_SORTING_DEP) $(PROTOBUF_DEP) $(LIBGRPCPP_CHANNELZ_OBJS)
$(E) "[AR] Creating $@"
$(Q) mkdir -p `dirname $@`
$(Q) rm -f $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a
$(Q) $(AR) $(AROPTS) $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a $(LIBGRPCPP_CHANNELZ_OBJS)
ifeq ($(SYSTEM),Darwin)
$(Q) ranlib -no_warning_for_no_symbols $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a
endif
ifeq ($(SYSTEM),MINGW32)
$(LIBDIR)/$(CONFIG)/grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP): $(LIBGRPCPP_CHANNELZ_OBJS) $(ZLIB_DEP) $(CARES_DEP) $(ADDRESS_SORTING_DEP) $(PROTOBUF_DEP) $(LIBDIR)/$(CONFIG)/grpc++$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(OPENSSL_DEP)
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,--output-def=$(LIBDIR)/$(CONFIG)/grpcpp_channelz$(SHARED_VERSION_CPP).def -Wl,--out-implib=$(LIBDIR)/$(CONFIG)/libgrpcpp_channelz$(SHARED_VERSION_CPP)-dll.a -o $(LIBDIR)/$(CONFIG)/grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBGRPCPP_CHANNELZ_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(ADDRESS_SORTING_MERGE_LIBS) $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) -lgrpc++$(SHARED_VERSION_CPP)-dll -lgrpc$(SHARED_VERSION_CORE)-dll
else
$(LIBDIR)/$(CONFIG)/libgrpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP): $(LIBGRPCPP_CHANNELZ_OBJS) $(ZLIB_DEP) $(CARES_DEP) $(ADDRESS_SORTING_DEP) $(PROTOBUF_DEP) $(LIBDIR)/$(CONFIG)/libgrpc++.$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/libgrpc.$(SHARED_EXT_CORE) $(OPENSSL_DEP)
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBGRPCPP_CHANNELZ_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(ADDRESS_SORTING_MERGE_LIBS) $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) -lgrpc++ -lgrpc
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpcpp_channelz.so.1 -o $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBGRPCPP_CHANNELZ_OBJS) $(ZLIB_MERGE_LIBS) $(CARES_MERGE_LIBS) $(ADDRESS_SORTING_MERGE_LIBS) $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) -lgrpc++ -lgrpc
$(Q) ln -sf $(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz$(SHARED_VERSION_CPP).so.1
$(Q) ln -sf $(SHARED_PREFIX)grpcpp_channelz$(SHARED_VERSION_CPP).$(SHARED_EXT_CPP) $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz$(SHARED_VERSION_CPP).so
endif
endif
endif
endif
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(LIBGRPCPP_CHANNELZ_OBJS:.o=.dep)
endif
endif
$(OBJDIR)/$(CONFIG)/src/cpp/server/channelz/channelz_service.o: $(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc
$(OBJDIR)/$(CONFIG)/src/cpp/server/channelz/channelz_service_plugin.o: $(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc
LIBHTTP2_CLIENT_MAIN_SRC = \
$(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc \
$(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc \
@ -16562,6 +16656,53 @@ endif
endif
CHANNELZ_SERVICE_TEST_SRC = \
test/cpp/end2end/channelz_service_test.cc \
$(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc \
CHANNELZ_SERVICE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CHANNELZ_SERVICE_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/channelz_service_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/channelz_service_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/channelz_service_test: $(PROTOBUF_DEP) $(CHANNELZ_SERVICE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(CHANNELZ_SERVICE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/channelz_service_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/channelz_service_test.o: $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/src/proto/grpc/channelz/channelz.o: $(LIBDIR)/$(CONFIG)/libgrpcpp_channelz.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_channelz_service_test: $(CHANNELZ_SERVICE_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(CHANNELZ_SERVICE_TEST_OBJS:.o=.dep)
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/channelz_service_test.o: $(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc
CHANNELZ_TEST_SRC = \
test/core/channel/channelz_test.cc \
$(GENDIR)/src/proto/grpc/channelz/channelz.pb.cc $(GENDIR)/src/proto/grpc/channelz/channelz.grpc.pb.cc \
@ -24517,6 +24658,8 @@ src/cpp/common/secure_channel_arguments.cc: $(OPENSSL_DEP)
src/cpp/common/secure_create_auth_context.cc: $(OPENSSL_DEP)
src/cpp/ext/proto_server_reflection.cc: $(OPENSSL_DEP)
src/cpp/ext/proto_server_reflection_plugin.cc: $(OPENSSL_DEP)
src/cpp/server/channelz/channelz_service.cc: $(OPENSSL_DEP)
src/cpp/server/channelz/channelz_service_plugin.cc: $(OPENSSL_DEP)
src/cpp/server/secure_server_credentials.cc: $(OPENSSL_DEP)
src/cpp/util/core_stats.cc: $(OPENSSL_DEP)
src/cpp/util/error_details.cc: $(OPENSSL_DEP)

@ -1111,10 +1111,6 @@ filegroups:
secure: true
uses:
- grpc_trace
- name: grpc++_channelz_proto
language: c++
src:
- src/proto/grpc/channelz/channelz.proto
- name: grpc++_codegen_base
language: c++
public_headers:
@ -1359,6 +1355,10 @@ filegroups:
deps:
- grpc++
- grpc
- name: grpcpp_channelz_proto
language: c++
src:
- src/proto/grpc/channelz/channelz.proto
libs:
- name: address_sorting
build: all
@ -1851,6 +1851,21 @@ libs:
vs_project_guid: '{B6E81D84-2ACB-41B8-8781-493A944C7817}'
vs_props:
- protoc
- name: grpcpp_channelz
build: all
language: c++
public_headers:
- include/grpcpp/ext/channelz_service_plugin.h
headers:
- src/cpp/server/channelz/channelz_service.h
src:
- src/cpp/server/channelz/channelz_service.cc
- src/cpp/server/channelz/channelz_service_plugin.cc
deps:
- grpc++
- grpc
filegroups:
- grpcpp_channelz_proto
- name: http2_client_main
build: private
language: c++
@ -4295,7 +4310,7 @@ targets:
- gpr_test_util
- gpr
filegroups:
- grpc++_channelz_proto
- grpcpp_channelz_proto
uses:
- grpc++_test
- name: channelz_registry_test
@ -4314,6 +4329,22 @@ targets:
uses:
- grpc++_test
uses_polling: false
- name: channelz_service_test
gtest: true
build: test
language: c++
src:
- test/cpp/end2end/channelz_service_test.cc
deps:
- grpcpp_channelz
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr_test_util
- gpr
filegroups:
- grpcpp_channelz_proto
- name: channelz_test
gtest: true
build: test
@ -4328,7 +4359,7 @@ targets:
- gpr_test_util
- gpr
filegroups:
- grpc++_channelz_proto
- grpcpp_channelz_proto
uses:
- grpc++_test
- name: check_gcp_environment_linux_test

@ -899,6 +899,25 @@ Status: TODO
This test verifies that a client sending faster than a server can drain sees
pushback (i.e., attempts to send succeed only after appropriate delays).
### Experimental Tests
These tests are not yet standardized, and are not yet implemented in all
languages. Therefore they are not part of our interop matrix.
#### rpc_soak
The client performs many large_unary RPCs in sequence over the same channel.
The number of RPCs is configured by the experimental flag, `soak_iterations`.
#### channel_soak
The client performs many large_unary RPCs in sequence. Before each RPC, it
tears down and rebuilds the channel. The number of RPCs is configured by
the experimental flag, `soak_iterations`.
This tests puts stress on several gRPC components; the resolver, the load
balancer, and the RPC hotpath.
### TODO Tests
#### High priority:

@ -1,4 +1,4 @@
[{
[{
"location": {
"latitude": 407838351,
"longitude": -746143763

@ -69,6 +69,8 @@ EXPORTS
grpc_resource_quota_unref
grpc_resource_quota_resize
grpc_resource_quota_arg_vtable
grpc_channelz_get_top_channels
grpc_channelz_get_channel
grpc_insecure_channel_create_from_fd
grpc_server_add_insecure_channel_from_fd
grpc_use_signal

@ -1585,6 +1585,19 @@
'src/compiler/ruby_generator.cc',
],
},
{
'target_name': 'grpcpp_channelz',
'type': 'static_library',
'dependencies': [
'grpc++',
'grpc',
],
'sources': [
'src/cpp/server/channelz/channelz_service.cc',
'src/cpp/server/channelz/channelz_service_plugin.cc',
'src/proto/grpc/channelz/channelz.proto',
],
},
{
'target_name': 'http2_client_main',
'type': 'static_library',

@ -458,6 +458,29 @@ GRPCAPI void grpc_resource_quota_set_max_threads(
*/
GRPCAPI const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void);
/************* CHANNELZ API *************/
/** Channelz is under active development. The following APIs will see some
churn as the feature is implemented. This comment will be removed once
channelz is officially supported, and these APIs become stable. For now
you may track the progress by following this github issue:
https://github.com/grpc/grpc/issues/15340
the following APIs return allocated JSON strings that match the response
objects from the channelz proto, found here:
https://github.com/grpc/grpc/blob/master/src/proto/grpc/channelz/channelz.proto.
For easy conversion to protobuf, The JSON is formatted according to:
https://developers.google.com/protocol-buffers/docs/proto3#json. */
/* Gets all root channels (i.e. channels the application has directly
created). This does not include subchannels nor non-top level channels.
The returned string is allocated and must be freed by the application. */
GRPCAPI char* grpc_channelz_get_top_channels(intptr_t start_channel_id);
/* Returns a single Channel, or else a NOT_FOUND code. The returned string
is allocated and must be freed by the application. */
GRPCAPI char* grpc_channelz_get_channel(intptr_t channel_id);
#ifdef __cplusplus
}
#endif

@ -0,0 +1,41 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPCPP_EXT_CHANNELZ_SERVICE_PLUGIN_H
#define GRPCPP_EXT_CHANNELZ_SERVICE_PLUGIN_H
#include <grpc/support/port_platform.h>
#include <grpcpp/impl/server_builder_plugin.h>
#include <grpcpp/impl/server_initializer.h>
#include <grpcpp/support/config.h>
namespace grpc {
namespace channelz {
namespace experimental {
/// Add channelz server plugin to \a ServerBuilder. This function should
/// be called at static initialization time. This service is experimental
/// for now. Track progress in https://github.com/grpc/grpc/issues/15988.
void InitChannelzService();
} // namespace experimental
} // namespace channelz
} // namespace grpc
#endif // GRPCPP_EXT_CHANNELZ_SERVICE_PLUGIN_H

@ -367,7 +367,7 @@ class ServerCompletionQueue : public CompletionQueue {
protected:
/// Default constructor
ServerCompletionQueue() {}
ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {}
private:
/// \param is_frequently_polled Informs the GRPC library about whether the

@ -45,9 +45,10 @@ std::shared_ptr<grpc::testing::InteropClient> GetClient(const char* host,
credentials = grpc::InsecureChannelCredentials();
}
grpc::testing::ChannelCreationFunc channel_creation_func =
std::bind(grpc::CreateChannel, host_port, credentials);
return std::shared_ptr<grpc::testing::InteropClient>(
new grpc::testing::InteropClient(
grpc::CreateChannel(host_port, credentials), true, false));
new grpc::testing::InteropClient(channel_creation_func, true, false));
}
extern "C" JNIEXPORT jboolean JNICALL

@ -85,12 +85,12 @@ void ClientChannelNode::PopulateChildRefs(grpc_json* json) {
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false);
json_iterator = nullptr;
for (size_t i = 0; i < child_subchannels.size(); ++i) {
for (size_t i = 0; i < child_channels.size(); ++i) {
json_iterator =
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
GRPC_JSON_OBJECT, false);
grpc_json_add_number_string_child(json_iterator, nullptr, "channelId",
child_subchannels[i]);
child_channels[i]);
}
}
}

@ -135,9 +135,8 @@ class GrpcLb : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
// TODO(ncteisen): implement this in a follow up PR
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) override {}
ChildRefsList* child_channels) override;
private:
/// Linked list of pending pick requests. It stores all information needed to
@ -301,6 +300,9 @@ class GrpcLb : public LoadBalancingPolicy {
// The channel for communicating with the LB server.
grpc_channel* lb_channel_ = nullptr;
// Mutex to protect the channel to the LB server. This is used when
// processing a channelz request.
gpr_mu lb_channel_mu_;
grpc_connectivity_state lb_channel_connectivity_;
grpc_closure lb_channel_on_connectivity_changed_;
// Are we already watching the LB channel's connectivity?
@ -1040,6 +1042,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1000)) {
// Initialization.
gpr_mu_init(&lb_channel_mu_);
grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
@ -1078,6 +1081,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
GrpcLb::~GrpcLb() {
GPR_ASSERT(pending_picks_ == nullptr);
GPR_ASSERT(pending_pings_ == nullptr);
gpr_mu_destroy(&lb_channel_mu_);
gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_);
grpc_connectivity_state_destroy(&state_tracker_);
@ -1107,8 +1111,10 @@ void GrpcLb::ShutdownLocked() {
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
// alive when that callback is invoked.
if (lb_channel_ != nullptr) {
gpr_mu_lock(&lb_channel_mu_);
grpc_channel_destroy(lb_channel_);
lb_channel_ = nullptr;
gpr_mu_unlock(&lb_channel_mu_);
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "grpclb_shutdown");
@ -1279,6 +1285,20 @@ void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
}
}
void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) {
// delegate to the RoundRobin to fill the children subchannels.
rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
mu_guard guard(&lb_channel_mu_);
if (lb_channel_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_channel_);
if (channel_node != nullptr) {
child_channels->push_back(channel_node->channel_uuid());
}
}
}
grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
grpc_error** connectivity_error) {
return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
@ -1322,9 +1342,11 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
if (lb_channel_ == nullptr) {
char* uri_str;
gpr_asprintf(&uri_str, "fake:///%s", server_name_);
gpr_mu_lock(&lb_channel_mu_);
lb_channel_ = grpc_client_channel_factory_create_channel(
client_channel_factory(), uri_str,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
gpr_mu_unlock(&lb_channel_mu_);
GPR_ASSERT(lb_channel_ != nullptr);
gpr_free(uri_str);
}

@ -181,7 +181,7 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
}
void PickFirst::ShutdownLocked() {
AutoChildRefsUpdater gaurd(this);
AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
@ -327,30 +327,10 @@ void PickFirst::FillChildRefsForChannelz(
void PickFirst::UpdateChildRefsLocked() {
ChildRefsList cs;
if (subchannel_list_ != nullptr) {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
grpc_core::channelz::SubchannelNode* subchannel_node =
grpc_subchannel_get_channelz_node(
subchannel_list_->subchannel(i)->subchannel());
if (subchannel_node != nullptr) {
cs.push_back(subchannel_node->subchannel_uuid());
}
}
}
subchannel_list_->PopulateChildRefsList(&cs);
}
if (latest_pending_subchannel_list_ != nullptr) {
for (size_t i = 0; i < latest_pending_subchannel_list_->num_subchannels();
++i) {
if (latest_pending_subchannel_list_->subchannel(i)->subchannel() !=
nullptr) {
grpc_core::channelz::SubchannelNode* subchannel_node =
grpc_subchannel_get_channelz_node(
latest_pending_subchannel_list_->subchannel(i)->subchannel());
if (subchannel_node != nullptr) {
cs.push_back(subchannel_node->subchannel_uuid());
}
}
}
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
mu_guard guard(&child_refs_mu_);

@ -69,9 +69,8 @@ class RoundRobin : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
// TODO(ncteisen): implement this in a follow up PR
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) override {}
ChildRefsList* ignored) override;
private:
~RoundRobin();
@ -183,11 +182,24 @@ class RoundRobin : public LoadBalancingPolicy {
size_t last_ready_index_ = -1; // Index into list of last pick.
};
// Helper class to ensure that any function that modifies the child refs
// data structures will update the channelz snapshot data structures before
// returning.
class AutoChildRefsUpdater {
public:
explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
private:
RoundRobin* rr_;
};
void ShutdownLocked() override;
void StartPickingLocked();
bool DoPickLocked(PickState* pick);
void DrainPendingPicksLocked();
void UpdateChildRefsLocked();
/** list of subchannels */
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
@ -205,10 +217,16 @@ class RoundRobin : public LoadBalancingPolicy {
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker_;
/// Lock and data used to capture snapshots of this channel's child
/// channels and subchannels. This data is consumed by channelz.
gpr_mu child_refs_mu_;
ChildRefsList child_subchannels_;
ChildRefsList child_channels_;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"round_robin");
UpdateLocked(*args.args);
@ -223,6 +241,7 @@ RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
@ -242,6 +261,7 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
}
void RoundRobin::ShutdownLocked() {
AutoChildRefsUpdater guard(this);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
@ -365,6 +385,39 @@ bool RoundRobin::PickLocked(PickState* pick) {
return false;
}
void RoundRobin::FillChildRefsForChannelz(
ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
mu_guard guard(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
// performance when channelz requests are made.
bool found = false;
for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
found = true;
break;
}
}
if (!found) {
child_subchannels_to_fill->push_back(child_subchannels_[i]);
}
}
}
void RoundRobin::UpdateChildRefsLocked() {
ChildRefsList cs;
if (subchannel_list_ != nullptr) {
subchannel_list_->PopulateChildRefsList(&cs);
}
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
mu_guard guard(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any
@ -455,6 +508,7 @@ void RoundRobin::RoundRobinSubchannelList::
void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
AutoChildRefsUpdater guard(p);
if (num_ready_ > 0) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
@ -611,6 +665,7 @@ void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
AutoChildRefsUpdater guard(this);
if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.

@ -189,6 +189,19 @@ class SubchannelList
// Returns true if the subchannel list is shutting down.
bool shutting_down() const { return shutting_down_; }
// Populates refs_list with the uuids of this SubchannelLists's subchannels.
void PopulateChildRefsList(ChildRefsList* refs_list) {
for (size_t i = 0; i < subchannels_.size(); ++i) {
if (subchannels_[i].subchannel() != nullptr) {
grpc_core::channelz::SubchannelNode* subchannel_node =
grpc_subchannel_get_channelz_node(subchannels_[i].subchannel());
if (subchannel_node != nullptr) {
refs_list->push_back(subchannel_node->subchannel_uuid());
}
}
}
}
// Accessors.
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }

@ -121,3 +121,24 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) {
} // namespace channelz
} // namespace grpc_core
char* grpc_channelz_get_top_channels(intptr_t start_channel_id) {
return grpc_core::channelz::ChannelzRegistry::GetTopChannels(
start_channel_id);
}
char* grpc_channelz_get_channel(intptr_t channel_id) {
grpc_core::channelz::ChannelNode* channel_node =
grpc_core::channelz::ChannelzRegistry::GetChannelNode(channel_id);
if (channel_node == nullptr) {
return nullptr;
}
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* channel_json = channel_node->RenderJson();
channel_json->key = "channel";
grpc_json_link_child(json, channel_json, nullptr);
char* json_str = grpc_json_dump_to_string(top_level_json, 0);
grpc_json_destroy(top_level_json);
return json_str;
}

@ -40,19 +40,25 @@
gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \
}
#define EXECUTOR_TRACE0(str) \
if (executor_trace.enabled()) { \
gpr_log(GPR_INFO, "EXECUTOR " str); \
}
grpc_core::TraceFlag executor_trace(false, "executor");
GPR_TLS_DECL(g_this_thread_state);
GrpcExecutor::GrpcExecutor(const char* executor_name) : name_(executor_name) {
GrpcExecutor::GrpcExecutor(const char* name) : name_(name) {
adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
gpr_atm_no_barrier_store(&num_threads_, 0);
gpr_atm_rel_store(&num_threads_, 0);
max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
}
void GrpcExecutor::Init() { SetThreading(true); }
size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
size_t GrpcExecutor::RunClosures(const char* executor_name,
grpc_closure_list list) {
size_t n = 0;
grpc_closure* c = list.head;
@ -60,11 +66,11 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
grpc_closure* next = c->next_data.next;
grpc_error* error = c->error_data.error;
#ifndef NDEBUG
EXECUTOR_TRACE("run %p [created by %s:%d]", c, c->file_created,
c->line_created);
EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c,
c->file_created, c->line_created);
c->scheduled = false;
#else
EXECUTOR_TRACE("run %p", c);
EXECUTOR_TRACE("(%s) run %p", executor_name, c);
#endif
c->cb(c->cb_arg, error);
GRPC_ERROR_UNREF(error);
@ -77,17 +83,21 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
}
bool GrpcExecutor::IsThreaded() const {
return gpr_atm_no_barrier_load(&num_threads_) > 0;
return gpr_atm_acq_load(&num_threads_) > 0;
}
void GrpcExecutor::SetThreading(bool threading) {
gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading);
if (threading) {
if (curr_num_threads > 0) return;
if (curr_num_threads > 0) {
EXECUTOR_TRACE("(%s) SetThreading(true). curr_num_threads == 0", name_);
return;
}
GPR_ASSERT(num_threads_ == 0);
gpr_atm_no_barrier_store(&num_threads_, 1);
gpr_atm_rel_store(&num_threads_, 1);
gpr_tls_init(&g_this_thread_state);
thd_state_ = static_cast<ThreadState*>(
gpr_zalloc(sizeof(ThreadState) * max_threads_));
@ -96,6 +106,7 @@ void GrpcExecutor::SetThreading(bool threading) {
gpr_mu_init(&thd_state_[i].mu);
gpr_cv_init(&thd_state_[i].cv);
thd_state_[i].id = i;
thd_state_[i].name = name_;
thd_state_[i].thd = grpc_core::Thread();
thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT;
}
@ -104,7 +115,10 @@ void GrpcExecutor::SetThreading(bool threading) {
grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]);
thd_state_[0].thd.Start();
} else { // !threading
if (curr_num_threads == 0) return;
if (curr_num_threads == 0) {
EXECUTOR_TRACE("(%s) SetThreading(false). curr_num_threads == 0", name_);
return;
}
for (size_t i = 0; i < max_threads_; i++) {
gpr_mu_lock(&thd_state_[i].mu);
@ -121,20 +135,22 @@ void GrpcExecutor::SetThreading(bool threading) {
curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
for (gpr_atm i = 0; i < curr_num_threads; i++) {
thd_state_[i].thd.Join();
EXECUTOR_TRACE(" Thread %" PRIdPTR " of %" PRIdPTR " joined", i,
curr_num_threads);
EXECUTOR_TRACE("(%s) Thread %" PRIdPTR " of %" PRIdPTR " joined", name_,
i + 1, curr_num_threads);
}
gpr_atm_no_barrier_store(&num_threads_, 0);
gpr_atm_rel_store(&num_threads_, 0);
for (size_t i = 0; i < max_threads_; i++) {
gpr_mu_destroy(&thd_state_[i].mu);
gpr_cv_destroy(&thd_state_[i].cv);
RunClosures(thd_state_[i].elems);
RunClosures(thd_state_[i].name, thd_state_[i].elems);
}
gpr_free(thd_state_);
gpr_tls_destroy(&g_this_thread_state);
}
EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading);
}
void GrpcExecutor::Shutdown() { SetThreading(false); }
@ -147,8 +163,8 @@ void GrpcExecutor::ThreadMain(void* arg) {
size_t subtract_depth = 0;
for (;;) {
EXECUTOR_TRACE("[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", ts->id,
subtract_depth);
EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")",
ts->name, ts->id, subtract_depth);
gpr_mu_lock(&ts->mu);
ts->depth -= subtract_depth;
@ -159,7 +175,7 @@ void GrpcExecutor::ThreadMain(void* arg) {
}
if (ts->shutdown) {
EXECUTOR_TRACE("[%" PRIdPTR "]: shutdown", ts->id);
EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: shutdown", ts->name, ts->id);
gpr_mu_unlock(&ts->mu);
break;
}
@ -169,10 +185,10 @@ void GrpcExecutor::ThreadMain(void* arg) {
ts->elems = GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
EXECUTOR_TRACE("[%" PRIdPTR "]: execute", ts->id);
EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: execute", ts->name, ts->id);
grpc_core::ExecCtx::Get()->InvalidateNow();
subtract_depth = RunClosures(closures);
subtract_depth = RunClosures(ts->name, closures);
}
}
@ -188,16 +204,16 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
do {
retry_push = false;
size_t cur_thread_count =
static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_));
static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
// If the number of threads is zero(i.e either the executor is not threaded
// or already shutdown), then queue the closure on the exec context itself
if (cur_thread_count == 0) {
#ifndef NDEBUG
EXECUTOR_TRACE("schedule %p (created %s:%d) inline", closure,
EXECUTOR_TRACE("(%s) schedule %p (created %s:%d) inline", name_, closure,
closure->file_created, closure->line_created);
#else
EXECUTOR_TRACE("schedule %p inline", closure);
EXECUTOR_TRACE("(%s) schedule %p inline", name_, closure);
#endif
grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
closure, error);
@ -213,18 +229,18 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
}
ThreadState* orig_ts = ts;
bool try_new_thread = false;
for (;;) {
#ifndef NDEBUG
EXECUTOR_TRACE(
"try to schedule %p (%s) (created %s:%d) to thread "
"(%s) try to schedule %p (%s) (created %s:%d) to thread "
"%" PRIdPTR,
closure, is_short ? "short" : "long", closure->file_created,
name_, closure, is_short ? "short" : "long", closure->file_created,
closure->line_created, ts->id);
#else
EXECUTOR_TRACE("try to schedule %p (%s) to thread %" PRIdPTR, closure,
is_short ? "short" : "long", ts->id);
EXECUTOR_TRACE("(%s) try to schedule %p (%s) to thread %" PRIdPTR, name_,
closure, is_short ? "short" : "long", ts->id);
#endif
gpr_mu_lock(&ts->mu);
@ -236,18 +252,22 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
size_t idx = ts->id;
ts = &thd_state_[(idx + 1) % cur_thread_count];
if (ts == orig_ts) {
// We cycled through all the threads. Retry enqueue again (by creating
// a new thread)
// We cycled through all the threads. Retry enqueue again by creating
// a new thread
//
// TODO (sreek): There is a potential issue here. We are
// unconditionally setting try_new_thread to true here. What if the
// executor is shutdown OR if cur_thread_count is already equal to
// max_threads ?
// (Fortunately, this is not an issue yet (as of july 2018) because
// there is only one instance of long job in gRPC and hence we will
// not hit this code path)
retry_push = true;
// TODO (sreek): What if the executor is shutdown OR if
// cur_thread_count is already equal to max_threads ? (currently - as
// of July 2018, we do not run in to this issue because there is only
// one instance of long job in gRPC. This has to be fixed soon)
try_new_thread = true;
break;
}
continue;
continue; // Try the next thread-state
}
// == Found the thread state (i.e thread) to enqueue this closure! ==
@ -277,13 +297,11 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
}
if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) {
cur_thread_count =
static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_));
cur_thread_count = static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
if (cur_thread_count < max_threads_) {
// Increment num_threads (Safe to do a no_barrier_store instead of a
// cas because we always increment num_threads under the
// 'adding_thread_lock')
gpr_atm_no_barrier_store(&num_threads_, cur_thread_count + 1);
// Increment num_threads (safe to do a store instead of a cas because we
// always increment num_threads under the 'adding_thread_lock')
gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);
thd_state_[cur_thread_count].thd = grpc_core::Thread(
name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]);
@ -298,60 +316,118 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
} while (retry_push);
}
static GrpcExecutor* global_executor;
static GrpcExecutor* executors[GRPC_NUM_EXECUTORS];
void enqueue_long(grpc_closure* closure, grpc_error* error) {
global_executor->Enqueue(closure, error, false /* is_short */);
void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
true /* is_short */);
}
void enqueue_short(grpc_closure* closure, grpc_error* error) {
global_executor->Enqueue(closure, error, true /* is_short */);
void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
false /* is_short */);
}
// Short-Job executor scheduler
static const grpc_closure_scheduler_vtable global_executor_vtable_short = {
enqueue_short, enqueue_short, "executor-short"};
static grpc_closure_scheduler global_scheduler_short = {
&global_executor_vtable_short};
void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
true /* is_short */);
}
// Long-job executor scheduler
static const grpc_closure_scheduler_vtable global_executor_vtable_long = {
enqueue_long, enqueue_long, "executor-long"};
static grpc_closure_scheduler global_scheduler_long = {
&global_executor_vtable_long};
void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
false /* is_short */);
}
static const grpc_closure_scheduler_vtable
vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
{{&default_enqueue_short, &default_enqueue_short, "def-ex-short"},
{&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
{{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"},
{&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}};
static grpc_closure_scheduler
schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
{{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]},
{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}},
{{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]},
{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}};
// grpc_executor_init() and grpc_executor_shutdown() functions are called in the
// the grpc_init() and grpc_shutdown() code paths which are protected by a
// global mutex. So it is okay to assume that these functions are thread-safe
void grpc_executor_init() {
if (global_executor != nullptr) {
// grpc_executor_init() already called once (and grpc_executor_shutdown()
// wasn't called)
EXECUTOR_TRACE0("grpc_executor_init() enter");
// Return if grpc_executor_init() is already called earlier
if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) {
GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr);
return;
}
global_executor = grpc_core::New<GrpcExecutor>("global-executor");
global_executor->Init();
executors[GRPC_DEFAULT_EXECUTOR] =
grpc_core::New<GrpcExecutor>("default-executor");
executors[GRPC_RESOLVER_EXECUTOR] =
grpc_core::New<GrpcExecutor>("resolver-executor");
executors[GRPC_DEFAULT_EXECUTOR]->Init();
executors[GRPC_RESOLVER_EXECUTOR]->Init();
EXECUTOR_TRACE0("grpc_executor_init() done");
}
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
GrpcExecutorJobType job_type) {
return &schedulers_[executor_type][job_type];
}
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type);
}
void grpc_executor_shutdown() {
// Shutdown already called
if (global_executor == nullptr) {
EXECUTOR_TRACE0("grpc_executor_shutdown() enter");
// Return if grpc_executor_shutdown() is already called earlier
if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) {
GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr);
return;
}
global_executor->Shutdown();
grpc_core::Delete<GrpcExecutor>(global_executor);
global_executor = nullptr;
executors[GRPC_DEFAULT_EXECUTOR]->Shutdown();
executors[GRPC_RESOLVER_EXECUTOR]->Shutdown();
// Delete the executor objects.
//
// NOTE: It is important to call Shutdown() on all executors first before
// calling Delete() because it is possible for one executor (that is not
// shutdown yet) to call Enqueue() on a different executor which is already
// shutdown. This is legal and in such cases, the Enqueue() operation
// effectively "fails" and enqueues that closure on the calling thread's
// exec_ctx.
//
// By ensuring that all executors are shutdown first, we are also ensuring
// that no thread is active across all executors.
grpc_core::Delete<GrpcExecutor>(executors[GRPC_DEFAULT_EXECUTOR]);
grpc_core::Delete<GrpcExecutor>(executors[GRPC_RESOLVER_EXECUTOR]);
executors[GRPC_DEFAULT_EXECUTOR] = nullptr;
executors[GRPC_RESOLVER_EXECUTOR] = nullptr;
EXECUTOR_TRACE0("grpc_executor_shutdown() done");
}
bool grpc_executor_is_threaded() { return global_executor->IsThreaded(); }
bool grpc_executor_is_threaded(GrpcExecutorType executor_type) {
GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS);
return executors[executor_type]->IsThreaded();
}
void grpc_executor_set_threading(bool enable) {
global_executor->SetThreading(enable);
bool grpc_executor_is_threaded() {
return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR);
}
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
return job_type == GRPC_EXECUTOR_SHORT ? &global_scheduler_short
: &global_scheduler_long;
void grpc_executor_set_threading(bool enable) {
EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable);
for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) {
executors[i]->SetThreading(enable);
}
}

@ -27,7 +27,8 @@
typedef struct {
gpr_mu mu;
size_t id; // For debugging purposes
size_t id; // For debugging purposes
const char* name; // Thread state name
gpr_cv cv;
grpc_closure_list elems;
size_t depth; // Number of closures in the closure list
@ -36,7 +37,11 @@ typedef struct {
grpc_core::Thread thd;
} ThreadState;
typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } GrpcExecutorJobType;
typedef enum {
GRPC_EXECUTOR_SHORT = 0,
GRPC_EXECUTOR_LONG,
GRPC_NUM_EXECUTOR_JOB_TYPES // Add new values above this
} GrpcExecutorJobType;
class GrpcExecutor {
public:
@ -58,7 +63,7 @@ class GrpcExecutor {
void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short);
private:
static size_t RunClosures(grpc_closure_list list);
static size_t RunClosures(const char* executor_name, grpc_closure_list list);
static void ThreadMain(void* arg);
const char* name_;
@ -70,14 +75,42 @@ class GrpcExecutor {
// == Global executor functions ==
typedef enum {
GRPC_DEFAULT_EXECUTOR = 0,
GRPC_RESOLVER_EXECUTOR,
GRPC_NUM_EXECUTORS // Add new values above this
} GrpcExecutorType;
// TODO(sreek): Currently we have two executors (available globally): The
// default executor and the resolver executor.
//
// Some of the functions below operate on the DEFAULT executor only while some
// operate of ALL the executors. This is a bit confusing and should be cleaned
// up in future (where we make all the following functions take executor_type
// and/or job_type)
// Initialize ALL the executors
void grpc_executor_init();
// Shutdown ALL the executors
void grpc_executor_shutdown();
// Set the threading mode for ALL the executors
void grpc_executor_set_threading(bool enable);
// Get the DEFAULT executor scheduler for the given job_type
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type);
void grpc_executor_shutdown();
// Get the executor scheduler for a given executor_type and a job_type
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
GrpcExecutorJobType job_type);
bool grpc_executor_is_threaded();
// Return if a given executor is running in threaded mode (i.e if
// grpc_executor_set_threading(true) was called previously on that executor)
bool grpc_executor_is_threaded(GrpcExecutorType executor_type);
void grpc_executor_set_threading(bool enable);
// Return if the DEFAULT executor is threaded
bool grpc_executor_is_threaded();
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */

@ -89,7 +89,11 @@ void LockfreeEvent::DestroyEvent() {
void LockfreeEvent::NotifyOn(grpc_closure* closure) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_);
/* This load needs to be an acquire load because this can be a shutdown
* error that we might need to reference. Adding acquire semantics makes
* sure that the shutdown error has been initialized properly before us
* referencing it. */
gpr_atm curr = gpr_atm_acq_load(&state_);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this,
(void*)curr, closure);

@ -166,8 +166,9 @@ static void posix_resolve_address(const char* name, const char* default_port,
grpc_closure* on_done,
grpc_resolved_addresses** addrs) {
request* r = static_cast<request*>(gpr_malloc(sizeof(request)));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
GRPC_CLOSURE_INIT(
&r->request_closure, do_request_thread, r,
grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;

@ -151,8 +151,9 @@ static void windows_resolve_address(const char* name, const char* default_port,
grpc_closure* on_done,
grpc_resolved_addresses** addresses) {
request* r = (request*)gpr_malloc(sizeof(request));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
GRPC_CLOSURE_INIT(
&r->request_closure, do_request_thread, r,
grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;

@ -41,8 +41,9 @@ namespace internal {
bool check_bios_data(const char* bios_data_file) {
char* bios_data = read_bios_file(bios_data_file);
bool result = (!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GOOGLE)) ||
(!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GCE));
bool result =
bios_data && ((!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GOOGLE)) ||
(!strcmp(bios_data, GRPC_ALTS_EXPECT_NAME_GCE)));
gpr_free(bios_data);
return result;
}

@ -57,6 +57,10 @@ static const char* installed_roots_path =
INSTALL_PREFIX "/share/grpc/roots.pem";
#endif
#ifndef TSI_OPENSSL_ALPN_SUPPORT
#define TSI_OPENSSL_ALPN_SUPPORT 1
#endif
/* -- Overridden default roots. -- */
static grpc_ssl_roots_override_callback ssl_roots_override_cb = nullptr;
@ -850,7 +854,8 @@ grpc_auth_context* grpc_ssl_peer_to_auth_context(const tsi_peer* peer) {
static grpc_error* ssl_check_peer(grpc_security_connector* sc,
const char* peer_name, const tsi_peer* peer,
grpc_auth_context** auth_context) {
/* Check the ALPN. */
#if TSI_OPENSSL_ALPN_SUPPORT
/* Check the ALPN if ALPN is supported. */
const tsi_peer_property* p =
tsi_peer_get_property_by_name(peer, TSI_SSL_ALPN_SELECTED_PROTOCOL);
if (p == nullptr) {
@ -861,7 +866,7 @@ static grpc_error* ssl_check_peer(grpc_security_connector* sc,
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Cannot check peer: invalid ALPN value.");
}
#endif /* TSI_OPENSSL_ALPN_SUPPORT */
/* Check the peer name if specified. */
if (peer_name != nullptr && !grpc_ssl_host_matches_name(peer, peer_name)) {
char* msg;

@ -0,0 +1,57 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/cpp/server/channelz/channelz_service.h"
#include <google/protobuf/text_format.h>
#include <google/protobuf/util/json_util.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
namespace grpc {
Status ChannelzService::GetTopChannels(
ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
channelz::v1::GetTopChannelsResponse* response) {
char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
google::protobuf::util::Status s =
google::protobuf::util::JsonStringToMessage(json_str, response);
gpr_free(json_str);
if (s != google::protobuf::util::Status::OK) {
return Status(INTERNAL, s.ToString());
}
return Status::OK;
}
Status ChannelzService::GetChannel(
ServerContext* unused, const channelz::v1::GetChannelRequest* request,
channelz::v1::GetChannelResponse* response) {
char* json_str = grpc_channelz_get_channel(request->channel_id());
google::protobuf::util::Status s =
google::protobuf::util::JsonStringToMessage(json_str, response);
gpr_free(json_str);
if (s != google::protobuf::util::Status::OK) {
return Status(INTERNAL, s.ToString());
}
return Status::OK;
}
} // namespace grpc

@ -0,0 +1,43 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
#define GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H
#include <grpc/support/port_platform.h>
#include <grpcpp/grpcpp.h>
#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
namespace grpc {
class ChannelzService final : public channelz::v1::Channelz::Service {
private:
// implementation of GetTopChannels rpc
Status GetTopChannels(
ServerContext* unused, const channelz::v1::GetTopChannelsRequest* request,
channelz::v1::GetTopChannelsResponse* response) override;
// implementation of GetChannel rpc
Status GetChannel(ServerContext* unused,
const channelz::v1::GetChannelRequest* request,
channelz::v1::GetChannelResponse* response) override;
};
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_SERVER_CHANNELZ_SERVICE_H

@ -0,0 +1,79 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <grpcpp/ext/channelz_service_plugin.h>
#include <grpcpp/impl/server_builder_plugin.h>
#include <grpcpp/impl/server_initializer.h>
#include <grpcpp/server.h>
#include "src/cpp/server/channelz/channelz_service.h"
namespace grpc {
namespace channelz {
namespace experimental {
class ChannelzServicePlugin : public ::grpc::ServerBuilderPlugin {
public:
ChannelzServicePlugin() : channelz_service_(new grpc::ChannelzService()) {}
grpc::string name() override { return "channelz_service"; }
void InitServer(grpc::ServerInitializer* si) override {
si->RegisterService(channelz_service_);
}
void Finish(grpc::ServerInitializer* si) override {}
void ChangeArguments(const grpc::string& name, void* value) override {}
bool has_sync_methods() const override {
if (channelz_service_) {
return channelz_service_->has_synchronous_methods();
}
return false;
}
bool has_async_methods() const override {
if (channelz_service_) {
return channelz_service_->has_async_methods();
}
return false;
}
private:
std::shared_ptr<grpc::ChannelzService> channelz_service_;
};
static std::unique_ptr< ::grpc::ServerBuilderPlugin>
CreateChannelzServicePlugin() {
return std::unique_ptr< ::grpc::ServerBuilderPlugin>(
new ChannelzServicePlugin());
}
void InitChannelzService() {
static bool already_here = false;
if (already_here) return;
already_here = true;
::grpc::ServerBuilder::InternalAddPluginFactory(&CreateChannelzServicePlugin);
}
} // namespace experimental
} // namespace channelz
} // namespace grpc

@ -46,10 +46,22 @@
<PackagePath>runtimes/win/native/grpc_csharp_ext.x86.dll</PackagePath>
<Pack>true</Pack>
</Content>
<Content Include="Grpc.Core.targets">
<Content Include="..\nativelibs\csharp_ext_linux_android_armeabi-v7a\libgrpc_csharp_ext.so">
<PackagePath>runtimes/monoandroid/armeabi-v7a/libgrpc_csharp_ext.so</PackagePath>
<Pack>true</Pack>
</Content>
<Content Include="..\nativelibs\csharp_ext_linux_android_arm64-v8a\libgrpc_csharp_ext.so">
<PackagePath>runtimes/monoandroid/arm64-v8a/libgrpc_csharp_ext.so</PackagePath>
<Pack>true</Pack>
</Content>
<Content Include="build\net45\Grpc.Core.targets">
<PackagePath>build/net45/</PackagePath>
<Pack>true</Pack>
</Content>
<Content Include="build\MonoAndroid\Grpc.Core.targets">
<PackagePath>build/MonoAndroid/</PackagePath>
<Pack>true</Pack>
</Content>
</ItemGroup>
<ItemGroup>

@ -106,7 +106,15 @@ namespace Grpc.Core.Internal
/// </summary>
private static NativeMethods LoadNativeMethods()
{
return PlatformApis.IsUnity ? LoadNativeMethodsUnity() : new NativeMethods(LoadUnmanagedLibrary());
if (PlatformApis.IsUnity)
{
return LoadNativeMethodsUnity();
}
if (PlatformApis.IsXamarin)
{
return LoadNativeMethodsXamarin();
}
return new NativeMethods(LoadUnmanagedLibrary());
}
/// <summary>
@ -128,6 +136,20 @@ namespace Grpc.Core.Internal
}
}
/// <summary>
/// Return native method delegates when running on the Xamarin platform.
/// WARNING: Xamarin support is experimental and work-in-progress. Don't expect it to work.
/// </summary>
private static NativeMethods LoadNativeMethodsXamarin()
{
if (PlatformApis.IsXamarinAndroid)
{
return new NativeMethods(new NativeMethods.DllImportsFromSharedLib());
}
// not tested yet
return new NativeMethods(new NativeMethods.DllImportsFromStaticLib());
}
private static string GetAssemblyPath()
{
var assembly = typeof(NativeExtension).GetTypeInfo().Assembly;

@ -51,6 +51,7 @@ namespace Grpc.Core.Internal
}
}
[MonoPInvokeCallback(typeof(GprLogDelegate))]
private static void HandleWrite(IntPtr fileStringPtr, int line, ulong threadId, IntPtr severityStringPtr, IntPtr msgPtr)
{
try
@ -86,4 +87,22 @@ namespace Grpc.Core.Internal
}
}
}
/// <summary>
/// Use this attribute to mark methods that will be called back from P/Invoke calls.
/// iOS (and probably other AOT platforms) needs to have delegates registered.
/// Instead of depending on Xamarin.iOS for this, we can just create our own,
/// the iOS runtime just checks for the type name.
/// See: https://docs.microsoft.com/en-gb/xamarin/ios/internals/limitations#reverse-callbacks
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
internal sealed class MonoPInvokeCallbackAttribute : Attribute
{
public MonoPInvokeCallbackAttribute(Type type)
{
Type = type;
}
public Type Type { get; private set; }
}
}

@ -33,12 +33,17 @@ namespace Grpc.Core.Internal
internal static class PlatformApis
{
const string UnityEngineApplicationClassName = "UnityEngine.Application, UnityEngine";
const string XamarinAndroidActivityClassName = "Android.App.Activity, Mono.Android";
const string XamariniOSEnumClassName = "Mono.CSharp.Enum, Mono.CSharp";
static readonly bool isLinux;
static readonly bool isMacOSX;
static readonly bool isWindows;
static readonly bool isMono;
static readonly bool isNetCore;
static readonly bool isUnity;
static readonly bool isXamarin;
static readonly bool isXamariniOS;
static readonly bool isXamarinAndroid;
static PlatformApis()
{
@ -58,6 +63,9 @@ namespace Grpc.Core.Internal
#endif
isMono = Type.GetType("Mono.Runtime") != null;
isUnity = Type.GetType(UnityEngineApplicationClassName) != null;
isXamariniOS = Type.GetType(XamariniOSEnumClassName) != null;
isXamarinAndroid = Type.GetType(XamarinAndroidActivityClassName) != null;
isXamarin = isXamariniOS || isXamarinAndroid;
}
public static bool IsLinux
@ -88,6 +96,31 @@ namespace Grpc.Core.Internal
get { return isUnity; }
}
/// <summary>
/// true if running on a Xamarin platform (either Xamarin.Android or Xamarin.iOS),
/// false otherwise.
/// </summary>
public static bool IsXamarin
{
get { return isXamarin; }
}
/// <summary>
/// true if running on Xamarin.iOS, false otherwise.
/// </summary>
public static bool IsXamariniOS
{
get { return isXamariniOS; }
}
/// <summary>
/// true if running on Xamarin.Android, false otherwise.
/// </summary>
public static bool IsXamarinAndroid
{
get { return isXamarinAndroid; }
}
/// <summary>
/// true if running on .NET Core (CoreCLR), false otherwise.
/// </summary>

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="utf-8"?>
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<_GrpcCoreNugetNativePath Condition="'$(_GrpcCoreNugetNativePath)' == ''">$(MSBuildThisFileDirectory)..\..\</_GrpcCoreNugetNativePath>
</PropertyGroup>
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == 'MonoAndroid'">
<AndroidNativeLibrary Include="$(_GrpcCoreNugetNativePath)runtimes\monoandroid\arm64-v8a\libgrpc_csharp_ext.so">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
<Abi>arm64-v8a</Abi>
</AndroidNativeLibrary>
</ItemGroup>
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == 'MonoAndroid'">
<AndroidNativeLibrary Include="$(_GrpcCoreNugetNativePath)runtimes\monoandroid\armeabi-v7a\libgrpc_csharp_ext.so">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
<Abi>armeabi-v7a</Abi>
</AndroidNativeLibrary>
</ItemGroup>
</Project>

@ -24,7 +24,7 @@
"dest": "api"
},
{
"files": [ "toc.yml"],
"files": [ "toc.yml"]
}
],
"globalMetadata": {

@ -23,17 +23,29 @@ mkdir -p build
cd build
# set to the location where Android SDK is installed
# e.g. ANDROID_NDK_PATH="$HOME/android-ndk-r16b"
# e.g. ANDROID_SDK_PATH="$HOME/Android/Sdk"
cmake ../.. \
-DCMAKE_SYSTEM_NAME=Android \
-DCMAKE_SYSTEM_VERSION=15 \
-DCMAKE_ANDROID_ARCH_ABI=armeabi-v7a \
# set to location where Android NDK is installed, usually a subfolder of Android SDK
# to install the Android NKD, use the "sdkmanager" tool
# e.g. ANDROID_NDK_PATH=${ANDROID_SDK_PATH}/ndk-bundle
# set to location of the cmake executable from the Android SDK
# to install cmake, use the "sdkmanager" tool
# e.g. ANDROID_SDK_CMAKE=${ANDROID_SDK_PATH}/cmake/3.6.4111459/bin/cmake
# ANDROID_ABI in ('arm64-v8a', 'armeabi-v7a')
# e.g. ANDROID_ABI=armeabi-v7a
${ANDROID_SDK_CMAKE} ../.. \
-DCMAKE_TOOLCHAIN_FILE="${ANDROID_NDK_PATH}/build/cmake/android.toolchain.cmake" \
-DCMAKE_ANDROID_NDK="${ANDROID_NDK_PATH}" \
-DCMAKE_ANDROID_STL_TYPE=c++_static \
-DRUN_HAVE_POSIX_REGEX=0 \
-DRUN_HAVE_STD_REGEX=0 \
-DRUN_HAVE_STEADY_CLOCK=0 \
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_BUILD_TYPE=Release \
-DANDROID_PLATFORM=android-28 \
-DANDROID_ABI="${ANDROID_ABI}" \
-DANDROID_NDK="${ANDROID_NDK_PATH}"
make -j4 grpc_csharp_ext

@ -46,6 +46,7 @@ message RequestParams {
string binary_error_details = 13;
ErrorStatus expected_error = 14;
int32 server_sleep_us = 15; // Amount to sleep when invoking server
int32 backend_channel_idx = 16; // which backend to send request to
}
message EchoRequest {

@ -92,6 +92,8 @@ grpc_resource_quota_ref_type grpc_resource_quota_ref_import;
grpc_resource_quota_unref_type grpc_resource_quota_unref_import;
grpc_resource_quota_resize_type grpc_resource_quota_resize_import;
grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import;
grpc_channelz_get_channel_type grpc_channelz_get_channel_import;
grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import;
grpc_use_signal_type grpc_use_signal_import;
@ -340,6 +342,8 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_resource_quota_unref_import = (grpc_resource_quota_unref_type) GetProcAddress(library, "grpc_resource_quota_unref");
grpc_resource_quota_resize_import = (grpc_resource_quota_resize_type) GetProcAddress(library, "grpc_resource_quota_resize");
grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable");
grpc_channelz_get_top_channels_import = (grpc_channelz_get_top_channels_type) GetProcAddress(library, "grpc_channelz_get_top_channels");
grpc_channelz_get_channel_import = (grpc_channelz_get_channel_type) GetProcAddress(library, "grpc_channelz_get_channel");
grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd");
grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd");
grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal");

@ -251,6 +251,12 @@ extern grpc_resource_quota_resize_type grpc_resource_quota_resize_import;
typedef const grpc_arg_pointer_vtable*(*grpc_resource_quota_arg_vtable_type)(void);
extern grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
#define grpc_resource_quota_arg_vtable grpc_resource_quota_arg_vtable_import
typedef char*(*grpc_channelz_get_top_channels_type)(intptr_t start_channel_id);
extern grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import;
#define grpc_channelz_get_top_channels grpc_channelz_get_top_channels_import
typedef char*(*grpc_channelz_get_channel_type)(intptr_t channel_id);
extern grpc_channelz_get_channel_type grpc_channelz_get_channel_import;
#define grpc_channelz_get_channel grpc_channelz_get_channel_import
typedef grpc_channel*(*grpc_insecure_channel_create_from_fd_type)(const char* target, int fd, const grpc_channel_args* args);
extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
#define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import

@ -95,6 +95,11 @@ void ValidateGetTopChannels(size_t expected_channels) {
EXPECT_EQ(end->type, GRPC_JSON_TRUE);
grpc_json_destroy(parsed_json);
gpr_free(json_str);
// also check that the core API formats this correctly
char* core_api_json_str = grpc_channelz_get_top_channels(0);
grpc::testing::ValidateGetTopChannelsResponseProtoJsonTranslation(
core_api_json_str);
gpr_free(core_api_json_str);
}
class ChannelFixture {
@ -151,6 +156,11 @@ void ValidateChannel(ChannelNode* channel, validate_channel_data_args args) {
grpc::testing::ValidateChannelProtoJsonTranslation(json_str);
ValidateCounters(json_str, args);
gpr_free(json_str);
// also check that the core API formats this the correct way
char* core_api_json_str = grpc_channelz_get_channel(channel->channel_uuid());
grpc::testing::ValidateGetChannelResponseProtoJsonTranslation(
core_api_json_str);
gpr_free(core_api_json_str);
}
grpc_millis GetLastCallStartedMillis(ChannelNode* channel) {

@ -69,6 +69,7 @@ static void test_gcp_environment_check_failure() {
GPR_ASSERT(!check_bios_data_linux_test("Amazon"));
GPR_ASSERT(!check_bios_data_linux_test("Google-Chrome\t\t"));
GPR_ASSERT(!check_bios_data_linux_test("Amazon"));
GPR_ASSERT(!check_bios_data_linux_test("\n"));
}
int main(int argc, char** argv) {

@ -131,6 +131,8 @@ int main(int argc, char **argv) {
printf("%lx", (unsigned long) grpc_resource_quota_unref);
printf("%lx", (unsigned long) grpc_resource_quota_resize);
printf("%lx", (unsigned long) grpc_resource_quota_arg_vtable);
printf("%lx", (unsigned long) grpc_channelz_get_top_channels);
printf("%lx", (unsigned long) grpc_channelz_get_channel);
printf("%lx", (unsigned long) grpc_auth_property_iterator_next);
printf("%lx", (unsigned long) grpc_auth_context_property_iterator);
printf("%lx", (unsigned long) grpc_auth_context_peer_identity);

@ -119,6 +119,27 @@ grpc_cc_library(
],
)
grpc_cc_test(
name = "channelz_service_test",
srcs = ["channelz_service_test.cc"],
external_deps = [
"gtest",
],
deps = [
":test_service_impl",
"//:gpr",
"//:grpc",
"//:grpc++",
"//:grpcpp_channelz",
"//src/proto/grpc/channelz:channelz_proto",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)
grpc_cc_test(
name = "server_early_return_test",
srcs = ["server_early_return_test.cc"],

@ -0,0 +1,352 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <grpcpp/ext/channelz_service_plugin.h>
#include "src/proto/grpc/channelz/channelz.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#include <gtest/gtest.h>
using grpc::channelz::v1::GetChannelRequest;
using grpc::channelz::v1::GetChannelResponse;
using grpc::channelz::v1::GetTopChannelsRequest;
using grpc::channelz::v1::GetTopChannelsResponse;
namespace grpc {
namespace testing {
namespace {
// Proxy service supports N backends. Sends RPC to backend dictated by
// request->backend_channel_idx().
class Proxy : public ::grpc::testing::EchoTestService::Service {
public:
Proxy() {}
void AddChannelToBackend(const std::shared_ptr<Channel>& channel) {
stubs_.push_back(grpc::testing::EchoTestService::NewStub(channel));
}
Status Echo(ServerContext* server_context, const EchoRequest* request,
EchoResponse* response) override {
std::unique_ptr<ClientContext> client_context =
ClientContext::FromServerContext(*server_context);
size_t idx = request->param().backend_channel_idx();
GPR_ASSERT(idx < stubs_.size());
return stubs_[idx]->Echo(client_context.get(), *request, response);
}
private:
std::vector<std::unique_ptr<::grpc::testing::EchoTestService::Stub>> stubs_;
};
} // namespace
class ChannelzServerTest : public ::testing::Test {
public:
ChannelzServerTest() {}
void SetUp() override {
// ensure channel server is brought up on all severs we build.
::grpc::channelz::experimental::InitChannelzService();
// We set up a proxy server with channelz enabled.
proxy_port_ = grpc_pick_unused_port_or_die();
ServerBuilder proxy_builder;
grpc::string proxy_server_address = "localhost:" + to_string(proxy_port_);
proxy_builder.AddListeningPort(proxy_server_address,
InsecureServerCredentials());
// forces channelz and channel tracing to be enabled.
proxy_builder.AddChannelArgument(GRPC_ARG_ENABLE_CHANNELZ, 1);
proxy_builder.AddChannelArgument(GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE,
10);
proxy_builder.RegisterService(&proxy_service_);
proxy_server_ = proxy_builder.BuildAndStart();
}
// Sets the proxy up to have an arbitrary number of backends.
void ConfigureProxy(size_t num_backends) {
backends_.resize(num_backends);
for (size_t i = 0; i < num_backends; ++i) {
// create a new backend.
backends_[i].port = grpc_pick_unused_port_or_die();
ServerBuilder backend_builder;
grpc::string backend_server_address =
"localhost:" + to_string(backends_[i].port);
backend_builder.AddListeningPort(backend_server_address,
InsecureServerCredentials());
backends_[i].service.reset(new TestServiceImpl);
// ensure that the backend itself has channelz disabled.
backend_builder.AddChannelArgument(GRPC_ARG_ENABLE_CHANNELZ, 0);
backend_builder.RegisterService(backends_[i].service.get());
backends_[i].server = backend_builder.BuildAndStart();
// set up a channel to the backend. We ensure that this channel has
// channelz enabled since these channels (proxy outbound to backends)
// are the ones that our test will actually be validating.
ChannelArguments args;
args.SetInt(GRPC_ARG_ENABLE_CHANNELZ, 1);
args.SetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE, 10);
std::shared_ptr<Channel> channel_to_backend = CreateCustomChannel(
backend_server_address, InsecureChannelCredentials(), args);
proxy_service_.AddChannelToBackend(channel_to_backend);
}
}
void ResetStubs() {
string target = "dns:localhost:" + to_string(proxy_port_);
ChannelArguments args;
// disable channelz. We only want to focus on proxy to backend outbound.
args.SetInt(GRPC_ARG_ENABLE_CHANNELZ, 0);
std::shared_ptr<Channel> channel =
CreateCustomChannel(target, InsecureChannelCredentials(), args);
channelz_stub_ = grpc::channelz::v1::Channelz::NewStub(channel);
echo_stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
void SendSuccessfulEcho(int channel_idx) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello channelz");
request.mutable_param()->set_backend_channel_idx(channel_idx);
ClientContext context;
Status s = echo_stub_->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.ok());
}
void SendFailedEcho(int channel_idx) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello channelz");
request.mutable_param()->set_backend_channel_idx(channel_idx);
auto* error = request.mutable_param()->mutable_expected_error();
error->set_code(13); // INTERNAL
error->set_error_message("error");
ClientContext context;
Status s = echo_stub_->Echo(&context, request, &response);
EXPECT_FALSE(s.ok());
}
static string to_string(const int number) {
std::stringstream strs;
strs << number;
return strs.str();
}
protected:
// package of data needed for each backend server.
struct BackendData {
std::unique_ptr<Server> server;
int port;
std::unique_ptr<TestServiceImpl> service;
};
std::unique_ptr<grpc::channelz::v1::Channelz::Stub> channelz_stub_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> echo_stub_;
// proxy server to ping with channelz requests.
std::unique_ptr<Server> proxy_server_;
int proxy_port_;
Proxy proxy_service_;
// backends. All implement the echo service.
std::vector<BackendData> backends_;
};
TEST_F(ChannelzServerTest, BasicTest) {
ResetStubs();
ConfigureProxy(1);
GetTopChannelsRequest request;
GetTopChannelsResponse response;
request.set_start_channel_id(0);
ClientContext context;
Status s = channelz_stub_->GetTopChannels(&context, request, &response);
EXPECT_TRUE(s.ok());
EXPECT_EQ(response.channel_size(), 1);
}
TEST_F(ChannelzServerTest, HighStartId) {
ResetStubs();
ConfigureProxy(1);
GetTopChannelsRequest request;
GetTopChannelsResponse response;
request.set_start_channel_id(10000);
ClientContext context;
Status s = channelz_stub_->GetTopChannels(&context, request, &response);
EXPECT_TRUE(s.ok());
EXPECT_EQ(response.channel_size(), 0);
}
TEST_F(ChannelzServerTest, SuccessfulRequestTest) {
ResetStubs();
ConfigureProxy(1);
SendSuccessfulEcho(0);
GetChannelRequest request;
GetChannelResponse response;
request.set_channel_id(1);
ClientContext context;
Status s = channelz_stub_->GetChannel(&context, request, &response);
EXPECT_TRUE(s.ok());
EXPECT_EQ(response.channel().data().calls_started(), 1);
EXPECT_EQ(response.channel().data().calls_succeeded(), 1);
EXPECT_EQ(response.channel().data().calls_failed(), 0);
}
TEST_F(ChannelzServerTest, FailedRequestTest) {
ResetStubs();
ConfigureProxy(1);
SendFailedEcho(0);
GetChannelRequest request;
GetChannelResponse response;
request.set_channel_id(1);
ClientContext context;
Status s = channelz_stub_->GetChannel(&context, request, &response);
EXPECT_TRUE(s.ok());
EXPECT_EQ(response.channel().data().calls_started(), 1);
EXPECT_EQ(response.channel().data().calls_succeeded(), 0);
EXPECT_EQ(response.channel().data().calls_failed(), 1);
}
TEST_F(ChannelzServerTest, ManyRequestsTest) {
ResetStubs();
ConfigureProxy(1);
// send some RPCs
const int kNumSuccess = 10;
const int kNumFailed = 11;
for (int i = 0; i < kNumSuccess; ++i) {
SendSuccessfulEcho(0);
}
for (int i = 0; i < kNumFailed; ++i) {
SendFailedEcho(0);
}
GetChannelRequest request;
GetChannelResponse response;
request.set_channel_id(1);
ClientContext context;
Status s = channelz_stub_->GetChannel(&context, request, &response);
EXPECT_TRUE(s.ok());
EXPECT_EQ(response.channel().data().calls_started(),
kNumSuccess + kNumFailed);
EXPECT_EQ(response.channel().data().calls_succeeded(), kNumSuccess);
EXPECT_EQ(response.channel().data().calls_failed(), kNumFailed);
}
TEST_F(ChannelzServerTest, ManyChannels) {
ResetStubs();
const int kNumChannels = 4;
ConfigureProxy(kNumChannels);
GetTopChannelsRequest request;
GetTopChannelsResponse response;
request.set_start_channel_id(0);
ClientContext context;
Status s = channelz_stub_->GetTopChannels(&context, request, &response);
EXPECT_TRUE(s.ok());
EXPECT_EQ(response.channel_size(), kNumChannels);
}
TEST_F(ChannelzServerTest, ManyRequestsManyChannels) {
ResetStubs();
const int kNumChannels = 4;
ConfigureProxy(kNumChannels);
const int kNumSuccess = 10;
const int kNumFailed = 11;
for (int i = 0; i < kNumSuccess; ++i) {
SendSuccessfulEcho(0);
SendSuccessfulEcho(2);
}
for (int i = 0; i < kNumFailed; ++i) {
SendFailedEcho(1);
SendFailedEcho(2);
}
// the first channel saw only successes
{
GetChannelRequest request;
GetChannelResponse response;
request.set_channel_id(1);
ClientContext context;
Status s = channelz_stub_->GetChannel(&context, request, &response);
EXPECT_TRUE(s.ok());
EXPECT_EQ(response.channel().data().calls_started(), kNumSuccess);
EXPECT_EQ(response.channel().data().calls_succeeded(), kNumSuccess);
EXPECT_EQ(response.channel().data().calls_failed(), 0);
}
// the second channel saw only failures
{
GetChannelRequest request;
GetChannelResponse response;
request.set_channel_id(2);
ClientContext context;
Status s = channelz_stub_->GetChannel(&context, request, &response);
EXPECT_TRUE(s.ok());
EXPECT_EQ(response.channel().data().calls_started(), kNumFailed);
EXPECT_EQ(response.channel().data().calls_succeeded(), 0);
EXPECT_EQ(response.channel().data().calls_failed(), kNumFailed);
}
// the third channel saw both
{
GetChannelRequest request;
GetChannelResponse response;
request.set_channel_id(3);
ClientContext context;
Status s = channelz_stub_->GetChannel(&context, request, &response);
EXPECT_TRUE(s.ok());
EXPECT_EQ(response.channel().data().calls_started(),
kNumSuccess + kNumFailed);
EXPECT_EQ(response.channel().data().calls_succeeded(), kNumSuccess);
EXPECT_EQ(response.channel().data().calls_failed(), kNumFailed);
}
// the fourth channel saw nothing
{
GetChannelRequest request;
GetChannelResponse response;
request.set_channel_id(4);
ClientContext context;
Status s = channelz_stub_->GetChannel(&context, request, &response);
EXPECT_TRUE(s.ok());
EXPECT_EQ(response.channel().data().calls_started(), 0);
EXPECT_EQ(response.channel().data().calls_succeeded(), 0);
EXPECT_EQ(response.channel().data().calls_failed(), 0);
}
}
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -46,6 +46,7 @@ DEFINE_string(
"all : all test cases;\n"
"cancel_after_begin : cancel stream after starting it;\n"
"cancel_after_first_response: cancel on first response;\n"
"channel_soak: sends 'soak_iterations' rpcs, rebuilds channel each time;\n"
"client_compressed_streaming : compressed request streaming with "
"client_compressed_unary : single compressed request;\n"
"client_streaming : request streaming with single response;\n"
@ -60,6 +61,7 @@ DEFINE_string(
"per_rpc_creds: raw oauth2 access token on a single rpc;\n"
"ping_pong : full-duplex streaming;\n"
"response streaming;\n"
"rpc_soak: 'sends soak_iterations' large_unary rpcs;\n"
"server_compressed_streaming : single request with compressed "
"server_compressed_unary : single compressed response;\n"
"server_streaming : single request with response streaming;\n"
@ -83,6 +85,10 @@ DEFINE_bool(do_not_abort_on_transient_failures, false,
"test is retried in case of transient failures (and currently the "
"interop tests are not retried even if this flag is set to true)");
DEFINE_int32(soak_iterations, 1000,
"number of iterations to use for the two soak tests; rpc_soak and "
"channel_soak");
using grpc::testing::CreateChannelForTestCase;
using grpc::testing::GetServiceAccountJsonKey;
using grpc::testing::UpdateActions;
@ -91,8 +97,9 @@ int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
int ret = 0;
grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case),
true,
grpc::testing::ChannelCreationFunc channel_creation_func =
std::bind(&CreateChannelForTestCase, FLAGS_test_case);
grpc::testing::InteropClient client(channel_creation_func, true,
FLAGS_do_not_abort_on_transient_failures);
std::unordered_map<grpc::string, std::function<bool()>> actions;
@ -151,6 +158,11 @@ int main(int argc, char** argv) {
std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client);
actions["cacheable_unary"] =
std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client);
actions["channel_soak"] =
std::bind(&grpc::testing::InteropClient::DoChannelSoakTest, &client,
FLAGS_soak_iterations);
actions["rpc_soak"] = std::bind(&grpc::testing::InteropClient::DoRpcSoakTest,
&client, FLAGS_soak_iterations);
UpdateActions(&actions);

@ -74,13 +74,15 @@ void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
}
} // namespace
InteropClient::ServiceStub::ServiceStub(const std::shared_ptr<Channel>& channel,
bool new_stub_every_call)
: channel_(channel), new_stub_every_call_(new_stub_every_call) {
InteropClient::ServiceStub::ServiceStub(
ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
: channel_creation_func_(channel_creation_func),
channel_(channel_creation_func_()),
new_stub_every_call_(new_stub_every_call) {
// If new_stub_every_call is false, then this is our chance to initialize
// stub_. (see Get())
if (!new_stub_every_call) {
stub_ = TestService::NewStub(channel);
stub_ = TestService::NewStub(channel_);
}
}
@ -100,27 +102,17 @@ InteropClient::ServiceStub::GetUnimplementedServiceStub() {
return unimplemented_service_stub_.get();
}
void InteropClient::ServiceStub::Reset(
const std::shared_ptr<Channel>& channel) {
channel_ = channel;
// Update stub_ as well. Note: If new_stub_every_call_ is true, we can reset
// the stub_ since the next call to Get() will create a new stub
if (new_stub_every_call_) {
stub_.reset();
} else {
stub_ = TestService::NewStub(channel);
void InteropClient::ServiceStub::ResetChannel() {
channel_ = channel_creation_func_();
if (!new_stub_every_call_) {
stub_ = TestService::NewStub(channel_);
}
}
void InteropClient::Reset(const std::shared_ptr<Channel>& channel) {
serviceStub_.Reset(std::move(channel));
}
InteropClient::InteropClient(const std::shared_ptr<Channel>& channel,
InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures)
: serviceStub_(std::move(channel), new_stub_every_test_case),
: serviceStub_(channel_creation_func, new_stub_every_test_case),
do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
bool InteropClient::AssertStatusOk(const Status& s,
@ -1028,6 +1020,38 @@ bool InteropClient::DoCustomMetadata() {
return true;
}
bool InteropClient::DoRpcSoakTest(int32_t soak_iterations) {
gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
GPR_ASSERT(soak_iterations > 0);
SimpleRequest request;
SimpleResponse response;
for (int i = 0; i < soak_iterations; ++i) {
if (!PerformLargeUnary(&request, &response)) {
gpr_log(GPR_ERROR, "rpc_soak test failed on iteration %d", i);
return false;
}
}
gpr_log(GPR_DEBUG, "rpc_soak test done.");
return true;
}
bool InteropClient::DoChannelSoakTest(int32_t soak_iterations) {
gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...",
soak_iterations);
GPR_ASSERT(soak_iterations > 0);
SimpleRequest request;
SimpleResponse response;
for (int i = 0; i < soak_iterations; ++i) {
serviceStub_.ResetChannel();
if (!PerformLargeUnary(&request, &response)) {
gpr_log(GPR_ERROR, "channel_soak test failed on iteration %d", i);
return false;
}
}
gpr_log(GPR_DEBUG, "channel_soak test done.");
return true;
}
bool InteropClient::DoUnimplementedService() {
gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");

@ -34,13 +34,15 @@ typedef std::function<void(const InteropClientContextInspector&,
const SimpleRequest*, const SimpleResponse*)>
CheckerFn;
typedef std::function<std::shared_ptr<Channel>(void)> ChannelCreationFunc;
class InteropClient {
public:
/// If new_stub_every_test_case is true, a new TestService::Stub object is
/// created for every test case
/// If do_not_abort_on_transient_failures is true, abort() is not called in
/// case of transient failures (like connection failures)
explicit InteropClient(const std::shared_ptr<Channel>& channel,
explicit InteropClient(ChannelCreationFunc channel_creation_func,
bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures);
~InteropClient() {}
@ -67,6 +69,14 @@ class InteropClient {
bool DoUnimplementedMethod();
bool DoUnimplementedService();
bool DoCacheableUnary();
// The following interop test are not yet part of the interop spec, and are
// not implemented cross-language. They are considered experimental for now,
// but at some point in the future, might be codified and implemented in all
// languages
bool DoChannelSoakTest(int32_t soak_iterations);
bool DoRpcSoakTest(int32_t soak_iterations);
// Auth tests.
// username is a string containing the user email
bool DoJwtTokenCreds(const grpc::string& username);
@ -83,15 +93,17 @@ class InteropClient {
public:
// If new_stub_every_call = true, pointer to a new instance of
// TestServce::Stub is returned by Get() everytime it is called
ServiceStub(const std::shared_ptr<Channel>& channel,
ServiceStub(ChannelCreationFunc channel_creation_func,
bool new_stub_every_call);
TestService::Stub* Get();
UnimplementedService::Stub* GetUnimplementedServiceStub();
void Reset(const std::shared_ptr<Channel>& channel);
// forces channel to be recreated.
void ResetChannel();
private:
ChannelCreationFunc channel_creation_func_;
std::unique_ptr<TestService::Stub> stub_;
std::unique_ptr<UnimplementedService::Stub> unimplemented_service_stub_;
std::shared_ptr<Channel> channel_;
@ -109,8 +121,8 @@ class InteropClient {
bool AssertStatusCode(const Status& s, StatusCode expected_code,
const grpc::string& optional_debug_string);
bool TransientFailureOrAbort();
ServiceStub serviceStub_;
ServiceStub serviceStub_;
/// If true, abort() is not called for transient failures
bool do_not_abort_on_transient_failures_;
};

@ -68,13 +68,13 @@ TestCaseType WeightedRandomTestSelector::GetNextTest() const {
StressTestInteropClient::StressTestInteropClient(
int test_id, const grpc::string& server_address,
const std::shared_ptr<Channel>& channel,
ChannelCreationFunc channel_creation_func,
const WeightedRandomTestSelector& test_selector, long test_duration_secs,
long sleep_duration_ms, bool do_not_abort_on_transient_failures)
: test_id_(test_id),
server_address_(server_address),
channel_(channel),
interop_client_(new InteropClient(channel, false,
channel_creation_func_(channel_creation_func),
interop_client_(new InteropClient(channel_creation_func_, false,
do_not_abort_on_transient_failures)),
test_selector_(test_selector),
test_duration_secs_(test_duration_secs),

@ -91,7 +91,7 @@ class WeightedRandomTestSelector {
class StressTestInteropClient {
public:
StressTestInteropClient(int test_id, const grpc::string& server_address,
const std::shared_ptr<Channel>& channel,
ChannelCreationFunc channel_creation_func,
const WeightedRandomTestSelector& test_selector,
long test_duration_secs, long sleep_duration_ms,
bool do_not_abort_on_transient_failures);
@ -105,7 +105,7 @@ class StressTestInteropClient {
int test_id_;
const grpc::string& server_address_;
std::shared_ptr<Channel> channel_;
ChannelCreationFunc channel_creation_func_;
std::unique_ptr<InteropClient> interop_client_;
const WeightedRandomTestSelector& test_selector_;
long test_duration_secs_;

@ -283,15 +283,20 @@ int main(int argc, char** argv) {
channel_idx++) {
gpr_log(GPR_INFO, "Starting test with %s channel_idx=%d..", it->c_str(),
channel_idx);
std::shared_ptr<grpc::Channel> channel = grpc::CreateTestChannel(
grpc::testing::ChannelCreationFunc channel_creation_func = std::bind(
static_cast<std::shared_ptr<grpc::Channel> (*)(
const grpc::string&, const grpc::string&,
grpc::testing::transport_security, bool)>(
grpc::CreateTestChannel),
*it, FLAGS_server_host_override, security_type, !FLAGS_use_test_ca);
// Create stub(s) for each channel
for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel;
stub_idx++) {
clients.emplace_back(new StressTestInteropClient(
++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures));
++thread_idx, *it, channel_creation_func, test_selector,
FLAGS_test_duration_secs, FLAGS_sleep_duration_ms,
FLAGS_do_not_abort_on_transient_failures));
bool is_already_created = false;
// QpsGauge name

@ -172,9 +172,9 @@ class LbFeedbackTest : public LoadReporterTest {
// TODO(juanlishen): The error is big because we use sleep(). It should be
// much smaller when we use fake clock.
ASSERT_THAT(static_cast<double>(lb_feedback.calls_per_second()),
DoubleNear(expected_qps, expected_qps / 50));
DoubleNear(expected_qps, expected_qps * 0.05));
ASSERT_THAT(static_cast<double>(lb_feedback.errors_per_second()),
DoubleNear(expected_eps, expected_eps / 50));
DoubleNear(expected_eps, expected_eps * 0.05));
gpr_log(GPR_INFO,
"Verified LB feedback matches the samples of index [%lu, %lu).",
start, start + count);

@ -77,5 +77,10 @@ void ValidateGetTopChannelsResponseProtoJsonTranslation(char* json_c_str) {
json_c_str);
}
void ValidateGetChannelResponseProtoJsonTranslation(char* json_c_str) {
VaidateProtoJsonTranslation<grpc::channelz::v1::GetChannelResponse>(
json_c_str);
}
} // namespace testing
} // namespace grpc

@ -25,6 +25,7 @@ namespace testing {
void ValidateChannelTraceProtoJsonTranslation(char* json_c_str);
void ValidateChannelProtoJsonTranslation(char* json_c_str);
void ValidateGetTopChannelsResponseProtoJsonTranslation(char* json_c_str);
void ValidateGetChannelResponseProtoJsonTranslation(char* json_c_str);
} // namespace testing
} // namespace grpc

@ -12,9 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Docker file for building gRPC artifacts.
# Docker file for building gRPC artifacts for Android.
# Recent enough cmake (>=3.9) needed by Android SDK
FROM debian:sid
RUN apt-get update && apt-get install -y debian-keyring && apt-key update
@ -47,20 +46,26 @@ RUN apt-get update && apt-key update && apt-get install -y \
wget \
zip && apt-get clean
# Cmake for cross-compilation
RUN apt-get update && apt-get install -y cmake golang && apt-get clean
# golang needed to build BoringSSL with cmake
RUN apt-get update && apt-get install -y golang && apt-get clean
##################
# Android NDK
# Java required by Android SDK
RUN apt-get update && apt-get -y install openjdk-8-jdk && apt-get clean
# Download and install Android NDK
RUN wget -q https://dl.google.com/android/repository/android-ndk-r16b-linux-x86_64.zip -O android_ndk.zip \
&& unzip -q android_ndk.zip \
&& rm android_ndk.zip \
&& mv ./android-ndk-r16b /opt
ENV ANDROID_NDK_PATH /opt/android-ndk-r16b
# Install Android SDK
ENV ANDROID_SDK_VERSION 4333796
RUN mkdir -p /opt/android-sdk && cd /opt/android-sdk && \
wget -q https://dl.google.com/android/repository/sdk-tools-linux-${ANDROID_SDK_VERSION}.zip && \
unzip -q sdk-tools-linux-${ANDROID_SDK_VERSION}.zip && \
rm sdk-tools-linux-${ANDROID_SDK_VERSION}.zip
ENV ANDROID_SDK_PATH /opt/android-sdk
RUN apt-get update && apt-get install -y libpthread-stubs0-dev && apt-get clean
# Install Android NDK and cmake using sdkmanager
RUN mkdir -p ~/.android && touch ~/.android/repositories.cfg
RUN yes | ${ANDROID_SDK_PATH}/tools/bin/sdkmanager --licenses # accept all licenses
RUN ${ANDROID_SDK_PATH}/tools/bin/sdkmanager ndk-bundle 'cmake;3.6.4111459'
ENV ANDROID_NDK_PATH ${ANDROID_SDK_PATH}/ndk-bundle
ENV ANDROID_SDK_CMAKE ${ANDROID_SDK_PATH}/cmake/3.6.4111459/bin/cmake
RUN mkdir /var/local/jenkins

@ -0,0 +1,26 @@
# Copyright 2018 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/linux/grpc_publish_packages.sh"
timeout_mins: 120
action {
define_artifacts {
regex: "**/*sponge_log.xml"
regex: "github/grpc/reports/**"
regex: "github/grpc/artifacts/**"
}
}

@ -0,0 +1,144 @@
#!/bin/bash
# Copyright 2018 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
shopt -s nullglob
INPUT_ARTIFACTS=$KOKORO_GFILE_DIR/github/grpc/artifacts
INDEX_FILENAME=index.xml
BUILD_ID=${KOKORO_BUILD_ID:-$(uuidgen)}
BUILD_BRANCH_NAME=master
BUILD_GIT_COMMIT=${KOKORO_GIT_COMMIT:-unknown}
BUILD_TIMESTAMP=$(date -Iseconds)
BUILD_RELPATH=$(date "+%Y/%m")/$BUILD_ID/
GCS_ROOT=gs://packages.grpc.io/
GCS_ARCHIVE_PREFIX=archive/
GCS_ARCHIVE_ROOT=$GCS_ROOT$GCS_ARCHIVE_PREFIX
GCS_INDEX=$GCS_ROOT$INDEX_FILENAME
LOCAL_STAGING_TEMPDIR=$(mktemp -d)
LOCAL_BUILD_ROOT=$LOCAL_STAGING_TEMPDIR/$BUILD_RELPATH
LOCAL_BUILD_INDEX=$LOCAL_BUILD_ROOT$INDEX_FILENAME
mkdir -p "$LOCAL_BUILD_ROOT"
find "$INPUT_ARTIFACTS" -type f
UNZIPPED_CSHARP_PACKAGES=$(mktemp -d)
unzip "$INPUT_ARTIFACTS/csharp_nugets_windows_dotnetcli.zip" -d "$UNZIPPED_CSHARP_PACKAGES"
CSHARP_PACKAGES=(
"$UNZIPPED_CSHARP_PACKAGES"/*
)
PYTHON_PACKAGES=(
"$INPUT_ARTIFACTS"/grpcio-[0-9]*.tar.gz
"$INPUT_ARTIFACTS"/grpcio-[0-9]*.whl
"$INPUT_ARTIFACTS"/python_linux_extra_arm*/grpcio-[0-9]*.whl
"$INPUT_ARTIFACTS"/grpcio-tools-[0-9]*.tar.gz
"$INPUT_ARTIFACTS"/grpcio_tools-[0-9]*.whl
"$INPUT_ARTIFACTS"/python_linux_extra_arm*/grpcio_tools-[0-9]*.whl
"$INPUT_ARTIFACTS"/grpcio-health-checking-[0-9]*.tar.gz
"$INPUT_ARTIFACTS"/grpcio-reflection-[0-9]*.tar.gz
"$INPUT_ARTIFACTS"/grpcio-testing-[0-9]*.tar.gz
)
PHP_PACKAGES=(
"$INPUT_ARTIFACTS"/grpc-[0-9]*.tgz
)
RUBY_PACKAGES=(
"$INPUT_ARTIFACTS"/grpc-[0-9]*.gem
"$INPUT_ARTIFACTS"/grpc-tools-[0-9]*.gem
)
function add_to_manifest() {
local artifact_type=$1
local artifact_file=$2
local artifact_name
artifact_name=$(basename "$artifact_file")
local artifact_sha256
artifact_sha256=$(openssl sha256 -r "$artifact_file" | cut -d " " -f 1)
local artifact_target=$LOCAL_BUILD_ROOT/$artifact_type
mkdir -p "$artifact_target"
cp "$artifact_file" "$artifact_target"
cat <<EOF
<artifact name='$artifact_name'
type='$artifact_type'
path='$artifact_type/$artifact_name'
sha256='$artifact_sha256' />
EOF
}
{
cat <<EOF
<?xml version="1.0"?>
<?xml-stylesheet href="/web-assets/build-201807.xsl" type="text/xsl"?>
<build id='$BUILD_ID' timestamp='$BUILD_TIMESTAMP'>
<metadata>
<project>gRPC</project>
<repository>https://github.com/grpc/grpc</repository>
<branch>$BUILD_BRANCH_NAME</branch>
<commit>$BUILD_GIT_COMMIT</commit>
</metadata>
<artifacts>
EOF
for pkg in "${CSHARP_PACKAGES[@]}"; do add_to_manifest csharp "$pkg"; done
for pkg in "${PHP_PACKAGES[@]}"; do add_to_manifest php "$pkg"; done
for pkg in "${PYTHON_PACKAGES[@]}"; do add_to_manifest python "$pkg"; done
for pkg in "${RUBY_PACKAGES[@]}"; do add_to_manifest ruby "$pkg"; done
cat <<EOF
</artifacts>
</build>
EOF
}> "$LOCAL_BUILD_INDEX"
LOCAL_BUILD_INDEX_SHA256=$(openssl sha256 -r "$LOCAL_BUILD_INDEX" | cut -d " " -f 1)
OLD_INDEX=$(mktemp)
NEW_INDEX=$(mktemp)
# Download the current /index.xml into $OLD_INDEX
gsutil cp "$GCS_INDEX" "$OLD_INDEX"
{
# we want to add an entry as the first child under <builds> tag
# we can get by without a real XML parser by rewriting the header,
# injecting our new tag, and then dumping the rest of the file as is.
cat <<EOF
<?xml version="1.0"?>
<?xml-stylesheet href="/web-assets/home.xsl" type="text/xsl"?>
<packages>
<builds>
<build id='$BUILD_ID'
timestamp='$BUILD_TIMESTAMP'
branch='$BUILD_BRANCH_NAME'
commit='$BUILD_GIT_COMMIT'
path='$GCS_ARCHIVE_PREFIX$BUILD_RELPATH$INDEX_FILENAME'
sha256='$LOCAL_BUILD_INDEX_SHA256' />
EOF
tail --lines=+5 "$OLD_INDEX"
}> "$NEW_INDEX"
# Upload the current build artifacts
gsutil -m cp -r "$LOCAL_STAGING_TEMPDIR/${BUILD_RELPATH%%/*}" "$GCS_ARCHIVE_ROOT"
# Upload the new /index.xml
gsutil -h "Content-Type:application/xml" cp "$NEW_INDEX" "$GCS_INDEX"

@ -86,7 +86,7 @@ def build(where, frameworks):
'src/objective-c/examples/Sample/Build-%s' % where)
text = ''
text = 'Objective-C binary sizes\n'
for frameworks in [False, True]:
build('new', frameworks)
new_size = get_size('new', frameworks)

@ -212,11 +212,15 @@ class RubyArtifact:
class CSharpExtArtifact:
"""Builds C# native extension library"""
def __init__(self, platform, arch):
def __init__(self, platform, arch, arch_abi=None):
self.name = 'csharp_ext_%s_%s' % (platform, arch)
self.platform = platform
self.arch = arch
self.arch_abi = arch_abi
self.labels = ['artifact', 'csharp', platform, arch]
if arch_abi:
self.name += '_%s' % arch_abi
self.labels.append(arch_abi)
def pre_build_jobspecs(self):
return []
@ -227,7 +231,9 @@ class CSharpExtArtifact:
self.name,
'tools/dockerfile/grpc_artifact_android_ndk',
'tools/run_tests/artifacts/build_artifact_csharp_android.sh',
environ={})
environ={
'ANDROID_ABI': self.arch_abi
})
elif self.platform == 'windows':
cmake_arch_option = 'Win32' if self.arch == 'x86' else self.arch
return create_jobspec(
@ -348,7 +354,8 @@ def targets():
for Cls in (CSharpExtArtifact, ProtocArtifact)
for platform in ('linux', 'macos', 'windows') for arch in ('x86', 'x64')
] + [
CSharpExtArtifact('linux', 'android'),
CSharpExtArtifact('linux', 'android', arch_abi='arm64-v8a'),
CSharpExtArtifact('linux', 'android', arch_abi='armeabi-v7a'),
PythonArtifact('linux', 'x86', 'cp27-cp27m'),
PythonArtifact('linux', 'x86', 'cp27-cp27mu'),
PythonArtifact('linux', 'x86', 'cp34-cp34m'),

@ -17,6 +17,7 @@ set -ex
cd "$(dirname "$0")/../../.."
# ANDROID_ABI is set by the job definition in artifact_targets.py
src/csharp/experimental/build_native_ext_for_android.sh
mkdir -p "${ARTIFACTS_OUT}"

@ -3124,10 +3124,10 @@
"gpr_test_util",
"grpc",
"grpc++",
"grpc++_channelz_proto",
"grpc++_test",
"grpc++_test_util",
"grpc_test_util"
"grpc_test_util",
"grpcpp_channelz_proto"
],
"headers": [],
"is_filegroup": false,
@ -3165,10 +3165,31 @@
"gpr_test_util",
"grpc",
"grpc++",
"grpc++_channelz_proto",
"grpc++_test_util",
"grpc_test_util",
"grpcpp_channelz",
"grpcpp_channelz_proto"
],
"headers": [],
"is_filegroup": false,
"language": "c++",
"name": "channelz_service_test",
"src": [
"test/cpp/end2end/channelz_service_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc++",
"grpc++_test",
"grpc++_test_util",
"grpc_test_util"
"grpc_test_util",
"grpcpp_channelz_proto"
],
"headers": [],
"is_filegroup": false,
@ -7528,6 +7549,28 @@
"third_party": false,
"type": "lib"
},
{
"deps": [
"grpc",
"grpc++",
"grpcpp_channelz_proto"
],
"headers": [
"include/grpcpp/ext/channelz_service_plugin.h",
"src/cpp/server/channelz/channelz_service.h"
],
"is_filegroup": false,
"language": "c++",
"name": "grpcpp_channelz",
"src": [
"include/grpcpp/ext/channelz_service_plugin.h",
"src/cpp/server/channelz/channelz_service.cc",
"src/cpp/server/channelz/channelz_service.h",
"src/cpp/server/channelz/channelz_service_plugin.cc"
],
"third_party": false,
"type": "lib"
},
{
"deps": [
"grpc",
@ -10888,20 +10931,6 @@
"third_party": false,
"type": "filegroup"
},
{
"deps": [],
"headers": [
"src/proto/grpc/channelz/channelz.grpc.pb.h",
"src/proto/grpc/channelz/channelz.pb.h",
"src/proto/grpc/channelz/channelz_mock.grpc.pb.h"
],
"is_filegroup": true,
"language": "c++",
"name": "grpc++_channelz_proto",
"src": [],
"third_party": false,
"type": "filegroup"
},
{
"deps": [
"grpc_codegen"
@ -11377,5 +11406,19 @@
],
"third_party": false,
"type": "filegroup"
},
{
"deps": [],
"headers": [
"src/proto/grpc/channelz/channelz.grpc.pb.h",
"src/proto/grpc/channelz/channelz.pb.h",
"src/proto/grpc/channelz/channelz_mock.grpc.pb.h"
],
"is_filegroup": true,
"language": "c++",
"name": "grpcpp_channelz_proto",
"src": [],
"third_party": false,
"type": "filegroup"
}
]

@ -3755,6 +3755,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "channelz_service_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save