Merge remote-tracking branch 'upstream/master' into bump_core_version_202404081658

pull/36293/head
yijiem 8 months ago
commit b306ec1e69
  1. 3
      .clang-format
  2. 5
      BUILD
  3. 183
      CMakeLists.txt
  4. 1
      Makefile
  5. 1
      Package.swift
  6. 65
      build_autogenerated.yaml
  7. 42
      examples/cpp/health/BUILD
  8. 98
      examples/cpp/health/CMakeLists.txt
  9. 6
      examples/cpp/health/README.md
  10. 140
      examples/cpp/health/health_client.cc
  11. 100
      examples/cpp/health/health_server.cc
  12. 1
      gRPC-C++.podspec
  13. 1
      gRPC-Core.podspec
  14. 1
      grpc.gemspec
  15. 3
      include/grpc/.clang-format
  16. 3
      include/grpc/byte_buffer.h
  17. 3
      include/grpc/census.h
  18. 3
      include/grpc/compression.h
  19. 4
      include/grpc/event_engine/endpoint_config.h
  20. 3
      include/grpc/event_engine/event_engine.h
  21. 4
      include/grpc/event_engine/extensible.h
  22. 3
      include/grpc/event_engine/internal/memory_allocator_impl.h
  23. 3
      include/grpc/event_engine/memory_allocator.h
  24. 4
      include/grpc/event_engine/memory_request.h
  25. 3
      include/grpc/event_engine/slice.h
  26. 3
      include/grpc/event_engine/slice_buffer.h
  27. 3
      include/grpc/grpc.h
  28. 3
      include/grpc/grpc_audit_logging.h
  29. 3
      include/grpc/grpc_crl_provider.h
  30. 3
      include/grpc/grpc_cronet.h
  31. 3
      include/grpc/grpc_posix.h
  32. 3
      include/grpc/grpc_security.h
  33. 3
      include/grpc/impl/call.h
  34. 3
      include/grpc/impl/grpc_types.h
  35. 4
      include/grpc/impl/slice_type.h
  36. 1
      include/grpc/module.modulemap
  37. 63
      include/grpc/passive_listener.h
  38. 3
      include/grpc/slice.h
  39. 3
      include/grpc/slice_buffer.h
  40. 4
      include/grpc/support/alloc.h
  41. 4
      include/grpc/support/json.h
  42. 4
      include/grpc/support/log.h
  43. 4
      include/grpc/support/metrics.h
  44. 1
      include/grpc/support/string_util.h
  45. 1
      include/grpc/support/sync.h
  46. 1
      include/grpc/support/sync_abseil.h
  47. 1
      include/grpc/support/sync_custom.h
  48. 3
      include/grpc/support/sync_generic.h
  49. 3
      include/grpc/support/sync_posix.h
  50. 4
      include/grpc/support/time.h
  51. 3
      include/grpcpp/impl/.clang-format
  52. 3
      include/grpcpp/impl/codegen/.clang-format
  53. 3
      include/grpcpp/impl/status.h
  54. 3
      include/grpcpp/security/.clang-format
  55. 1
      include/grpcpp/security/server_credentials.h
  56. 29
      include/grpcpp/server_builder.h
  57. 1
      package.xml
  58. 3
      src/compiler/.clang-format
  59. 4
      src/core/BUILD
  60. 7
      src/core/ext/transport/binder/server/binder_server.cc
  61. 7
      src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc
  62. 4
      src/core/ext/transport/chaotic_good/server/chaotic_good_server.h
  63. 241
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  64. 34
      src/core/ext/transport/chttp2/server/chttp2_server.h
  65. 161
      src/core/ext/xds/xds_client.cc
  66. 12
      src/core/ext/xds/xds_client.h
  67. 4
      src/core/ext/xds/xds_client_grpc.cc
  68. 4
      src/core/lib/channel/tcp_tracer.h
  69. 7
      src/core/lib/event_engine/extensions/supports_fd.h
  70. 18
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  71. 2
      src/core/lib/event_engine/posix_engine/posix_engine.h
  72. 8
      src/core/lib/promise/status_flag.h
  73. 14
      src/core/lib/surface/server.h
  74. 118
      src/core/lib/transport/call_filters.cc
  75. 299
      src/core/lib/transport/call_filters.h
  76. 60
      src/cpp/server/server_builder.cc
  77. 4
      test/core/address_utils/sockaddr_utils_test.cc
  78. 3
      test/core/client_channel/client_channel_test.cc
  79. 3
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  80. 4
      test/core/compiler_bugs/miscompile_with_no_unique_address_test.cc
  81. 4
      test/core/end2end/BUILD
  82. 56
      test/core/end2end/end2end_test_fuzzer.cc
  83. 19
      test/core/end2end/end2end_test_fuzzer.h
  84. 20
      test/core/end2end/end2end_test_fuzzer_main.cc
  85. 6
      test/core/end2end/grpc_core_end2end_test.bzl
  86. 3
      test/core/end2end/invalid_call_argument_test.cc
  87. 4
      test/core/event_engine/common_closures_test.cc
  88. 3
      test/core/event_engine/default_engine_methods_test.cc
  89. 4
      test/core/event_engine/endpoint_config_test.cc
  90. 65
      test/core/event_engine/event_engine_test_utils.h
  91. 3
      test/core/event_engine/factory_test.cc
  92. 4
      test/core/event_engine/forkable_test.cc
  93. 4
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine_unittest.cc
  94. 3
      test/core/event_engine/handle_tests.cc
  95. 3
      test/core/event_engine/query_extensions_test.cc
  96. 3
      test/core/event_engine/slice_buffer_test.cc
  97. 3
      test/core/event_engine/smoke_test.cc
  98. 4
      test/core/event_engine/tcp_socket_utils_test.cc
  99. 4
      test/core/event_engine/test_init.cc
  100. 4
      test/core/event_engine/test_init.h
  101. Some files were not shown because too many files have changed in this diff Show More

@ -5,9 +5,6 @@ DerivePointerAlignment: false
PointerAlignment: Left
IncludeBlocks: Regroup
IncludeCategories:
# port_platform.h is before almost everything
- Regex: '^<grpc/(support|impl/codegen)/port_platform.h>'
Priority: -100
# ruby.h is even more first if it's included
- Regex: '^<ruby/ruby.h>'
Priority: -200

@ -291,7 +291,6 @@ GRPC_PUBLIC_HDRS = [
"include/grpc/grpc_posix.h",
"include/grpc/grpc_security.h",
"include/grpc/grpc_security_constants.h",
"include/grpc/passive_listener.h",
"include/grpc/slice.h",
"include/grpc/slice_buffer.h",
"include/grpc/status.h",
@ -453,7 +452,6 @@ GRPCXX_PUBLIC_HDRS = [
"include/grpcpp/impl/service_type.h",
"include/grpcpp/impl/status.h",
"include/grpcpp/impl/sync.h",
"include/grpcpp/passive_listener.h",
"include/grpcpp/resource_quota.h",
"include/grpcpp/security/audit_logging.h",
"include/grpcpp/security/tls_crl_provider.h",
@ -2460,7 +2458,6 @@ grpc_cc_library(
"//src/core:grpc_backend_metric_provider",
"//src/core:grpc_crl_provider",
"//src/core:grpc_service_config",
"//src/core:grpc_transport_chttp2_server",
"//src/core:grpc_transport_inproc",
"//src/core:json",
"//src/core:json_reader",
@ -2517,7 +2514,6 @@ grpc_cc_library(
"grpc_security_base",
"grpc_service_config_impl",
"grpc_trace",
"grpc_transport_chttp2",
"grpc_unsecure",
"grpcpp_backend_metric_recorder",
"grpcpp_call_metric_recorder",
@ -2539,7 +2535,6 @@ grpc_cc_library(
"//src/core:grpc_backend_metric_provider",
"//src/core:grpc_insecure_credentials",
"//src/core:grpc_service_config",
"//src/core:grpc_transport_chttp2_server",
"//src/core:grpc_transport_inproc",
"//src/core:ref_counted",
"//src/core:resource_quota",

183
CMakeLists.txt generated

@ -1557,6 +1557,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx xds_end2end_test)
endif()
add_dependencies(buildtests_cxx xds_endpoint_resource_type_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_fallback_end2end_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_fault_injection_end2end_test)
endif()
@ -2738,7 +2741,6 @@ foreach(_hdr
include/grpc/impl/propagation_bits.h
include/grpc/impl/slice_type.h
include/grpc/load_reporting.h
include/grpc/passive_listener.h
include/grpc/slice.h
include/grpc/slice_buffer.h
include/grpc/status.h
@ -3441,7 +3443,6 @@ foreach(_hdr
include/grpc/impl/propagation_bits.h
include/grpc/impl/slice_type.h
include/grpc/load_reporting.h
include/grpc/passive_listener.h
include/grpc/slice.h
include/grpc/slice_buffer.h
include/grpc/status.h
@ -4310,7 +4311,6 @@ foreach(_hdr
include/grpcpp/impl/service_type.h
include/grpcpp/impl/status.h
include/grpcpp/impl/sync.h
include/grpcpp/passive_listener.h
include/grpcpp/resource_quota.h
include/grpcpp/security/audit_logging.h
include/grpcpp/security/auth_context.h
@ -5051,7 +5051,6 @@ foreach(_hdr
include/grpcpp/impl/service_type.h
include/grpcpp/impl/status.h
include/grpcpp/impl/sync.h
include/grpcpp/passive_listener.h
include/grpcpp/resource_quota.h
include/grpcpp/security/audit_logging.h
include/grpcpp/security/auth_context.h
@ -5502,7 +5501,6 @@ foreach(_hdr
include/grpc/impl/propagation_bits.h
include/grpc/impl/slice_type.h
include/grpc/load_reporting.h
include/grpc/passive_listener.h
include/grpc/slice.h
include/grpc/slice_buffer.h
include/grpc/status.h
@ -26547,7 +26545,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h
test/core/event_engine/event_engine_test_utils.cc
test/core/util/cmdline.cc
test/core/util/fuzzer_util.cc
test/core/util/grpc_profiler.cc
@ -34250,6 +34247,180 @@ target_link_libraries(xds_endpoint_resource_type_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(xds_fallback_end2end_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/address.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/ads.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_source.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/expr.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/extension.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/health_check.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/health_check.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/health_check.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/health_check.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/listener.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/load_report.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/metadata.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/outlier_detection.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/outlier_detection.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/outlier_detection.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/outlier_detection.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/path.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/protocol.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/range.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/rbac.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/regex.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/route.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/router.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h
test/cpp/end2end/test_service_impl.cc
test/cpp/end2end/xds/xds_end2end_test_lib.cc
test/cpp/end2end/xds/xds_fallback_end2end_test.cc
test/cpp/end2end/xds/xds_server.cc
test/cpp/end2end/xds/xds_utils.cc
test/cpp/util/tls_test_utils.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(xds_fallback_end2end_test
PRIVATE
"GPR_DLL_IMPORTS"
"GRPC_DLL_IMPORTS"
"GRPCXX_DLL_IMPORTS"
)
endif()
endif()
target_compile_features(xds_fallback_end2end_test PUBLIC cxx_std_14)
target_include_directories(xds_fallback_end2end_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(xds_fallback_end2end_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc++_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

1
Makefile generated

@ -1773,7 +1773,6 @@ PUBLIC_HEADERS_C += \
include/grpc/impl/propagation_bits.h \
include/grpc/impl/slice_type.h \
include/grpc/load_reporting.h \
include/grpc/passive_listener.h \
include/grpc/slice.h \
include/grpc/slice_buffer.h \
include/grpc/status.h \

1
Package.swift generated

@ -92,7 +92,6 @@ let package = Package(
"include/grpc/impl/propagation_bits.h",
"include/grpc/impl/slice_type.h",
"include/grpc/load_reporting.h",
"include/grpc/passive_listener.h",
"include/grpc/slice.h",
"include/grpc/slice_buffer.h",
"include/grpc/status.h",

@ -196,7 +196,6 @@ libs:
- include/grpc/impl/propagation_bits.h
- include/grpc/impl/slice_type.h
- include/grpc/load_reporting.h
- include/grpc/passive_listener.h
- include/grpc/slice.h
- include/grpc/slice_buffer.h
- include/grpc/status.h
@ -2184,7 +2183,6 @@ libs:
- include/grpc/impl/propagation_bits.h
- include/grpc/impl/slice_type.h
- include/grpc/load_reporting.h
- include/grpc/passive_listener.h
- include/grpc/slice.h
- include/grpc/slice_buffer.h
- include/grpc/status.h
@ -3793,7 +3791,6 @@ libs:
- include/grpcpp/impl/service_type.h
- include/grpcpp/impl/status.h
- include/grpcpp/impl/sync.h
- include/grpcpp/passive_listener.h
- include/grpcpp/resource_quota.h
- include/grpcpp/security/audit_logging.h
- include/grpcpp/security/auth_context.h
@ -4222,7 +4219,6 @@ libs:
- include/grpcpp/impl/service_type.h
- include/grpcpp/impl/status.h
- include/grpcpp/impl/sync.h
- include/grpcpp/passive_listener.h
- include/grpcpp/resource_quota.h
- include/grpcpp/security/audit_logging.h
- include/grpcpp/security/auth_context.h
@ -4370,7 +4366,6 @@ libs:
- include/grpc/impl/propagation_bits.h
- include/grpc/impl/slice_type.h
- include/grpc/load_reporting.h
- include/grpc/passive_listener.h
- include/grpc/slice.h
- include/grpc/slice_buffer.h
- include/grpc/status.h
@ -6416,6 +6411,7 @@ targets:
- src/core/lib/promise/detail/seq_state.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/poll.h
@ -17295,7 +17291,6 @@ targets:
build: test
language: c++
headers:
- test/core/event_engine/event_engine_test_utils.h
- test/core/util/cmdline.h
- test/core/util/evaluate_args_test_util.h
- test/core/util/fuzzer_util.h
@ -17312,7 +17307,6 @@ targets:
- src/proto/grpc/testing/echo_messages.proto
- src/proto/grpc/testing/simple_messages.proto
- src/proto/grpc/testing/xds/v3/orca_load_report.proto
- test/core/event_engine/event_engine_test_utils.cc
- test/core/util/cmdline.cc
- test/core/util/fuzzer_util.cc
- test/core/util/grpc_profiler.cc
@ -20867,6 +20861,63 @@ targets:
- protobuf
- grpc_test_util
uses_polling: false
- name: xds_fallback_end2end_test
gtest: true
build: test
language: c++
headers:
- test/core/util/scoped_env_var.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
- test/cpp/end2end/xds/xds_end2end_test_lib.h
- test/cpp/end2end/xds/xds_server.h
- test/cpp/end2end/xds/xds_utils.h
- test/cpp/util/tls_test_utils.h
src:
- src/proto/grpc/testing/duplicate/echo_duplicate.proto
- src/proto/grpc/testing/echo.proto
- src/proto/grpc/testing/echo_messages.proto
- src/proto/grpc/testing/simple_messages.proto
- src/proto/grpc/testing/xds/v3/address.proto
- src/proto/grpc/testing/xds/v3/ads.proto
- src/proto/grpc/testing/xds/v3/base.proto
- src/proto/grpc/testing/xds/v3/cluster.proto
- src/proto/grpc/testing/xds/v3/config_source.proto
- src/proto/grpc/testing/xds/v3/discovery.proto
- src/proto/grpc/testing/xds/v3/endpoint.proto
- src/proto/grpc/testing/xds/v3/expr.proto
- src/proto/grpc/testing/xds/v3/extension.proto
- src/proto/grpc/testing/xds/v3/health_check.proto
- src/proto/grpc/testing/xds/v3/http_connection_manager.proto
- src/proto/grpc/testing/xds/v3/http_filter_rbac.proto
- src/proto/grpc/testing/xds/v3/listener.proto
- src/proto/grpc/testing/xds/v3/load_report.proto
- src/proto/grpc/testing/xds/v3/lrs.proto
- src/proto/grpc/testing/xds/v3/metadata.proto
- src/proto/grpc/testing/xds/v3/orca_load_report.proto
- src/proto/grpc/testing/xds/v3/outlier_detection.proto
- src/proto/grpc/testing/xds/v3/path.proto
- src/proto/grpc/testing/xds/v3/percent.proto
- src/proto/grpc/testing/xds/v3/protocol.proto
- src/proto/grpc/testing/xds/v3/range.proto
- src/proto/grpc/testing/xds/v3/rbac.proto
- src/proto/grpc/testing/xds/v3/regex.proto
- src/proto/grpc/testing/xds/v3/route.proto
- src/proto/grpc/testing/xds/v3/router.proto
- src/proto/grpc/testing/xds/v3/string.proto
- test/cpp/end2end/test_service_impl.cc
- test/cpp/end2end/xds/xds_end2end_test_lib.cc
- test/cpp/end2end/xds/xds_fallback_end2end_test.cc
- test/cpp/end2end/xds/xds_server.cc
- test/cpp/end2end/xds/xds_utils.cc
- test/cpp/util/tls_test_utils.cc
deps:
- gtest
- grpc++_test_util
platforms:
- linux
- posix
- mac
- name: xds_fault_injection_end2end_test
gtest: true
build: test

@ -0,0 +1,42 @@
# Copyright 2024 the gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
licenses(["notice"])
cc_binary(
name = "health_client",
srcs = ["health_client.cc"],
defines = ["BAZEL_BUILD"],
deps = [
"//:grpc++",
"//examples/protos:helloworld_cc_grpc",
"//src/proto/grpc/health/v1:health_proto",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
],
)
cc_binary(
name = "health_server",
srcs = ["health_server.cc"],
defines = ["BAZEL_BUILD"],
deps = [
"//:grpc++",
"//:grpc++_reflection",
"//examples/protos:helloworld_cc_grpc",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
"@com_google_absl//absl/strings:str_format",
],
)

@ -0,0 +1,98 @@
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# cmake build file for C++ helloworld example.
# Assumes protobuf and gRPC have been installed using cmake.
# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build
# that automatically builds all the dependencies before building helloworld.
cmake_minimum_required(VERSION 3.8)
project(HelloWorld C CXX)
include(../cmake/common.cmake)
# Proto file
get_filename_component(hw_proto "../../protos/helloworld.proto" ABSOLUTE)
get_filename_component(hw_proto_path "${hw_proto}" PATH)
get_filename_component(health_proto "../../../src/proto/grpc/health/v1/health.proto" ABSOLUTE)
get_filename_component(health_proto_path "${health_proto}" PATH)
# Generated sources
set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc")
set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.h")
set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc")
set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h")
add_custom_command(
OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${hw_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${hw_proto}"
DEPENDS "${hw_proto}")
# Health protos
set(health_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/health.pb.cc")
set(health_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/health.pb.h")
set(health_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/health.grpc.pb.cc")
set(health_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/health.grpc.pb.h")
add_custom_command(
OUTPUT "${health_proto_srcs}" "${health_proto_hdrs}" "${health_grpc_srcs}" "${health_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${health_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${health_proto}"
DEPENDS "${health_proto}")
# Include generated *.pb.h files
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# hw_grpc_proto
add_library(hw_grpc_proto
${hw_grpc_srcs}
${hw_grpc_hdrs}
${hw_proto_srcs}
${hw_proto_hdrs})
target_link_libraries(hw_grpc_proto
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
#health_grpc_proto
add_library(health_grpc_proto
${health_grpc_srcs}
${health_grpc_hdrs}
${health_proto_srcs}
${health_proto_hdrs})
target_link_libraries(health_grpc_proto
${_PROTOBUF_LIBPROTOBUF})
# Targets greeter_[async_](client|server)
foreach(_target
health_client health_server)
add_executable(${_target} "${_target}.cc")
target_link_libraries(${_target}
hw_grpc_proto
health_grpc_proto
absl::flags
absl::flags_parse
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
endforeach()

@ -0,0 +1,6 @@
# gRPC C++ Health Check Example
You can find a complete set of instructions for building gRPC and running the
example in the [C++ Quick Start][].
[C++ Quick Start]: https://grpc.io/docs/languages/cpp/quickstart

@ -0,0 +1,140 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include <grpcpp/grpcpp.h>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#include "src/proto/grpc/health/v1/health.grpc.pb.h"
#else
#include "health.grpc.pb.h"
#include "helloworld.grpc.pb.h"
#endif
ABSL_FLAG(std::string, target, "localhost:50051", "Server address");
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using grpc::health::v1::Health;
using grpc::health::v1::HealthCheckRequest;
using grpc::health::v1::HealthCheckResponse;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
class GreeterClient {
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)),
health_stub_(Health::NewStub(channel)) {}
// Assembles the client's payload, sends it and presents the response back
// from the server.
std::string SayHello(const std::string& user) {
// Data we are sending to the server.
HelloRequest request;
request.set_name(user);
// Container for the data we expect from the server.
HelloReply reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// The actual RPC.
std::mutex mu;
std::condition_variable cv;
bool done = false;
Status status;
stub_->async()->SayHello(&context, &request, &reply,
[&mu, &cv, &done, &status](Status s) {
status = std::move(s);
std::lock_guard<std::mutex> lock(mu);
done = true;
cv.notify_one();
});
std::unique_lock<std::mutex> lock(mu);
while (!done) {
cv.wait(lock);
}
// Act upon its status.
if (status.ok()) {
return reply.message();
} else {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
return "RPC failed";
}
}
void CheckHealth(const std::string& message) {
ClientContext context;
HealthCheckResponse response;
Status status = health_stub_->Check(
&context, HealthCheckRequest::default_instance(), &response);
if (!status.ok()) {
std::cerr << "Failed to check service health: " << status.error_code()
<< ": " << status.error_message() << "\n";
return;
}
std::cout << message << ": " << response.DebugString();
}
private:
std::unique_ptr<Greeter::Stub> stub_;
std::unique_ptr<Health::Stub> health_stub_;
};
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
// Instantiate the client. It requires a channel, out of which the actual RPCs
// are created. This channel models a connection to an endpoint specified by
// the argument "--target=" which is the only expected argument.
std::string target_str = absl::GetFlag(FLAGS_target);
// We indicate that the channel isn't authenticated (use of
// InsecureChannelCredentials()).
grpc::ChannelArguments args;
args.SetServiceConfigJSON(
"{\"healthCheckConfig\": "
"{\"serviceName\": \"\"}}");
GreeterClient greeter(grpc::CreateCustomChannel(
target_str, grpc::InsecureChannelCredentials(), args));
std::string user = "world";
greeter.CheckHealth("Before call");
std::string reply = greeter.SayHello(user);
std::cout << "Greeter received: " << reply << std::endl;
greeter.CheckHealth("After call");
reply = greeter.SayHello(user);
std::cout << "Greeter received: " << reply << std::endl;
greeter.CheckHealth("After second call");
return 0;
}

@ -0,0 +1,100 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <iostream>
#include <memory>
#include <string>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_format.h"
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service");
using grpc::CallbackServerContext;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerUnaryReactor;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
// Logic and data behind the server's behavior.
class GreeterServiceImpl final : public Greeter::CallbackService {
public:
void set_health_check_service(
grpc::HealthCheckServiceInterface* health_check_service) {
health_check_service_ = health_check_service;
}
private:
ServerUnaryReactor* SayHello(CallbackServerContext* context,
const HelloRequest* request,
HelloReply* reply) override {
std::string prefix("Hello ");
reply->set_message(prefix + request->name());
ServerUnaryReactor* reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
// Goes down, then up
is_serving_ = !is_serving_;
health_check_service_->SetServingStatus(is_serving_);
return reactor;
}
grpc::HealthCheckServiceInterface* health_check_service_ = nullptr;
bool is_serving_ = true;
};
void RunServer(uint16_t port) {
std::string server_address = absl::StrFormat("0.0.0.0:%d", port);
GreeterServiceImpl service;
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
builder.RegisterService(&service);
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
service.set_health_check_service(server->GetHealthCheckService());
std::cout << "Server listening on " << server_address << std::endl;
// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
server->Wait();
}
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
RunServer(absl::GetFlag(FLAGS_port));
return 0;
}

1
gRPC-C++.podspec generated

@ -175,7 +175,6 @@ Pod::Spec.new do |s|
'include/grpcpp/impl/service_type.h',
'include/grpcpp/impl/status.h',
'include/grpcpp/impl/sync.h',
'include/grpcpp/passive_listener.h',
'include/grpcpp/resource_quota.h',
'include/grpcpp/security/audit_logging.h',
'include/grpcpp/security/auth_context.h',

1
gRPC-Core.podspec generated

@ -166,7 +166,6 @@ Pod::Spec.new do |s|
'include/grpc/impl/propagation_bits.h',
'include/grpc/impl/slice_type.h',
'include/grpc/load_reporting.h',
'include/grpc/passive_listener.h',
'include/grpc/slice.h',
'include/grpc/slice_buffer.h',
'include/grpc/status.h',

1
grpc.gemspec generated

@ -98,7 +98,6 @@ Gem::Specification.new do |s|
s.files += %w( include/grpc/impl/propagation_bits.h )
s.files += %w( include/grpc/impl/slice_type.h )
s.files += %w( include/grpc/load_reporting.h )
s.files += %w( include/grpc/passive_listener.h )
s.files += %w( include/grpc/slice.h )
s.files += %w( include/grpc/slice_buffer.h )
s.files += %w( include/grpc/status.h )

@ -5,9 +5,6 @@ DerivePointerAlignment: false
PointerAlignment: Left
IncludeBlocks: Regroup
IncludeCategories:
# port_platform.h is before almost everything
- Regex: '^<grpc/(support|impl/codegen)/port_platform.h>'
Priority: -100
# ruby.h is even more first if it's included
- Regex: '^<ruby/ruby.h>'
Priority: -200

@ -19,10 +19,9 @@
#ifndef GRPC_BYTE_BUFFER_H
#define GRPC_BYTE_BUFFER_H
#include <grpc/support/port_platform.h>
#include <grpc/impl/grpc_types.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {

@ -19,9 +19,8 @@
#ifndef GRPC_CENSUS_H
#define GRPC_CENSUS_H
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {

@ -19,12 +19,11 @@
#ifndef GRPC_COMPRESSION_H
#define GRPC_COMPRESSION_H
#include <grpc/support/port_platform.h>
#include <stdlib.h>
#include <grpc/impl/compression_types.h> // IWYU pragma: export
#include <grpc/slice.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {

@ -14,13 +14,13 @@
#ifndef GRPC_EVENT_ENGINE_ENDPOINT_CONFIG_H
#define GRPC_EVENT_ENGINE_ENDPOINT_CONFIG_H
#include <grpc/support/port_platform.h>
#include <string>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {

@ -14,8 +14,6 @@
#ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H
#define GRPC_EVENT_ENGINE_EVENT_ENGINE_H
#include <grpc/support/port_platform.h>
#include <vector>
#include "absl/functional/any_invocable.h"
@ -27,6 +25,7 @@
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/port.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/port_platform.h>
// TODO(vigneshbabu): Define the Endpoint::Write metrics collection system
namespace grpc_event_engine {

@ -15,10 +15,10 @@
#ifndef GRPC_EVENT_ENGINE_EXTENSIBLE_H
#define GRPC_EVENT_ENGINE_EXTENSIBLE_H
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {

@ -14,8 +14,6 @@
#ifndef GRPC_EVENT_ENGINE_INTERNAL_MEMORY_ALLOCATOR_IMPL_H
#define GRPC_EVENT_ENGINE_INTERNAL_MEMORY_ALLOCATOR_IMPL_H
#include <grpc/support/port_platform.h>
#include <algorithm>
#include <memory>
#include <type_traits>
@ -23,6 +21,7 @@
#include <grpc/event_engine/memory_request.h>
#include <grpc/slice.h>
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {

@ -14,8 +14,6 @@
#ifndef GRPC_EVENT_ENGINE_MEMORY_ALLOCATOR_H
#define GRPC_EVENT_ENGINE_MEMORY_ALLOCATOR_H
#include <grpc/support/port_platform.h>
#include <stdlib.h> // for abort()
#include <algorithm>
@ -25,6 +23,7 @@
#include <grpc/event_engine/internal/memory_allocator_impl.h>
#include <grpc/slice.h>
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {

@ -14,12 +14,12 @@
#ifndef GRPC_EVENT_ENGINE_MEMORY_REQUEST_H
#define GRPC_EVENT_ENGINE_MEMORY_REQUEST_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include "absl/strings/string_view.h"
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {

@ -15,8 +15,6 @@
#ifndef GRPC_EVENT_ENGINE_SLICE_H
#define GRPC_EVENT_ENGINE_SLICE_H
#include <grpc/support/port_platform.h>
#include <string.h>
#include <cstdint>
@ -28,6 +26,7 @@
#include <grpc/event_engine/internal/slice_cast.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
// This public slice definition largely based of the internal grpc_core::Slice
// implementation. Changes to this implementation might warrant changes to the

@ -15,8 +15,6 @@
#ifndef GRPC_EVENT_ENGINE_SLICE_BUFFER_H
#define GRPC_EVENT_ENGINE_SLICE_BUFFER_H
#include <grpc/support/port_platform.h>
#include <string.h>
#include <cstdint>
@ -31,6 +29,7 @@
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {

@ -19,8 +19,6 @@
#ifndef GRPC_GRPC_H
#define GRPC_GRPC_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <grpc/byte_buffer.h>
@ -29,6 +27,7 @@
#include <grpc/impl/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#ifdef __cplusplus

@ -19,8 +19,6 @@
#ifndef GRPC_GRPC_AUDIT_LOGGING_H
#define GRPC_GRPC_AUDIT_LOGGING_H
#include <grpc/support/port_platform.h>
#include <memory>
#include <string>
@ -28,6 +26,7 @@
#include "absl/strings/string_view.h"
#include <grpc/support/json.h>
#include <grpc/support/port_platform.h>
namespace grpc_core {
namespace experimental {

@ -19,8 +19,6 @@
#ifndef GRPC_GRPC_CRL_PROVIDER_H
#define GRPC_GRPC_CRL_PROVIDER_H
#include <grpc/support/port_platform.h>
#include <memory>
#include <string>
@ -28,6 +26,7 @@
#include "absl/strings/string_view.h"
#include <grpc/grpc_security.h>
#include <grpc/support/port_platform.h>
namespace grpc_core {
namespace experimental {

@ -19,9 +19,8 @@
#ifndef GRPC_GRPC_CRONET_H
#define GRPC_GRPC_CRONET_H
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {

@ -19,12 +19,11 @@
#ifndef GRPC_GRPC_POSIX_H
#define GRPC_GRPC_POSIX_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <grpc/grpc.h>
#include <grpc/impl/grpc_types.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {

@ -19,13 +19,12 @@
#ifndef GRPC_GRPC_SECURITY_H
#define GRPC_GRPC_SECURITY_H
#include <grpc/support/port_platform.h>
#include <stdbool.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security_constants.h>
#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {

@ -15,11 +15,10 @@
#ifndef GRPC_IMPL_CALL_H
#define GRPC_IMPL_CALL_H
#include <grpc/support/port_platform.h>
#include "absl/functional/any_invocable.h"
#include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
// Run a callback in the call's EventEngine.
// Internal-only

@ -21,14 +21,13 @@
// IWYU pragma: private, include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/compression_types.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#ifdef __cplusplus

@ -21,10 +21,10 @@
// IWYU pragma: private, include <grpc/slice.h>
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <grpc/support/port_platform.h>
typedef struct grpc_slice grpc_slice;
/** Slice API

@ -37,7 +37,6 @@ header "byte_buffer.h"
header "impl/propagation_bits.h"
header "impl/slice_type.h"
header "load_reporting.h"
header "passive_listener.h"
header "slice.h"
header "slice_buffer.h"
header "status.h"

@ -1,63 +0,0 @@
// Copyright 2024 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_PASSIVE_LISTENER_H
#define GRPC_PASSIVE_LISTENER_H
#include <grpc/support/port_platform.h>
#include <memory>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
namespace grpc_core {
class Server;
namespace experimental {
class PassiveListenerImpl;
/// -- EXPERIMENTAL API --
/// Interface for used for Server Endpoint injection.
class PassiveListener {
public:
virtual ~PassiveListener() = default;
/// -- EXPERIMENTAL API --
///
/// Takes an Endpoint for an established connection, and treats it as if the
/// connection had been accepted by the server.
///
/// The server must be started before endpoints can be accepted.
virtual absl::Status AcceptConnectedEndpoint(
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint) = 0;
/// -- EXPERIMENTAL API --
///
/// Takes a connected file descriptor, and treats it as if the server had
/// accepted the connection itself.
///
/// Returns a failure status if the server's active EventEngine does not
/// support Endpoint creation from fds.
virtual absl::Status AcceptConnectedFd(int fd) = 0;
};
} // namespace experimental
} // namespace grpc_core
absl::Status grpc_server_add_passive_listener(
grpc_core::Server* server, grpc_server_credentials* credentials,
std::shared_ptr<grpc_core::experimental::PassiveListenerImpl>
passive_listener);
#endif /* GRPC_PASSIVE_LISTENER_H */

@ -19,9 +19,8 @@
#ifndef GRPC_SLICE_H
#define GRPC_SLICE_H
#include <grpc/support/port_platform.h>
#include <grpc/impl/slice_type.h> // IWYU pragma: export
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
#ifdef __cplusplus

@ -19,9 +19,8 @@
#ifndef GRPC_SLICE_BUFFER_H
#define GRPC_SLICE_BUFFER_H
#include <grpc/support/port_platform.h>
#include <grpc/slice.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {

@ -19,10 +19,10 @@
#ifndef GRPC_SUPPORT_ALLOC_H
#define GRPC_SUPPORT_ALLOC_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {
#endif

@ -17,8 +17,6 @@
#ifndef GRPC_SUPPORT_JSON_H
#define GRPC_SUPPORT_JSON_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <map>
@ -29,6 +27,8 @@
#include "absl/strings/str_cat.h"
#include "absl/types/variant.h"
#include <grpc/support/port_platform.h>
namespace grpc_core {
namespace experimental {

@ -19,11 +19,11 @@
#ifndef GRPC_SUPPORT_LOG_H
#define GRPC_SUPPORT_LOG_H
#include <grpc/support/port_platform.h>
#include <stdarg.h>
#include <stdlib.h> /* for abort() */
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {
#endif

@ -15,10 +15,10 @@
#ifndef GRPC_SUPPORT_METRICS_H
#define GRPC_SUPPORT_METRICS_H
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
#include <grpc/support/port_platform.h>
namespace grpc_core {
namespace experimental {

@ -20,7 +20,6 @@
#define GRPC_SUPPORT_STRING_UTIL_H
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#ifdef __cplusplus

@ -21,7 +21,6 @@
/* Platform-specific type declarations of gpr_mu and gpr_cv. */
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h> /* for gpr_timespec */
#ifdef __cplusplus

@ -20,7 +20,6 @@
#define GRPC_SUPPORT_SYNC_ABSEIL_H
#include <grpc/support/port_platform.h>
#include <grpc/support/sync_generic.h>
#ifdef GPR_ABSEIL_SYNC

@ -20,7 +20,6 @@
#define GRPC_SUPPORT_SYNC_CUSTOM_H
#include <grpc/support/port_platform.h>
#include <grpc/support/sync_generic.h>
/* Users defining GPR_CUSTOM_SYNC need to define the following macros. */

@ -21,9 +21,8 @@
/* Generic type definitions for gpr_sync. */
#include <grpc/support/port_platform.h>
#include <grpc/support/atm.h>
#include <grpc/support/port_platform.h>
/* gpr_event */
typedef struct {

@ -19,10 +19,9 @@
#ifndef GRPC_SUPPORT_SYNC_POSIX_H
#define GRPC_SUPPORT_SYNC_POSIX_H
#include <grpc/support/port_platform.h>
#include <pthread.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync_generic.h>
#ifdef GRPC_ASAN_ENABLED

@ -19,11 +19,11 @@
#ifndef GRPC_SUPPORT_TIME_H
#define GRPC_SUPPORT_TIME_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <time.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {
#endif

@ -5,9 +5,6 @@ DerivePointerAlignment: false
PointerAlignment: Left
IncludeBlocks: Regroup
IncludeCategories:
# port_platform.h is before almost everything
- Regex: '^<grpc/(support|impl/codegen)/port_platform.h>'
Priority: -100
# ruby.h is even more first if it's included
- Regex: '^<ruby/ruby.h>'
Priority: -200

@ -5,9 +5,6 @@ DerivePointerAlignment: false
PointerAlignment: Left
IncludeBlocks: Regroup
IncludeCategories:
# port_platform.h is before almost everything
- Regex: '^<grpc/(support|impl/codegen)/port_platform.h>'
Priority: -100
# ruby.h is even more first if it's included
- Regex: '^<ruby/ruby.h>'
Priority: -200

@ -21,9 +21,8 @@
// IWYU pragma: private, include <grpcpp/support/status.h>
#include <grpc/support/port_platform.h>
#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/status_code_enum.h>

@ -5,9 +5,6 @@ DerivePointerAlignment: false
PointerAlignment: Left
IncludeBlocks: Regroup
IncludeCategories:
# port_platform.h is before almost everything
- Regex: '^<grpc/(support|impl/codegen)/port_platform.h>'
Priority: -100
# ruby.h is even more first if it's included
- Regex: '^<ruby/ruby.h>'
Priority: -200

@ -84,7 +84,6 @@ class ServerCredentials : private grpc::internal::GrpcLibrary {
// Needed for access to AddPortToServer.
friend class Server;
// Needed for access to c_creds_.
friend class ServerBuilder;
friend std::shared_ptr<ServerCredentials> grpc::XdsServerCredentials(
const std::shared_ptr<ServerCredentials>& fallback_credentials);

@ -27,16 +27,12 @@
#include <vector>
#include <grpc/compression.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/passive_listener.h>
#include <grpc/support/cpu.h>
#include <grpc/support/workaround_list.h>
#include <grpcpp/impl/channel_argument_option.h>
#include <grpcpp/impl/server_builder_option.h>
#include <grpcpp/impl/server_builder_plugin.h>
#include <grpcpp/passive_listener.h>
#include <grpcpp/security/authorization_policy_provider.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/server_interceptor.h>
@ -63,7 +59,6 @@ class ExternalConnectionAcceptorImpl;
class CallbackGenericService;
namespace experimental {
// EXPERIMENTAL API:
// Interface for a grpc server to build transports with connections created out
// of band.
@ -297,18 +292,6 @@ class ServerBuilder {
void EnableCallMetricRecording(
experimental::ServerMetricRecorder* server_metric_recorder = nullptr);
// Creates a passive listener for Server Endpoint injection.
///
/// \a PasiveListener lets applications provide pre-established connections
/// to gRPC Servers. The server will behave as if it accepted the connection
/// itself on its own listening addresses.
///
/// This can be called multiple times to create passive listeners with
/// different server credentials.
ServerBuilder& AddPassiveListener(
std::shared_ptr<grpc::ServerCredentials> creds,
std::unique_ptr<grpc::experimental::PassiveListener>& passive_listener);
private:
ServerBuilder* builder_;
};
@ -382,17 +365,6 @@ class ServerBuilder {
private:
friend class grpc::testing::ServerBuilderPluginTest;
struct UnstartedPassiveListener {
std::weak_ptr<grpc_core::experimental::PassiveListenerImpl>
passive_listener;
std::shared_ptr<grpc::ServerCredentials> credentials;
UnstartedPassiveListener(
std::weak_ptr<grpc_core::experimental::PassiveListenerImpl> listener,
std::shared_ptr<grpc::ServerCredentials> creds)
: passive_listener(std::move(listener)),
credentials(std::move(creds)) {}
};
struct SyncServerSettings {
SyncServerSettings()
: num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
@ -417,7 +389,6 @@ class ServerBuilder {
std::vector<std::unique_ptr<grpc::ServerBuilderOption>> options_;
std::vector<std::unique_ptr<NamedService>> services_;
std::vector<Port> ports_;
std::vector<UnstartedPassiveListener> unstarted_passive_listeners_;
SyncServerSettings sync_server_settings_;

1
package.xml generated

@ -80,7 +80,6 @@
<file baseinstalldir="/" name="include/grpc/impl/propagation_bits.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/slice_type.h" role="src" />
<file baseinstalldir="/" name="include/grpc/load_reporting.h" role="src" />
<file baseinstalldir="/" name="include/grpc/passive_listener.h" role="src" />
<file baseinstalldir="/" name="include/grpc/slice.h" role="src" />
<file baseinstalldir="/" name="include/grpc/slice_buffer.h" role="src" />
<file baseinstalldir="/" name="include/grpc/status.h" role="src" />

@ -5,9 +5,6 @@ DerivePointerAlignment: false
PointerAlignment: Left
IncludeBlocks: Regroup
IncludeCategories:
# port_platform.h is before almost everything
- Regex: '^<grpc/(support|impl/codegen)/port_platform.h>'
Priority: -100
# ruby.h is even more first if it's included
- Regex: '^<ruby/ruby.h>'
Priority: -200

@ -6786,8 +6786,6 @@ grpc_cc_library(
"connection_quota",
"error",
"error_utils",
"event_engine_extensions",
"event_engine_query_extensions",
"grpc_insecure_credentials",
"handshaker_registry",
"iomgr_fwd",
@ -7254,6 +7252,8 @@ grpc_cc_library(
],
deps = [
"call_final_info",
"latch",
"map",
"message",
"metadata",
"ref_counted",

@ -159,7 +159,7 @@ class BinderServerListener : public Server::ListenerInterface {
on_destroy_done_ = on_destroy_done;
}
void Orphan() override { Unref(); }
void Orphan() override { delete this; }
~BinderServerListener() override {
ExecCtx::Get()->Flush();
@ -239,8 +239,9 @@ bool AddBinderPort(const std::string& addr, grpc_server* server,
}
std::string conn_id = addr.substr(kBinderUriScheme.size());
Server* core_server = Server::FromC(server);
core_server->AddListener(MakeOrphanable<BinderServerListener>(
core_server, conn_id, std::move(factory), security_policy));
core_server->AddListener(
OrphanablePtr<Server::ListenerInterface>(new BinderServerListener(
core_server, conn_id, std::move(factory), security_policy)));
return true;
}

@ -103,8 +103,8 @@ absl::StatusOr<int> ChaoticGoodServerListener::Bind(
str.ok() ? str->c_str() : str.status().ToString().c_str());
}
EventEngine::Listener::AcceptCallback accept_cb =
[self = RefAsSubclass<ChaoticGoodServerListener>()](
std::unique_ptr<EventEngine::Endpoint> ep, MemoryAllocator) {
[self = Ref()](std::unique_ptr<EventEngine::Endpoint> ep,
MemoryAllocator) {
ExecCtx exec_ctx;
MutexLock lock(&self->mu_);
if (self->shutdown_) return;
@ -149,8 +149,7 @@ absl::Status ChaoticGoodServerListener::StartListening() {
ChaoticGoodServerListener::ActiveConnection::ActiveConnection(
RefCountedPtr<ChaoticGoodServerListener> listener,
std::unique_ptr<EventEngine::Endpoint> endpoint)
: memory_allocator_(listener->memory_allocator_),
listener_(std::move(listener)) {
: memory_allocator_(listener->memory_allocator_), listener_(listener) {
handshaking_state_ = MakeRefCounted<HandshakingState>(Ref());
handshaking_state_->Start(std::move(endpoint));
}

@ -50,7 +50,9 @@
namespace grpc_core {
namespace chaotic_good {
class ChaoticGoodServerListener final : public Server::ListenerInterface {
class ChaoticGoodServerListener final
: public Server::ListenerInterface,
public RefCounted<ChaoticGoodServerListener> {
public:
static absl::AnyInvocable<std::string()> DefaultConnectionIDGenerator() {
return [bitgen = absl::BitGen()]() mutable {

@ -43,7 +43,6 @@
#include <grpc/grpc.h>
#include <grpc/grpc_posix.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/passive_listener.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -57,8 +56,6 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/extensions/supports_fd.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -68,7 +65,6 @@
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -97,11 +93,9 @@
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
namespace grpc_core {
namespace {
using grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::EventEngineSupportsFdExtension;
using grpc_event_engine::experimental::QueryExtension;
using ::grpc_event_engine::experimental::EventEngine;
const char kUnixUriPrefix[] = "unix:";
const char kUnixAbstractUriPrefix[] = "unix-abstract:";
@ -118,23 +112,14 @@ class Chttp2ServerListener : public Server::ListenerInterface {
Server* server, const char* name, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier);
static Chttp2ServerListener* CreateForPassiveListener(
Server* server, const ChannelArgs& args,
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener);
// Do not instantiate directly. Use one of the factory methods above.
Chttp2ServerListener(Server* server, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier,
grpc_server_config_fetcher* config_fetcher,
std::shared_ptr<experimental::PassiveListenerImpl>
passive_listener = nullptr);
Chttp2ServerArgsModifier args_modifier);
~Chttp2ServerListener() override;
void Start(Server* server,
const std::vector<grpc_pollset*>* pollsets) override;
void AcceptConnectedEndpoint(std::unique_ptr<EventEngine::Endpoint> endpoint);
channelz::ListenSocketNode* channelz_listen_socket_node() const override {
return channelz_listen_socket_.get();
}
@ -144,8 +129,6 @@ class Chttp2ServerListener : public Server::ListenerInterface {
void Orphan() override;
private:
friend class experimental::PassiveListenerImpl;
class ConfigFetcherWatcher
: public grpc_server_config_fetcher::WatcherInterface {
public:
@ -252,8 +235,34 @@ class Chttp2ServerListener : public Server::ListenerInterface {
static void DestroyListener(Server* /*server*/, void* arg,
grpc_closure* destroy_done);
Server* const server_ = nullptr;
grpc_tcp_server* tcp_server_ = nullptr;
// The interface required by RefCountedPtr<> has been manually implemented
// here to take a ref on tcp_server_ instead. Note that, the handshaker
// needs tcp_server_ to exist for the lifetime of the handshake since it's
// needed by acceptor. Sharing refs between the listener and tcp_server_ is
// just an optimization to avoid taking additional refs on the listener,
// since TcpServerShutdownComplete already holds a ref to the listener.
void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
void IncrementRefCount(const DebugLocation& /* location */,
const char* /* reason */) {
IncrementRefCount();
}
GRPC_MUST_USE_RESULT RefCountedPtr<Chttp2ServerListener> Ref() {
IncrementRefCount();
return RefCountedPtr<Chttp2ServerListener>(this);
}
GRPC_MUST_USE_RESULT RefCountedPtr<Chttp2ServerListener> Ref(
const DebugLocation& /* location */, const char* /* reason */) {
return Ref();
}
void Unref() { grpc_tcp_server_unref(tcp_server_); }
void Unref(const DebugLocation& /* location */, const char* /* reason */) {
Unref();
}
Server* const server_;
grpc_tcp_server* tcp_server_;
grpc_resolved_address resolved_address_;
Chttp2ServerArgsModifier const args_modifier_;
ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
@ -276,10 +285,6 @@ class Chttp2ServerListener : public Server::ListenerInterface {
RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
MemoryQuotaRefPtr memory_quota_;
ConnectionQuotaRefPtr connection_quota_;
grpc_server_config_fetcher* config_fetcher_ = nullptr;
// TODO(yashykt): consider using absl::variant<> to minimize memory usage for
// disjoint cases where different fields are used.
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener_;
};
//
@ -376,17 +381,13 @@ Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
handshake_mgr_(MakeRefCounted<HandshakeManager>()),
deadline_(GetConnectionDeadline(args)),
interested_parties_(grpc_pollset_set_create()) {
if (accepting_pollset != nullptr) {
grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
}
CoreConfiguration::Get().handshaker_registry().AddHandshakers(
HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get());
}
Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
if (accepting_pollset_ != nullptr) {
grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
}
grpc_pollset_set_destroy(interested_parties_);
gpr_free(acceptor_);
}
@ -708,18 +709,19 @@ void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() {
grpc_error_handle Chttp2ServerListener::Create(
Server* server, grpc_resolved_address* addr, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier, int* port_num) {
Chttp2ServerListener* listener = nullptr;
// The bulk of this method is inside of a lambda to make cleanup
// easier without using goto.
grpc_error_handle error = [&]() {
grpc_error_handle error;
// Create Chttp2ServerListener.
OrphanablePtr<Chttp2ServerListener> listener =
MakeOrphanable<Chttp2ServerListener>(server, args, args_modifier,
server->config_fetcher());
// The tcp_server will be unreffed when the listener is orphaned, which could
// be at the end of this function if the listener was not added to the
// server's set of listeners.
grpc_error_handle error = grpc_tcp_server_create(
&listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args),
OnAccept, listener.get(), &listener->tcp_server_);
listener = new Chttp2ServerListener(server, args, args_modifier);
error = grpc_tcp_server_create(
&listener->tcp_server_shutdown_complete_,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
OnAccept, listener, &listener->tcp_server_);
if (!error.ok()) return error;
if (listener->config_fetcher_ != nullptr) {
if (server->config_fetcher() != nullptr) {
listener->resolved_address_ = *addr;
// TODO(yashykt): Consider binding so as to be able to return the port
// number.
@ -736,54 +738,54 @@ grpc_error_handle Chttp2ServerListener::Create(
}
listener->channelz_listen_socket_ =
MakeRefCounted<channelz::ListenSocketNode>(
*string_address, absl::StrCat("chttp2 listener ", *string_address));
*string_address,
absl::StrCat("chttp2 listener ", *string_address));
}
// Register with the server only upon success
server->AddListener(std::move(listener));
server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
return absl::OkStatus();
}();
if (!error.ok()) {
if (listener != nullptr) {
if (listener->tcp_server_ != nullptr) {
// listener is deleted when tcp_server_ is shutdown.
grpc_tcp_server_unref(listener->tcp_server_);
} else {
delete listener;
}
}
}
return error;
}
grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
Server* server, const char* name, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier) {
auto listener = MakeOrphanable<Chttp2ServerListener>(
server, args, args_modifier, server->config_fetcher());
Chttp2ServerListener* listener =
new Chttp2ServerListener(server, args, args_modifier);
grpc_error_handle error = grpc_tcp_server_create(
&listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args),
OnAccept, listener.get(), &listener->tcp_server_);
if (!error.ok()) return error;
&listener->tcp_server_shutdown_complete_,
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
OnAccept, listener, &listener->tcp_server_);
if (!error.ok()) {
delete listener;
return error;
}
// TODO(yangg) channelz
TcpServerFdHandler** arg_val = args.GetPointer<TcpServerFdHandler*>(name);
*arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
server->AddListener(std::move(listener));
server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
return absl::OkStatus();
}
Chttp2ServerListener* Chttp2ServerListener::CreateForPassiveListener(
Server* server, const ChannelArgs& args,
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener) {
// TODO(hork): figure out how to handle channelz in this case
auto listener = MakeOrphanable<Chttp2ServerListener>(
server, args, /*args_modifier=*/
[](const ChannelArgs& args, grpc_error_handle*) { return args; }, nullptr,
std::move(passive_listener));
auto listener_ptr = listener.get();
server->AddListener(std::move(listener));
return listener_ptr;
}
Chttp2ServerListener::Chttp2ServerListener(
Server* server, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier,
grpc_server_config_fetcher* config_fetcher,
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener)
Chttp2ServerArgsModifier args_modifier)
: server_(server),
args_modifier_(args_modifier),
args_(args),
memory_quota_(args.GetObject<ResourceQuota>()->memory_quota()),
connection_quota_(MakeRefCounted<ConnectionQuota>()),
config_fetcher_(config_fetcher),
passive_listener_(std::move(passive_listener)) {
connection_quota_(MakeRefCounted<ConnectionQuota>()) {
auto max_allowed_incoming_connections =
args.GetInt(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS);
if (max_allowed_incoming_connections.has_value()) {
@ -798,9 +800,6 @@ Chttp2ServerListener::~Chttp2ServerListener() {
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
ExecCtx::Get()->Flush();
if (passive_listener_ != nullptr) {
passive_listener_->ListenerDestroyed();
}
if (on_destroy_done_ != nullptr) {
ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, absl::OkStatus());
ExecCtx::Get()->Flush();
@ -810,11 +809,10 @@ Chttp2ServerListener::~Chttp2ServerListener() {
// Server callback: start listening on our ports
void Chttp2ServerListener::Start(
Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
if (config_fetcher_ != nullptr) {
auto watcher = std::make_unique<ConfigFetcherWatcher>(
RefAsSubclass<Chttp2ServerListener>());
if (server_->config_fetcher() != nullptr) {
auto watcher = std::make_unique<ConfigFetcherWatcher>(Ref());
config_fetcher_watcher_ = watcher.get();
config_fetcher_->StartWatch(
server_->config_fetcher()->StartWatch(
grpc_sockaddr_to_string(&resolved_address_, false).value(),
std::move(watcher));
} else {
@ -828,9 +826,7 @@ void Chttp2ServerListener::Start(
}
void Chttp2ServerListener::StartListening() {
if (tcp_server_ != nullptr) {
grpc_tcp_server_start(tcp_server_, &server_->pollsets());
}
}
void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
@ -838,12 +834,6 @@ void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
on_destroy_done_ = on_destroy_done;
}
void Chttp2ServerListener::AcceptConnectedEndpoint(
std::unique_ptr<EventEngine::Endpoint> endpoint) {
OnAccept(this, grpc_event_engine_endpoint_create(std::move(endpoint)),
/*accepting_pollset=*/nullptr, /*acceptor=*/nullptr);
}
void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) {
@ -868,7 +858,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
endpoint_cleanup(error);
return;
}
if (self->config_fetcher_ != nullptr) {
if (self->server_->config_fetcher() != nullptr) {
if (connection_manager == nullptr) {
grpc_error_handle error = GRPC_ERROR_CREATE(
"No ConnectionManager configured. Closing connection.");
@ -909,7 +899,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
// heap-use-after-free issues where `Ref()` is invoked when the ref of
// tcp_server_ has already reached 0. (Ref() implementation of
// Chttp2ServerListener is grpc_tcp_server_ref().)
listener_ref = self->RefAsSubclass<Chttp2ServerListener>();
listener_ref = self->Ref();
self->connections_.emplace(connection.get(), std::move(connection));
}
}
@ -924,7 +914,7 @@ void Chttp2ServerListener::TcpServerShutdownComplete(
void* arg, grpc_error_handle /*error*/) {
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
self->channelz_listen_socket_.reset();
self->Unref();
delete self;
}
// Server callback: destroy the tcp listener (so we don't generate further
@ -933,8 +923,7 @@ void Chttp2ServerListener::Orphan() {
// Cancel the watch before shutting down so as to avoid holding a ref to the
// listener in the watcher.
if (config_fetcher_watcher_ != nullptr) {
GPR_ASSERT(config_fetcher_ != nullptr);
config_fetcher_->CancelWatch(config_fetcher_watcher_);
server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
}
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
grpc_tcp_server* tcp_server;
@ -952,14 +941,12 @@ void Chttp2ServerListener::Orphan() {
}
tcp_server = tcp_server_;
}
if (tcp_server != nullptr) {
grpc_tcp_server_shutdown_listeners(tcp_server);
grpc_tcp_server_unref(tcp_server);
} else {
Unref();
}
}
} // namespace
//
// Chttp2ServerAddPort()
//
@ -1060,50 +1047,6 @@ ChannelArgs ModifyArgsForConnection(const ChannelArgs& args,
}
} // namespace
namespace experimental {
absl::Status PassiveListenerImpl::AcceptConnectedEndpoint(
std::unique_ptr<EventEngine::Endpoint> endpoint) {
GPR_ASSERT(server_ != nullptr);
RefCountedPtr<Chttp2ServerListener> listener;
{
MutexLock lock(&mu_);
if (listener_ != nullptr) {
listener =
listener_->RefIfNonZero().TakeAsSubclass<Chttp2ServerListener>();
}
}
if (listener == nullptr) {
return absl::UnavailableError("passive listener already shut down");
}
ExecCtx exec_ctx;
listener->AcceptConnectedEndpoint(std::move(endpoint));
return absl::OkStatus();
}
absl::Status PassiveListenerImpl::AcceptConnectedFd(int fd) {
GPR_ASSERT(server_ != nullptr);
ExecCtx exec_ctx;
auto& args = server_->channel_args();
auto* supports_fd = QueryExtension<EventEngineSupportsFdExtension>(
/*engine=*/args.GetObjectRef<EventEngine>().get());
if (supports_fd == nullptr) {
return absl::UnimplementedError(
"The server's EventEngine does not support adding endpoints from "
"connected file descriptors.");
}
auto endpoint =
supports_fd->CreateEndpointFromFd(fd, ChannelArgsEndpointConfig(args));
return AcceptConnectedEndpoint(std::move(endpoint));
}
void PassiveListenerImpl::ListenerDestroyed() {
MutexLock lock(&mu_);
listener_ = nullptr;
}
} // namespace experimental
} // namespace grpc_core
int grpc_server_add_http2_port(grpc_server* server, const char* addr,
@ -1201,31 +1144,3 @@ void grpc_server_add_channel_from_fd(grpc_server* /* server */, int /* fd */,
}
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
absl::Status grpc_server_add_passive_listener(
grpc_core::Server* server, grpc_server_credentials* credentials,
std::shared_ptr<grpc_core::experimental::PassiveListenerImpl>
passive_listener) {
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_add_passive_listener(server=%p, credentials=%p)",
2, (server, credentials));
// Create security context.
if (credentials == nullptr) {
return absl::UnavailableError(
"No credentials specified for passive listener");
}
auto sc = credentials->create_security_connector(grpc_core::ChannelArgs());
if (sc == nullptr) {
return absl::UnavailableError(
absl::StrCat("Unable to create secure server with credentials of type ",
credentials->type().name()));
}
auto args = server->channel_args()
.SetObject(credentials->Ref())
.SetObject(std::move(sc));
passive_listener->listener_ =
grpc_core::Chttp2ServerListener::CreateForPassiveListener(
server, args, passive_listener);
passive_listener->server_ = server->Ref();
return absl::OkStatus();
}

@ -23,8 +23,6 @@
#include <functional>
#include <grpc/passive_listener.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/surface/server.h"
@ -44,38 +42,6 @@ grpc_error_handle Chttp2ServerAddPort(
Server* server, const char* addr, const ChannelArgs& args,
Chttp2ServerArgsModifier connection_args_modifier, int* port_num);
class Chttp2ServerListener;
namespace experimental {
// An implementation of the public C++ passive listener interface.
// The server builder holds a weak_ptr to one of these objects, and the
// application owns the instance.
class PassiveListenerImpl final : public PassiveListener {
public:
absl::Status AcceptConnectedEndpoint(
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint) override ABSL_LOCKS_EXCLUDED(mu_);
absl::Status AcceptConnectedFd(GRPC_UNUSED int fd) override
ABSL_LOCKS_EXCLUDED(mu_);
void ListenerDestroyed() ABSL_LOCKS_EXCLUDED(mu_);
private:
// note: the grpc_core::Server redundant namespace qualification is
// required for older gcc versions.
friend absl::Status(::grpc_server_add_passive_listener)(
grpc_core::Server* server, grpc_server_credentials* credentials,
std::shared_ptr<grpc_core::experimental::PassiveListenerImpl>
passive_listener);
Mutex mu_;
// Data members will be populated when initialized.
RefCountedPtr<Server> server_;
Chttp2ServerListener* listener_;
};
} // namespace experimental
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H

@ -24,7 +24,9 @@
#include <algorithm>
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>
#include "absl/cleanup/cleanup.h"
#include "absl/strings/match.h"
@ -555,6 +557,67 @@ void XdsClient::XdsChannel::UnsubscribeLocked(const XdsResourceType* type,
}
}
bool XdsClient::XdsChannel::MaybeFallbackLocked(
const std::string& authority, AuthorityState& authority_state) {
if (!xds_client_->HasUncachedResources(authority_state)) {
return false;
}
std::vector<const XdsBootstrap::XdsServer*> xds_servers;
if (authority != kOldStyleAuthority) {
xds_servers =
xds_client_->bootstrap().LookupAuthority(authority)->servers();
}
if (xds_servers.empty()) xds_servers = xds_client_->bootstrap().servers();
for (size_t i = authority_state.xds_channels.size(); i < xds_servers.size();
++i) {
authority_state.xds_channels.emplace_back(
xds_client_->GetOrCreateXdsChannelLocked(*xds_servers[i], "fallback"));
for (const auto& type_resource : authority_state.resource_map) {
for (const auto& key_state : type_resource.second) {
authority_state.xds_channels.back()->SubscribeLocked(
type_resource.first, {authority, key_state.first});
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] authority %s: added fallback server %s (%s)",
xds_client_.get(), authority.c_str(),
xds_servers[i]->server_uri().c_str(),
authority_state.xds_channels.back()->status().ToString().c_str());
}
if (authority_state.xds_channels.back()->status().ok()) return true;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] authority %s: No fallback server",
xds_client_.get(), authority.c_str());
}
return false;
}
void XdsClient::XdsChannel::SetHealthyLocked() {
status_ = absl::OkStatus();
// Make this channel active iff:
// 1. Channel is on the list of authority channels
// 2. Channel is not the last channel on the list (i.e. not the active
// channel)
for (auto& authority : xds_client_->authority_state_map_) {
auto& channels = authority.second.xds_channels;
// Skip if channel is active.
if (channels.back() == this) continue;
auto channel_it = std::find(channels.begin(), channels.end(), this);
// Skip if this is not on the list
if (channel_it != channels.end()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] authority %s: Falling forward to %s",
xds_client_.get(), authority.first.c_str(),
server_.server_uri().c_str());
}
// Lower priority channels are no longer needed, connection is back!
channels.erase(channel_it + 1, channels.end());
}
}
}
void XdsClient::XdsChannel::OnConnectivityFailure(absl::Status status) {
{
MutexLock lock(&xds_client_->mu_);
@ -584,8 +647,11 @@ void XdsClient::XdsChannel::SetChannelStatusLocked(absl::Status status) {
status_ = status;
// Find all watchers for this channel.
std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
for (const auto& a : xds_client_->authority_state_map_) { // authority
if (a.second.xds_channel != this) continue;
for (auto& a : xds_client_->authority_state_map_) { // authority
if (a.second.xds_channels.empty() || a.second.xds_channels.back() != this ||
MaybeFallbackLocked(a.first, a.second)) {
continue;
}
for (const auto& t : a.second.resource_map) { // type
for (const auto& r : t.second) { // resource id
for (const auto& w : r.second.watchers) { // watchers
@ -594,6 +660,7 @@ void XdsClient::XdsChannel::SetChannelStatusLocked(absl::Status status) {
}
}
}
if (!watchers.empty()) {
// Enqueue notification for the watchers.
xds_client_->work_serializer_.Schedule(
[watchers = std::move(watchers), status = std::move(status)]()
@ -603,6 +670,7 @@ void XdsClient::XdsChannel::SetChannelStatusLocked(absl::Status status) {
}
},
DEBUG_LOCATION);
}
}
//
@ -955,10 +1023,13 @@ XdsClient::XdsChannel::AdsCall::AdsCall(
}
// If this is a reconnect, add any necessary subscriptions from what's
// already in the cache.
for (const auto& a : xds_client()->authority_state_map_) {
for (auto& a : xds_client()->authority_state_map_) {
const std::string& authority = a.first;
// Skip authorities that are not using this xDS channel.
if (a.second.xds_channel != xds_channel()) continue;
auto it = std::find(a.second.xds_channels.begin(),
a.second.xds_channels.end(), xds_channel());
// Skip authorities that are not using this xDS channel. The channel can be
// anywhere in the list.
if (it == a.second.xds_channels.end()) continue;
for (const auto& t : a.second.resource_map) {
const XdsResourceType* type = t.first;
for (const auto& r : t.second) {
@ -1097,7 +1168,7 @@ void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) {
status.ToString().c_str());
} else {
seen_response_ = true;
xds_channel()->status_ = absl::OkStatus();
xds_channel()->SetHealthyLocked();
// Update nonce.
auto& state = state_map_[result.type];
state.nonce = result.nonce;
@ -1120,7 +1191,9 @@ void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) {
const std::string& authority = a.first;
AuthorityState& authority_state = a.second;
// Skip authorities that are not using this xDS channel.
if (authority_state.xds_channel != xds_channel()) continue;
if (authority_state.xds_channels.back() != xds_channel()) {
continue;
}
auto seen_authority_it = result.resources_seen.find(authority);
// Find this resource type.
auto type_it = authority_state.resource_map.find(result.type);
@ -1567,6 +1640,18 @@ RefCountedPtr<XdsClient::XdsChannel> XdsClient::GetOrCreateXdsChannelLocked(
return xds_channel;
}
bool XdsClient::HasUncachedResources(const AuthorityState& authority_state) {
for (const auto& type_resource : authority_state.resource_map) {
for (const auto& key_state : type_resource.second) {
if (key_state.second.meta.client_status ==
XdsApi::ResourceMetadata::REQUESTED) {
return true;
}
}
}
return false;
}
void XdsClient::WatchResource(const XdsResourceType* type,
absl::string_view name,
RefCountedPtr<ResourceWatcherInterface> watcher) {
@ -1592,7 +1677,7 @@ void XdsClient::WatchResource(const XdsResourceType* type,
return;
}
// Find server to use.
const XdsBootstrap::XdsServer* xds_server = nullptr;
std::vector<const XdsBootstrap::XdsServer*> xds_servers;
if (resource_name->authority != kOldStyleAuthority) {
auto* authority =
bootstrap_->LookupAuthority(std::string(resource_name->authority));
@ -1602,18 +1687,47 @@ void XdsClient::WatchResource(const XdsResourceType* type,
"\" not present in bootstrap config")));
return;
}
xds_server =
authority->servers().empty() ? nullptr : authority->servers().front();
xds_servers = authority->servers();
}
if (xds_server == nullptr) xds_server = bootstrap_->servers().front();
if (xds_servers.empty()) xds_servers = bootstrap_->servers();
{
MutexLock lock(&mu_);
MaybeRegisterResourceTypeLocked(type);
AuthorityState& authority_state =
authority_state_map_[resource_name->authority];
ResourceState& resource_state =
authority_state.resource_map[type][resource_name->key];
auto it_is_new = authority_state.resource_map[type].emplace(
resource_name->key, ResourceState());
bool first_watcher_for_resource = it_is_new.second;
ResourceState& resource_state = it_is_new.first->second;
resource_state.watchers[w] = watcher;
if (first_watcher_for_resource) {
// We try to add new channels in 2 cases:
// - This is the first resource for this authority (i.e., the list
// of channels is empty).
// - The last channel in the list is failing. That failure may not
// have previously triggered fallback if there were no uncached
// resources, but we've just added a new uncached resource,
// so we need to trigger fallback now.
//
// Note that when we add a channel, it might already be failing
// due to being used in a different authority. So we keep going
// until either we add one that isn't failing or we've added them all.
if (authority_state.xds_channels.empty() ||
!authority_state.xds_channels.back()->status().ok()) {
for (size_t i = authority_state.xds_channels.size();
i < xds_servers.size(); ++i) {
authority_state.xds_channels.emplace_back(
GetOrCreateXdsChannelLocked(*xds_servers[i], "start watch"));
if (authority_state.xds_channels.back()->status().ok()) {
break;
}
}
}
for (const auto& channel : authority_state.xds_channels) {
channel->SubscribeLocked(type, *resource_name);
}
} else {
// If we already have a cached value for the resource, notify the new
// watcher immediately.
if (resource_state.resource != nullptr) {
@ -1633,8 +1747,8 @@ void XdsClient::WatchResource(const XdsResourceType* type,
XdsApi::ResourceMetadata::DOES_NOT_EXIST) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] reporting cached does-not-exist for %s", this,
std::string(name).c_str());
"[xds_client %p] reporting cached does-not-exist for %s",
this, std::string(name).c_str());
}
work_serializer_.Schedule(
[watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
@ -1653,7 +1767,8 @@ void XdsClient::WatchResource(const XdsResourceType* type,
std::string details = resource_state.meta.failed_details;
const auto* node = bootstrap_->node();
if (node != nullptr) {
absl::StrAppend(&details, " (node ID:", bootstrap_->node()->id(), ")");
absl::StrAppend(&details, " (node ID:", bootstrap_->node()->id(),
")");
}
work_serializer_.Schedule(
[watcher, details = std::move(details)]()
@ -1664,13 +1779,8 @@ void XdsClient::WatchResource(const XdsResourceType* type,
},
DEBUG_LOCATION);
}
// If the authority doesn't yet have a channel, set it, creating it if
// needed.
if (authority_state.xds_channel == nullptr) {
authority_state.xds_channel =
GetOrCreateXdsChannelLocked(*xds_server, "start watch");
}
absl::Status channel_status = authority_state.xds_channel->status();
absl::Status channel_status = authority_state.xds_channels.back()->status();
if (!channel_status.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
@ -1685,7 +1795,6 @@ void XdsClient::WatchResource(const XdsResourceType* type,
},
DEBUG_LOCATION);
}
authority_state.xds_channel->SubscribeLocked(type, *resource_name);
}
work_serializer_.DrainQueue();
}
@ -1723,13 +1832,15 @@ void XdsClient::CancelResourceWatch(const XdsResourceType* type,
this, std::string(type->type_url()).c_str(),
std::string(name).c_str());
}
authority_state.xds_channel->UnsubscribeLocked(type, *resource_name,
for (const auto& xds_channel : authority_state.xds_channels) {
xds_channel->UnsubscribeLocked(type, *resource_name,
delay_unsubscription);
}
type_map.erase(resource_it);
if (type_map.empty()) {
authority_state.resource_map.erase(type_it);
if (authority_state.resource_map.empty()) {
authority_state.xds_channel.reset();
authority_state.xds_channels.clear();
}
}
}

@ -201,6 +201,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
}
};
struct AuthorityState;
struct XdsResourceName {
std::string authority;
XdsResourceKey key;
@ -244,6 +246,12 @@ class XdsClient : public DualRefCounted<XdsClient> {
absl::string_view server_uri() const { return server_.server_uri(); }
private:
// Attempts to find a suitable Xds fallback server. Returns true if
// a connection to a suitable server had been established.
bool MaybeFallbackLocked(const std::string& authority,
XdsClient::AuthorityState& authority_state)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void SetHealthyLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void Orphaned() override;
void OnConnectivityFailure(absl::Status status);
@ -283,7 +291,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
};
struct AuthorityState {
RefCountedPtr<XdsChannel> xds_channel;
std::vector<RefCountedPtr<XdsChannel>> xds_channels;
std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>>
resource_map;
};
@ -339,10 +347,10 @@ class XdsClient : public DualRefCounted<XdsClient> {
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
RefCountedPtr<XdsChannel> GetOrCreateXdsChannelLocked(
const XdsBootstrap::XdsServer& server, const char* reason)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
bool HasUncachedResources(const AuthorityState& authority_state);
std::unique_ptr<XdsBootstrap> bootstrap_;
OrphanablePtr<XdsTransportFactory> transport_factory_;

@ -257,8 +257,8 @@ absl::StatusOr<RefCountedPtr<GrpcXdsClient>> GrpcXdsClient::GetOrCreate(
MakeOrphanable<GrpcXdsTransportFactory>(channel_args));
g_xds_client_map->emplace(xds_client->key(), xds_client.get());
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "xDS client for key: %s was created",
std::string(key).c_str());
gpr_log(GPR_INFO, "[xds_client %p] Created xDS client for key %s",
xds_client.get(), std::string(key).c_str());
}
return xds_client;
}

@ -131,8 +131,12 @@ class TcpTracerInterface {
};
virtual ~TcpTracerInterface() = default;
// Records a per-message event with an optional snapshot of connection
// metrics.
virtual void RecordEvent(Type type, absl::Time time, size_t byte_offset,
absl::optional<ConnectionMetrics> metrics) = 0;
// Records a snapshot of connection metrics.
virtual void RecordConnectionMetrics(ConnectionMetrics metrics) = 0;
};
} // namespace grpc_core

@ -113,13 +113,6 @@ class EventEngineSupportsFdExtension {
int fd, const EndpointConfig& config,
MemoryAllocator memory_allocator) = 0;
/// Creates an EventEngine::Endpoint from an fd which is already assumed to be
/// connected to a remote peer. See \a CreatePosixEndpointFromFd for details.
/// This has the same behavior, but the \a memory_allocator is taken from the
/// EndpointConfig's resource quota.
virtual std::unique_ptr<EventEngine::Endpoint> CreateEndpointFromFd(
int fd, const EndpointConfig& config) = 0;
/// Called when the posix listener has accepted a new client connection.
/// \a listener_fd - The listening socket fd that accepted the new client
/// connection.

@ -662,7 +662,7 @@ PosixEventEngine::CreatePosixEndpointFromFd(int fd,
const EndpointConfig& config,
MemoryAllocator memory_allocator) {
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
GPR_ASSERT(fd > 0);
GPR_DEBUG_ASSERT(fd > 0);
PosixEventPoller* poller = poller_manager_->Poller();
GPR_DEBUG_ASSERT(poller != nullptr);
EventHandle* handle =
@ -677,22 +677,6 @@ PosixEventEngine::CreatePosixEndpointFromFd(int fd,
#endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
std::unique_ptr<EventEngine::Endpoint> PosixEventEngine::CreateEndpointFromFd(
int fd, const EndpointConfig& config) {
auto options = TcpOptionsFromEndpointConfig(config);
MemoryAllocator allocator;
if (options.memory_allocator_factory != nullptr) {
return CreatePosixEndpointFromFd(
fd, config,
options.memory_allocator_factory->CreateMemoryAllocator(
absl::StrCat("allocator:", fd)));
}
return CreatePosixEndpointFromFd(
fd, config,
options.resource_quota->memory_quota()->CreateMemoryAllocator(
absl::StrCat("allocator:", fd)));
}
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
PosixEventEngine::CreateListener(
Listener::AcceptCallback on_accept,

@ -173,8 +173,6 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
std::unique_ptr<EventEngine::Endpoint> CreatePosixEndpointFromFd(
int fd, const EndpointConfig& config,
MemoryAllocator memory_allocator) override;
std::unique_ptr<EventEngine::Endpoint> CreateEndpointFromFd(
int fd, const EndpointConfig& config) override;
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,

@ -176,6 +176,14 @@ class ValueOrFailure {
return value_ == other.value_;
}
bool operator!=(const ValueOrFailure& other) const {
return value_ != other.value_;
}
bool operator==(const T& other) const { return value_ == other; }
bool operator!=(const T& other) const { return value_ != other; }
private:
absl::optional<T> value_;
};

@ -41,7 +41,6 @@
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include <grpc/passive_listener.h>
#include <grpc/slice.h>
#include <grpc/support/time.h>
@ -76,9 +75,6 @@
"grpc.server.max_pending_requests_hard_limit"
namespace grpc_core {
namespace experimental {
class PassiveListenerImpl;
} // namespace experimental
extern TraceFlag grpc_server_channel_trace;
@ -117,7 +113,7 @@ class Server : public ServerInterface,
/// Interface for listeners.
/// Implementations must override the Orphan() method, which should stop
/// listening and initiate destruction of the listener.
class ListenerInterface : public InternallyRefCounted<ListenerInterface> {
class ListenerInterface : public Orphanable {
public:
~ListenerInterface() override = default;
@ -217,14 +213,6 @@ class Server : public ServerInterface,
void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
private:
// note: the grpc_core::Server redundant namespace qualification is
// required for older gcc versions.
// TODO(yashykt): eliminate this friend statement as part of your upcoming
// server listener refactoring.
friend absl::Status(::grpc_server_add_passive_listener)(
grpc_core::Server* server, grpc_server_credentials* credentials,
std::shared_ptr<grpc_core::experimental::PassiveListenerImpl>
passive_listener);
struct RequestedCall;
class RequestMatcherInterface;

@ -202,15 +202,53 @@ void CallFilters::Finalize(const grpc_call_final_info* final_info) {
}
}
void CallFilters::CancelDueToFailedPipeOperation() {
void CallFilters::CancelDueToFailedPipeOperation(SourceLocation but_where) {
// We expect something cancelled before now
if (server_trailing_metadata_ == nullptr) return;
gpr_log(GPR_DEBUG, "Cancelling due to failed pipe operation");
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(but_where.file(), but_where.line(), GPR_LOG_SEVERITY_DEBUG,
"Cancelling due to failed pipe operation: %s",
DebugString().c_str());
}
server_trailing_metadata_ =
ServerMetadataFromStatus(absl::CancelledError("Failed pipe operation"));
server_trailing_metadata_waiter_.Wake();
}
void CallFilters::PushServerTrailingMetadata(ServerMetadataHandle md) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_DEBUG, "%s Push server trailing metadata: %s into %s",
GetContext<Activity>()->DebugTag().c_str(),
md->DebugString().c_str(), DebugString().c_str());
}
GPR_ASSERT(md != nullptr);
if (server_trailing_metadata_ != nullptr) return;
server_trailing_metadata_ = std::move(md);
client_initial_metadata_state_.CloseWithError();
server_initial_metadata_state_.CloseSending();
client_to_server_message_state_.CloseWithError();
server_to_client_message_state_.CloseWithError();
server_trailing_metadata_waiter_.Wake();
}
std::string CallFilters::DebugString() const {
std::vector<std::string> components = {
absl::StrFormat("this:%p", this),
absl::StrCat("client_initial_metadata:",
client_initial_metadata_state_.DebugString()),
ServerInitialMetadataPromises::DebugString("server_initial_metadata",
this),
ClientToServerMessagePromises::DebugString("client_to_server_message",
this),
ServerToClientMessagePromises::DebugString("server_to_client_message",
this),
absl::StrCat("server_trailing_metadata:",
server_trailing_metadata_ == nullptr
? "not-set"
: server_trailing_metadata_->DebugString())};
return absl::StrCat("CallFilters{", absl::StrJoin(components, ", "), "}");
};
///////////////////////////////////////////////////////////////////////////////
// CallFilters::Stack
@ -251,6 +289,49 @@ void filters_detail::PipeState::Start() {
wait_recv_.Wake();
}
void filters_detail::PipeState::CloseWithError() {
if (state_ == ValueState::kClosed) return;
state_ = ValueState::kError;
wait_recv_.Wake();
wait_send_.Wake();
}
Poll<bool> filters_detail::PipeState::PollClosed() {
switch (state_) {
case ValueState::kIdle:
case ValueState::kWaiting:
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
return wait_recv_.pending();
case ValueState::kClosed:
return false;
case ValueState::kError:
return true;
}
GPR_UNREACHABLE_CODE(return Pending{});
}
void filters_detail::PipeState::CloseSending() {
switch (state_) {
case ValueState::kIdle:
state_ = ValueState::kClosed;
break;
case ValueState::kWaiting:
state_ = ValueState::kClosed;
wait_recv_.Wake();
break;
case ValueState::kClosed:
case ValueState::kError:
break;
case ValueState::kQueued:
case ValueState::kReady:
case ValueState::kProcessing:
Crash("Only one push allowed to be outstanding");
break;
}
}
void filters_detail::PipeState::BeginPush() {
switch (state_) {
case ValueState::kIdle:
@ -320,7 +401,7 @@ Poll<StatusFlag> filters_detail::PipeState::PollPush() {
GPR_UNREACHABLE_CODE(return Pending{});
}
Poll<StatusFlag> filters_detail::PipeState::PollPull() {
Poll<ValueOrFailure<bool>> filters_detail::PipeState::PollPull() {
switch (state_) {
case ValueState::kWaiting:
return wait_recv_.pending();
@ -331,10 +412,11 @@ Poll<StatusFlag> filters_detail::PipeState::PollPull() {
case ValueState::kQueued:
if (!started_) return wait_recv_.pending();
state_ = ValueState::kProcessing;
return Success{};
return true;
case ValueState::kProcessing:
Crash("Only one pull allowed to be outstanding");
case ValueState::kClosed:
return false;
case ValueState::kError:
return Failure{};
}
@ -358,4 +440,32 @@ void filters_detail::PipeState::AckPull() {
}
}
std::string filters_detail::PipeState::DebugString() const {
const char* state_str = "<<invalid-value>>";
switch (state_) {
case ValueState::kIdle:
state_str = "Idle";
break;
case ValueState::kWaiting:
state_str = "Waiting";
break;
case ValueState::kQueued:
state_str = "Queued";
break;
case ValueState::kReady:
state_str = "Ready";
break;
case ValueState::kProcessing:
state_str = "Processing";
break;
case ValueState::kClosed:
state_str = "Closed";
break;
case ValueState::kError:
state_str = "Error";
break;
}
return absl::StrCat(state_str, started_ ? "" : " (not started)");
}
} // namespace grpc_core

@ -23,6 +23,8 @@
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/transport/call_final_info.h"
@ -771,6 +773,55 @@ struct AddOpImpl<
}
};
// PROMISE_RETURNING(absl::StatusOr<$VALUE_HANDLE>)
// $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*)
template <typename FilterType, typename T, typename R,
R (FilterType::Call::*impl)(T, FilterType*)>
struct AddOpImpl<FilterType, T, R (FilterType::Call::*)(T, FilterType*), impl,
absl::enable_if_t<std::is_same<absl::StatusOr<T>,
PromiseResult<R>>::value>> {
static void Add(FilterType* channel_data, size_t call_offset,
Layout<FallibleOperator<T>>& to) {
class Promise {
public:
Promise(T value, typename FilterType::Call* call_data,
FilterType* channel_data)
: impl_((call_data->*impl)(std::move(value), channel_data)) {}
Poll<ResultOr<T>> PollOnce() {
auto p = impl_();
auto* r = p.value_if_ready();
if (r == nullptr) return Pending{};
this->~Promise();
if (r->ok()) return ResultOr<T>{std::move(**r), nullptr};
return ResultOr<T>{nullptr, ServerMetadataFromStatus(r->status())};
}
private:
GPR_NO_UNIQUE_ADDRESS R impl_;
};
to.Add(sizeof(Promise), alignof(Promise),
FallibleOperator<T>{
channel_data,
call_offset,
[](void* promise_data, void* call_data, void* channel_data,
T value) -> Poll<ResultOr<T>> {
auto* promise = new (promise_data)
Promise(std::move(value),
static_cast<typename FilterType::Call*>(call_data),
static_cast<FilterType*>(channel_data));
return promise->PollOnce();
},
[](void* promise_data) {
return static_cast<Promise*>(promise_data)->PollOnce();
},
[](void* promise_data) {
static_cast<Promise*>(promise_data)->~Promise();
},
});
}
};
struct ChannelDataDestructor {
void (*destroy)(void* channel_data);
void* channel_data;
@ -783,7 +834,7 @@ struct ChannelDataDestructor {
// in-flight calls.
struct StackData {
// Overall size and alignment of the call data for this stack.
size_t call_data_alignment = 0;
size_t call_data_alignment = 1;
size_t call_data_size = 0;
// A complete list of filters for this call, so that we can construct the
// call data for each filter.
@ -1104,14 +1155,25 @@ class PipeState {
void DropPush();
// Poll for push completion: occurs after the corresponding Pull()
Poll<StatusFlag> PollPush();
Poll<StatusFlag> PollPull();
// Poll for pull completion; returns Failure{} if closed with error,
// true if a value is available, or false if the pipe was closed without
// error.
Poll<ValueOrFailure<bool>> PollPull();
// A pulled value has been consumed: we can unblock the push
void AckPull();
// A previously started pull operation has completed
void DropPull();
// Close sending
void CloseSending();
// Close sending with error
void CloseWithError();
// Poll for closedness - if true, closed with error
Poll<bool> PollClosed();
bool holds_error() const { return state_ == ValueState::kError; }
std::string DebugString() const;
private:
enum class ValueState : uint8_t {
// Nothing sending nor receiving
@ -1248,6 +1310,44 @@ class CallFilters {
filters_detail::StackData data_;
};
class NextMessage {
public:
NextMessage() : has_value_(false), cancelled_(false) {}
explicit NextMessage(MessageHandle value)
: has_value_(true), value_(std::move(value)) {}
explicit NextMessage(bool cancelled)
: has_value_(false), cancelled_(cancelled) {}
NextMessage(const NextMessage&) = delete;
NextMessage& operator=(const NextMessage&) = delete;
NextMessage(NextMessage&& other) noexcept = default;
NextMessage& operator=(NextMessage&& other) = default;
using value_type = MessageHandle;
void reset() {
has_value_ = false;
cancelled_ = false;
value_.reset();
}
bool has_value() const { return has_value_; }
const MessageHandle& value() const {
GPR_DEBUG_ASSERT(has_value_);
return value_;
}
MessageHandle& value() {
GPR_DEBUG_ASSERT(has_value_);
return value_;
}
const MessageHandle& operator*() const { return value(); }
MessageHandle& operator*() { return value(); }
bool cancelled() const { return !has_value_ && cancelled_; }
private:
bool has_value_;
bool cancelled_;
MessageHandle value_;
};
explicit CallFilters(ClientMetadataHandle client_initial_metadata);
~CallFilters();
@ -1258,25 +1358,59 @@ class CallFilters {
void SetStack(RefCountedPtr<Stack> stack);
// Access client initial metadata before it's processed
ClientMetadata* unprocessed_client_initial_metadata() {
return client_initial_metadata_.get();
}
// Client: Fetch client initial metadata
// Returns a promise that resolves to ValueOrFailure<ClientMetadataHandle>
GRPC_MUST_USE_RESULT auto PullClientInitialMetadata();
// Server: Indicate that no server initial metadata will be sent
void NoServerInitialMetadata() {
server_initial_metadata_state_.CloseSending();
}
// Server: Push server initial metadata
// Returns a promise that resolves to a StatusFlag indicating success
GRPC_MUST_USE_RESULT auto PushServerInitialMetadata(ServerMetadataHandle md);
// Client: Fetch server initial metadata
// Returns a promise that resolves to ValueOrFailure<ServerMetadataHandle>
GRPC_MUST_USE_RESULT auto PullServerInitialMetadata();
// Client: Push client to server message
// Returns a promise that resolves to a StatusFlag indicating success
GRPC_MUST_USE_RESULT auto PushClientToServerMessage(MessageHandle message);
// Client: Indicate that no more messages will be sent
void FinishClientToServerSends() {
client_to_server_message_state_.CloseSending();
}
// Server: Fetch client to server message
// Returns a promise that resolves to ValueOrFailure<MessageHandle>
GRPC_MUST_USE_RESULT auto PullClientToServerMessage();
// Server: Push server to client message
// Returns a promise that resolves to a StatusFlag indicating success
GRPC_MUST_USE_RESULT auto PushServerToClientMessage(MessageHandle message);
// Server: Fetch server to client message
// Returns a promise that resolves to ValueOrFailure<MessageHandle>
GRPC_MUST_USE_RESULT auto PullServerToClientMessage();
void PushServerTrailingMetadata(ServerMetadataHandle md) {
GPR_ASSERT(md != nullptr);
if (server_trailing_metadata_ != nullptr) return;
server_trailing_metadata_ = std::move(md);
server_trailing_metadata_waiter_.Wake();
}
// Server: Indicate end of response
// Closes the request entirely - no messages can be sent/received
// If no server initial metadata has been sent, implies
// NoServerInitialMetadata() called.
void PushServerTrailingMetadata(ServerMetadataHandle md);
// Client: Fetch server trailing metadata
// Returns a promise that resolves to ServerMetadataHandle
GRPC_MUST_USE_RESULT auto PullServerTrailingMetadata();
// Server: Wait for server trailing metadata to have been sent
// Returns a promise that resolves to a StatusFlag indicating whether the
// request was cancelled or not -- failure to send trailing metadata is
// considered a cancellation, as is actual cancellation -- but not application
// errors.
GRPC_MUST_USE_RESULT auto WasCancelled();
// Client & server: fill in final_info with the final status of the call.
void Finalize(const grpc_call_final_info* final_info);
std::string DebugString() const;
private:
template <filters_detail::PipeState(CallFilters::*state_ptr),
void*(CallFilters::*push_ptr), typename T,
@ -1315,6 +1449,10 @@ class CallFilters {
T TakeValue() { return std::move(value_); }
absl::string_view DebugString() const {
return value_ != nullptr ? " (not pulled)" : "";
}
private:
filters_detail::PipeState& state() { return filters_->*state_ptr; }
void*& push_slot() { return filters_->*push_ptr; }
@ -1323,24 +1461,36 @@ class CallFilters {
T value_;
};
class Pull {
static std::string DebugString(absl::string_view name,
const CallFilters* filters) {
auto* push = static_cast<Push*>(filters->*push_ptr);
return absl::StrCat(name, ":", (filters->*state_ptr).DebugString(),
push == nullptr ? "" : push->DebugString());
}
class PullMaybe {
public:
explicit Pull(CallFilters* filters) : filters_(filters) {}
~Pull() {
explicit PullMaybe(CallFilters* filters) : filters_(filters) {}
~PullMaybe() {
if (filters_ != nullptr) {
state().DropPull();
}
}
Pull(const Pull&) = delete;
Pull& operator=(const Pull&) = delete;
Pull(Pull&& other) noexcept
PullMaybe(const PullMaybe&) = delete;
PullMaybe& operator=(const PullMaybe&) = delete;
PullMaybe(PullMaybe&& other) noexcept
: filters_(std::exchange(other.filters_, nullptr)),
executor_(std::move(other.executor_)) {}
Pull& operator=(Pull&&) = delete;
PullMaybe& operator=(PullMaybe&&) = delete;
Poll<ValueOrFailure<T>> operator()() {
Poll<ValueOrFailure<absl::optional<T>>> operator()() {
if (executor_.IsRunning()) {
auto c = state().PollClosed();
if (c.ready() && c.value()) {
filters_->CancelDueToFailedPipeOperation();
return Failure{};
}
return FinishOperationExecutor(executor_.Step(filters_->call_data_));
}
auto p = state().PollPull();
@ -1350,6 +1500,7 @@ class CallFilters {
filters_->CancelDueToFailedPipeOperation();
return Failure{};
}
if (!**r) return absl::nullopt;
return FinishOperationExecutor(executor_.Start(
layout(), push()->TakeValue(), filters_->call_data_));
}
@ -1362,7 +1513,7 @@ class CallFilters {
return &(filters_->stack_->data_.*layout_ptr);
}
Poll<ValueOrFailure<T>> FinishOperationExecutor(
Poll<ValueOrFailure<absl::optional<T>>> FinishOperationExecutor(
Poll<filters_detail::ResultOr<T>> p) {
auto* r = p.value_if_ready();
if (r == nullptr) return Pending{};
@ -1376,6 +1527,66 @@ class CallFilters {
CallFilters* filters_;
filters_detail::OperationExecutor<T> executor_;
};
class PullMessage {
public:
explicit PullMessage(CallFilters* filters) : filters_(filters) {}
~PullMessage() {
if (filters_ != nullptr) {
state().DropPull();
}
}
PullMessage(const PullMessage&) = delete;
PullMessage& operator=(const PullMessage&) = delete;
PullMessage(PullMessage&& other) noexcept
: filters_(std::exchange(other.filters_, nullptr)),
executor_(std::move(other.executor_)) {}
PullMessage& operator=(PullMessage&&) = delete;
Poll<NextMessage> operator()() {
if (executor_.IsRunning()) {
auto c = state().PollClosed();
if (c.ready() && c.value()) {
filters_->CancelDueToFailedPipeOperation();
return NextMessage(true);
}
return FinishOperationExecutor(executor_.Step(filters_->call_data_));
}
auto p = state().PollPull();
auto* r = p.value_if_ready();
if (r == nullptr) return Pending{};
if (!r->ok()) {
filters_->CancelDueToFailedPipeOperation();
return NextMessage(true);
}
if (!**r) return NextMessage(false);
return FinishOperationExecutor(executor_.Start(
layout(), push()->TakeValue(), filters_->call_data_));
}
private:
filters_detail::PipeState& state() { return filters_->*state_ptr; }
Push* push() { return static_cast<Push*>(filters_->*push_ptr); }
const filters_detail::Layout<filters_detail::FallibleOperator<T>>*
layout() {
return &(filters_->stack_->data_.*layout_ptr);
}
Poll<NextMessage> FinishOperationExecutor(
Poll<filters_detail::ResultOr<T>> p) {
auto* r = p.value_if_ready();
if (r == nullptr) return Pending{};
GPR_DEBUG_ASSERT(!executor_.IsRunning());
state().AckPull();
if (r->ok != nullptr) return NextMessage(std::move(r->ok));
filters_->PushServerTrailingMetadata(std::move(r->error));
return NextMessage(true);
}
CallFilters* filters_;
filters_detail::OperationExecutor<T> executor_;
};
};
class PullClientInitialMetadataPromise {
@ -1400,7 +1611,12 @@ class CallFilters {
}
auto p = state().PollPull();
auto* r = p.value_if_ready();
gpr_log(GPR_INFO, "%s", r == nullptr ? "PENDING" : r->ToString().c_str());
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO, "%s",
r == nullptr
? "PENDING"
: (r->ok() ? (r->value() ? "TRUE" : "FALSE") : "FAILURE"));
}
if (r == nullptr) return Pending{};
if (!r->ok()) {
filters_->CancelDueToFailedPipeOperation();
@ -1450,11 +1666,39 @@ class CallFilters {
Poll<ServerMetadataHandle> operator()() {
if (executor_.IsRunning()) {
return executor_.Step(filters_->call_data_);
auto r = executor_.Step(filters_->call_data_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
if (r.pending()) {
gpr_log(GPR_INFO,
"%s PullServerTrailingMetadata[%p]: Pending(but executing)",
GetContext<Activity>()->DebugTag().c_str(), filters_);
} else {
gpr_log(GPR_INFO, "%s PullServerTrailingMetadata[%p]: Ready: %s",
GetContext<Activity>()->DebugTag().c_str(), filters_,
r.value()->DebugString().c_str());
}
}
return r;
}
if (filters_->server_trailing_metadata_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO,
"%s PullServerTrailingMetadata[%p]: Pending(not pushed)",
GetContext<Activity>()->DebugTag().c_str(), filters_);
}
return filters_->server_trailing_metadata_waiter_.pending();
}
// If no stack has been set, we can just return the result of the call
if (filters_->stack_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) {
gpr_log(GPR_INFO,
"%s PullServerTrailingMetadata[%p]: Ready(no-stack): %s",
GetContext<Activity>()->DebugTag().c_str(), filters_,
filters_->server_trailing_metadata_->DebugString().c_str());
}
return std::move(filters_->server_trailing_metadata_);
}
// Otherwise we need to process it through all the filters.
return executor_.Start(&filters_->stack_->data_.server_trailing_metadata,
std::move(filters_->server_trailing_metadata_),
filters_->call_data_);
@ -1465,7 +1709,7 @@ class CallFilters {
filters_detail::InfallibleOperationExecutor<ServerMetadataHandle> executor_;
};
void CancelDueToFailedPipeOperation();
void CancelDueToFailedPipeOperation(SourceLocation but_where = {});
RefCountedPtr<Stack> stack_;
@ -1475,6 +1719,7 @@ class CallFilters {
filters_detail::PipeState client_to_server_message_state_;
filters_detail::PipeState server_to_client_message_state_;
IntraActivityWaiter server_trailing_metadata_waiter_;
Latch<bool> cancelled_;
void* call_data_;
ClientMetadataHandle client_initial_metadata_;
@ -1516,7 +1761,7 @@ inline auto CallFilters::PushServerInitialMetadata(ServerMetadataHandle md) {
}
inline auto CallFilters::PullServerInitialMetadata() {
return ServerInitialMetadataPromises::Pull{this};
return ServerInitialMetadataPromises::PullMaybe{this};
}
inline auto CallFilters::PushClientToServerMessage(MessageHandle message) {
@ -1526,7 +1771,7 @@ inline auto CallFilters::PushClientToServerMessage(MessageHandle message) {
}
inline auto CallFilters::PullClientToServerMessage() {
return ClientToServerMessagePromises::Pull{this};
return ClientToServerMessagePromises::PullMessage{this};
}
inline auto CallFilters::PushServerToClientMessage(MessageHandle message) {
@ -1536,13 +1781,19 @@ inline auto CallFilters::PushServerToClientMessage(MessageHandle message) {
}
inline auto CallFilters::PullServerToClientMessage() {
return ServerToClientMessagePromises::Pull{this};
return ServerToClientMessagePromises::PullMessage{this};
}
inline auto CallFilters::PullServerTrailingMetadata() {
return PullServerTrailingMetadataPromise(this);
return Map(PullServerTrailingMetadataPromise(this),
[this](ServerMetadataHandle h) {
cancelled_.Set(h->get(GrpcCallWasCancelled()).value_or(false));
return h;
});
}
inline auto CallFilters::WasCancelled() { return cancelled_.Wait(); }
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H

@ -15,7 +15,6 @@
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include <limits.h>
#include <string.h>
@ -46,38 +45,11 @@
#include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/server_interceptor.h>
#include "src/core/ext/transport/chttp2/server/chttp2_server.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/surface/server.h"
#include "src/cpp/server/external_connection_acceptor_impl.h"
namespace grpc {
namespace {
// A PIMPL wrapper class that owns the only ref to the passive listener
// implementation. This is returned to the application.
class PassiveListenerOwner final
: public grpc_core::experimental::PassiveListener {
public:
explicit PassiveListenerOwner(std::shared_ptr<PassiveListener> listener)
: listener_(std::move(listener)) {}
absl::Status AcceptConnectedEndpoint(
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint) override {
return listener_->AcceptConnectedEndpoint(std::move(endpoint));
}
absl::Status AcceptConnectedFd(int fd) override {
return listener_->AcceptConnectedFd(fd);
}
private:
std::shared_ptr<PassiveListener> listener_;
};
} // namespace
static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
g_plugin_factory_list;
@ -251,18 +223,6 @@ ServerBuilder& ServerBuilder::SetResourceQuota(
return *this;
}
ServerBuilder& ServerBuilder::experimental_type::AddPassiveListener(
std::shared_ptr<grpc::ServerCredentials> creds,
std::unique_ptr<experimental::PassiveListener>& passive_listener) {
auto core_passive_listener =
std::make_shared<grpc_core::experimental::PassiveListenerImpl>();
builder_->unstarted_passive_listeners_.emplace_back(core_passive_listener,
std::move(creds));
passive_listener =
std::make_unique<PassiveListenerOwner>(std::move(core_passive_listener));
return *builder_;
}
ServerBuilder& ServerBuilder::AddListeningPort(
const std::string& addr_uri, std::shared_ptr<ServerCredentials> creds,
int* selected_port) {
@ -436,26 +396,6 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
cq->RegisterServer(server.get());
}
for (auto& unstarted_listener : unstarted_passive_listeners_) {
has_frequently_polled_cqs = true;
auto passive_listener = unstarted_listener.passive_listener.lock();
auto* core_server = grpc_core::Server::FromC(server->c_server());
if (passive_listener != nullptr) {
auto* creds = unstarted_listener.credentials->c_creds();
if (creds == nullptr) {
gpr_log(GPR_ERROR, "Credentials missing for PassiveListener");
return nullptr;
}
auto success = grpc_server_add_passive_listener(
core_server, creds, std::move(passive_listener));
if (!success.ok()) {
gpr_log(GPR_ERROR, "Failed to create a passive listener: %s",
success.ToString().c_str());
return nullptr;
}
}
}
if (!has_frequently_polled_cqs) {
gpr_log(GPR_ERROR,
"At least one of the completion queues must be frequently polled");

@ -14,8 +14,6 @@
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include <errno.h>
@ -27,6 +25,8 @@
#include "absl/strings/str_cat.h"
#include "gtest/gtest.h"
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/resolved_address.h"
#ifdef GRPC_HAVE_UNIX_SOCKET

@ -14,14 +14,13 @@
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/types/optional.h"
#include "gtest/gtest.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/port_platform.h>
#include "src/core/client_channel/client_channel_filter.h"
#include "src/core/client_channel/subchannel_pool_interface.h"

@ -17,8 +17,6 @@
#ifndef GRPC_TEST_CORE_CLIENT_CHANNEL_LB_POLICY_LB_POLICY_TEST_LIB_H
#define GRPC_TEST_CORE_CLIENT_CHANNEL_LB_POLICY_LB_POLICY_TEST_LIB_H
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <stddef.h>
@ -53,6 +51,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/client_channel/client_channel_internal.h"
#include "src/core/client_channel/subchannel_interface_internal.h"

@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <memory>
#include "gtest/gtest.h"
#include <grpc/support/port_platform.h>
// Make a template argument to test which bit pattern remains in A's destructor
// to try and detect similar bugs in non-MSAN builds (none have been detected
// yet thankfully)

@ -256,12 +256,14 @@ grpc_cc_library(
srcs = [
"end2end_test_fuzzer.cc",
],
hdrs = [
"end2end_test_fuzzer.h",
],
external_deps = [
"absl/functional:any_invocable",
"absl/strings",
"absl/types:optional",
"gtest",
"libprotobuf_mutator",
],
tags = ["nofixdeps"],
deps = [

@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/end2end/end2end_test_fuzzer.h"
#include <stdio.h>
#include <algorithm>
@ -24,11 +26,6 @@
#include <gtest/gtest.h>
#include "absl/functional/any_invocable.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
@ -39,8 +36,6 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/end2end/end2end_test_fuzzer.pb.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/end2end/fixtures/h2_tls_common.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
@ -53,23 +48,23 @@ using ::grpc_event_engine::experimental::GetDefaultEventEngine;
bool squelch = true;
static void dont_log(gpr_log_func_args* /*args*/) {}
DEFINE_PROTO_FUZZER(const core_end2end_test_fuzzer::Msg& msg) {
namespace grpc_core {
void RunEnd2endFuzzer(const core_end2end_test_fuzzer::Msg& msg) {
struct Test {
std::string name;
absl::AnyInvocable<std::unique_ptr<grpc_core::CoreEnd2endTest>() const>
factory;
absl::AnyInvocable<std::unique_ptr<CoreEnd2endTest>() const> factory;
};
static const auto only_suite = grpc_core::GetEnv("GRPC_TEST_FUZZER_SUITE");
static const auto only_test = grpc_core::GetEnv("GRPC_TEST_FUZZER_TEST");
static const auto only_config = grpc_core::GetEnv("GRPC_TEST_FUZZER_CONFIG");
static const auto only_suite = GetEnv("GRPC_TEST_FUZZER_SUITE");
static const auto only_test = GetEnv("GRPC_TEST_FUZZER_TEST");
static const auto only_config = GetEnv("GRPC_TEST_FUZZER_CONFIG");
static const auto all_tests =
grpc_core::CoreEnd2endTestRegistry::Get().AllTests();
static const auto all_tests = CoreEnd2endTestRegistry::Get().AllTests();
static const auto tests = []() {
grpc_core::g_is_fuzzing_core_e2e_tests = true;
grpc_core::ForceEnableExperiment("event_engine_client", true);
grpc_core::ForceEnableExperiment("event_engine_listener", true);
g_is_fuzzing_core_e2e_tests = true;
ForceEnableExperiment("event_engine_client", true);
ForceEnableExperiment("event_engine_listener", true);
std::vector<Test> tests;
for (const auto& test : all_tests) {
@ -81,9 +76,8 @@ DEFINE_PROTO_FUZZER(const core_end2end_test_fuzzer::Msg& msg) {
}
std::string test_name =
absl::StrCat(test.suite, ".", test.name, "/", test.config->name);
tests.emplace_back(
Test{std::move(test_name), [&test]() {
return std::unique_ptr<grpc_core::CoreEnd2endTest>(
tests.emplace_back(Test{std::move(test_name), [&test]() {
return std::unique_ptr<CoreEnd2endTest>(
test.make_test(test.config));
}});
}
@ -95,16 +89,16 @@ DEFINE_PROTO_FUZZER(const core_end2end_test_fuzzer::Msg& msg) {
const int test_id = msg.test_id() % tests.size();
if (squelch && !grpc_core::GetEnv("GRPC_TRACE_FUZZER").has_value()) {
if (squelch && !GetEnv("GRPC_TRACE_FUZZER").has_value()) {
gpr_set_log_function(dont_log);
}
// TODO(ctiller): make this per fixture?
grpc_core::ConfigVars::Overrides overrides =
grpc_core::OverridesFromFuzzConfigVars(msg.config_vars());
ConfigVars::Overrides overrides =
OverridesFromFuzzConfigVars(msg.config_vars());
overrides.default_ssl_roots_file_path = CA_CERT_PATH;
grpc_core::ConfigVars::SetOverrides(overrides);
grpc_core::TestOnlyReloadExperimentsFromConfigVariables();
ConfigVars::SetOverrides(overrides);
TestOnlyReloadExperimentsFromConfigVariables();
grpc_event_engine::experimental::SetEventEngineFactory(
[actions = msg.event_engine_actions()]() {
FuzzingEventEngine::Options options;
@ -126,18 +120,20 @@ DEFINE_PROTO_FUZZER(const core_end2end_test_fuzzer::Msg& msg) {
test->SetCqVerifierStepFn(
[engine = std::move(engine)](
grpc_event_engine::experimental::EventEngine::Duration max_step) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
engine->Tick(max_step);
grpc_timer_manager_tick();
});
test->SetPostGrpcInitFunc([]() {
grpc_timer_manager_set_threading(false);
grpc_core::ExecCtx exec_ctx;
grpc_core::Executor::SetThreadingAll(false);
ExecCtx exec_ctx;
Executor::SetThreadingAll(false);
});
test->SetUp();
test->RunTest();
test->TearDown();
GPR_ASSERT(!::testing::Test::HasFailure());
}
} // namespace grpc_core

@ -1,4 +1,4 @@
// Copyright 2024 The gRPC Authors
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -11,17 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPCPP_PASSIVE_LISTENER_H
#define GRPCPP_PASSIVE_LISTENER_H
#include <grpc/passive_listener.h>
#ifndef GRPC_TEST_CORE_END2END_END2END_TEST_FUZZER_H
#define GRPC_TEST_CORE_END2END_END2END_TEST_FUZZER_H
namespace grpc {
namespace experimental {
#include "test/core/end2end/end2end_test_fuzzer.pb.h"
using grpc_core::experimental::PassiveListener;
namespace grpc_core {
void RunEnd2endFuzzer(const core_end2end_test_fuzzer::Msg& msg);
}
} // namespace experimental
} // namespace grpc
#endif // GRPCPP_PASSIVE_LISTENER_H
#endif

@ -0,0 +1,20 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/libfuzzer/libfuzzer_macro.h"
#include "test/core/end2end/end2end_test_fuzzer.h"
DEFINE_PROTO_FUZZER(const core_end2end_test_fuzzer::Msg& msg) {
grpc_core::RunEnd2endFuzzer(msg);
}

@ -114,7 +114,9 @@ def grpc_core_end2end_test(name, shard_count = 10, tags = []):
grpc_proto_fuzzer(
name = "%s_fuzzer" % name,
srcs = [],
srcs = [
"end2end_test_fuzzer_main.cc",
],
corpus = "end2end_test_corpus/%s" % name,
data = END2END_TEST_DATA,
external_deps = [
@ -135,7 +137,7 @@ def grpc_core_end2end_test(name, shard_count = 10, tags = []):
uses_event_engine = False,
uses_polling = False,
deps = [
"end2end_test_fuzzer",
"%s_library" % name,
"end2end_test_fuzzer",
],
)

@ -16,8 +16,6 @@
//
//
#include <grpc/support/port_platform.h>
#include <limits.h>
#include <string.h>
@ -32,6 +30,7 @@
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/host_port.h"

@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/common_closures.h"
#include <memory>
@ -21,6 +19,8 @@
#include "absl/functional/any_invocable.h"
#include "gtest/gtest.h"
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/notification.h"
using ::grpc_event_engine::experimental::AnyInvocableClosure;

@ -11,8 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <algorithm>
#include <memory>
#include <thread>
@ -29,6 +27,7 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "test/core/util/test_config.h"

@ -11,13 +11,13 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/types/optional.h"
#include "gtest/gtest.h"
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"

@ -19,7 +19,6 @@
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
@ -160,70 +159,6 @@ class NotifyOnDelete {
grpc_core::Notification* signal_;
};
// An endpoint implementation that supports Read and Write via std::threads.
// Passing a grpc_core::Notification will allow owners to know when all
// in-flight callbacks have been run, and all endpoint state has been destroyed.
class ThreadedNoopEndpoint : public EventEngine::Endpoint {
public:
explicit ThreadedNoopEndpoint(grpc_core::Notification* destroyed)
: state_(std::make_shared<EndpointState>(destroyed)) {}
~ThreadedNoopEndpoint() override {
std::thread deleter([state = state_]() {
CleanupThread(state->read);
CleanupThread(state->write);
});
deleter.detach();
}
bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs* /* args */) override {
buffer->Clear();
CleanupThread(state_->read);
state_->read = new std::thread([cb = std::move(on_read)]() mutable {
cb(absl::UnknownError("test"));
});
return false;
}
bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* /* args */) override {
data->Clear();
CleanupThread(state_->write);
state_->write = new std::thread([cb = std::move(on_writable)]() mutable {
cb(absl::UnknownError("test"));
});
return false;
}
const EventEngine::ResolvedAddress& GetPeerAddress() const override {
return peer_;
}
const EventEngine::ResolvedAddress& GetLocalAddress() const override {
return local_;
}
private:
struct EndpointState {
explicit EndpointState(grpc_core::Notification* deleter)
: delete_notifier_(deleter) {}
std::thread* read = nullptr;
std::thread* write = nullptr;
NotifyOnDelete delete_notifier_;
};
static void CleanupThread(std::thread* thd) {
if (thd != nullptr) {
thd->join();
delete thd;
}
}
std::shared_ptr<EndpointState> state_;
EventEngine::ResolvedAddress peer_;
EventEngine::ResolvedAddress local_;
};
} // namespace experimental
} // namespace grpc_event_engine

@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <memory>
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "test/core/event_engine/util/aborting_event_engine.h"

@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/forkable.h"
#include <grpc/support/port_platform.h>
#ifdef GPR_POSIX_SUBPROCESS
#include <errno.h>
#include <stdlib.h>

@ -14,13 +14,13 @@
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "absl/synchronization/notification.h"
#include "gtest/gtest.h"
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/time.h"
using ::grpc_event_engine::experimental::FuzzingEventEngine;

@ -11,13 +11,12 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <memory>
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/port_platform.h>
using ::grpc_event_engine::experimental::EventEngine;

@ -11,8 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/query_extensions.h"
#include <string>
@ -23,6 +21,7 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/crash.h"

@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <string.h>
#include <memory>
@ -26,6 +24,7 @@
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
using ::grpc_event_engine::experimental::Slice;
using ::grpc_event_engine::experimental::SliceBuffer;

@ -11,8 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <memory>
#include "gmock/gmock.h"
@ -20,6 +18,7 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/port_platform.h>
#include "test/core/util/test_config.h"

@ -11,12 +11,12 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include <errno.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h" // IWYU pragma: keep
#ifdef GRPC_HAVE_VSOCK

@ -11,14 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "test/core/event_engine/test_init.h"
#include <string>
#include "absl/strings/str_cat.h"
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {

@ -13,11 +13,11 @@
// limitations under the License.
#ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_INIT_H
#define GRPC_TEST_CORE_EVENT_ENGINE_TEST_INIT_H
#include <grpc/support/port_platform.h>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save