Merge remote-tracking branch 'upstream/master' into stats_api

pull/26714/head
Mark D. Roth 4 years ago
commit c108c5004c
  1. 5
      BUILD
  2. 3
      BUILD.gn
  3. 6
      CMakeLists.txt
  4. 4
      Makefile
  5. 8
      build_autogenerated.yaml
  6. 2
      config.m4
  7. 2
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 5
      gRPC-Core.podspec
  10. 3
      grpc.gemspec
  11. 6
      grpc.gyp
  12. 24
      include/grpc/event_engine/event_engine.h
  13. 41
      include/grpc/event_engine/slice_allocator.h
  14. 3
      package.xml
  15. 2
      src/compiler/cpp_generator.cc
  16. 64
      src/core/ext/filters/client_channel/retry_filter.cc
  17. 53
      src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
  18. 74
      src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h
  19. 14
      src/core/lib/iomgr/event_engine/endpoint.cc
  20. 40
      src/core/lib/iomgr/event_engine/tcp.cc
  21. 2
      src/core/lib/iomgr/resource_quota.cc
  22. 2
      src/python/grpcio/grpc_core_dependencies.py
  23. 1
      templates/test/core/end2end/end2end_defs.include
  24. 9
      test/core/end2end/end2end_nosec_tests.cc
  25. 9
      test/core/end2end/end2end_tests.cc
  26. 6
      test/core/end2end/generate_tests.bzl
  27. 263
      test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc
  28. 3
      tools/doxygen/Doxyfile.c++.internal
  29. 3
      tools/doxygen/Doxyfile.core.internal
  30. 2
      tools/internal_ci/macos/grpc_build_artifacts.cfg
  31. 21
      tools/run_tests/xds_k8s_test_driver/framework/xds_url_map_test_resources.py

@ -855,7 +855,6 @@ grpc_cc_library(
"src/core/lib/debug/stats_data.cc",
"src/core/lib/event_engine/endpoint_config.cc",
"src/core/lib/event_engine/event_engine.cc",
"src/core/lib/event_engine/slice_allocator.cc",
"src/core/lib/event_engine/sockaddr.cc",
"src/core/lib/http/format_request.cc",
"src/core/lib/http/httpcli.cc",
@ -2447,6 +2446,7 @@ grpc_cc_library(
"src/core/ext/transport/chttp2/transport/bin_decoder.cc",
"src/core/ext/transport/chttp2/transport/bin_encoder.cc",
"src/core/ext/transport/chttp2/transport/chttp2_plugin.cc",
"src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc",
"src/core/ext/transport/chttp2/transport/chttp2_transport.cc",
"src/core/ext/transport/chttp2/transport/context_list.cc",
"src/core/ext/transport/chttp2/transport/flow_control.cc",
@ -2471,6 +2471,7 @@ grpc_cc_library(
hdrs = [
"src/core/ext/transport/chttp2/transport/bin_decoder.h",
"src/core/ext/transport/chttp2/transport/bin_encoder.h",
"src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/context_list.h",
"src/core/ext/transport/chttp2/transport/flow_control.h",
@ -2492,6 +2493,8 @@ grpc_cc_library(
"src/core/ext/transport/chttp2/transport/varint.h",
],
external_deps = [
"absl/memory",
"absl/status",
"absl/strings:str_format",
"absl/strings",
],

@ -397,6 +397,8 @@ config("grpc_config") {
"src/core/ext/transport/chttp2/transport/bin_encoder.cc",
"src/core/ext/transport/chttp2/transport/bin_encoder.h",
"src/core/ext/transport/chttp2/transport/chttp2_plugin.cc",
"src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc",
"src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h",
"src/core/ext/transport/chttp2/transport/chttp2_transport.cc",
"src/core/ext/transport/chttp2/transport/chttp2_transport.h",
"src/core/ext/transport/chttp2/transport/context_list.cc",
@ -868,7 +870,6 @@ config("grpc_config") {
"src/core/lib/event_engine/endpoint_config.cc",
"src/core/lib/event_engine/endpoint_config_internal.h",
"src/core/lib/event_engine/event_engine.cc",
"src/core/lib/event_engine/slice_allocator.cc",
"src/core/lib/event_engine/sockaddr.cc",
"src/core/lib/event_engine/sockaddr.h",
"src/core/lib/gprpp/atomic.h",

@ -1132,6 +1132,7 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/retry_non_retriable_status.cc
test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc
test/core/end2end/tests/retry_recv_initial_metadata.cc
test/core/end2end/tests/retry_recv_message.cc
test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
@ -1270,6 +1271,7 @@ add_library(end2end_tests
test/core/end2end/tests/retry_non_retriable_status.cc
test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc
test/core/end2end/tests/retry_recv_initial_metadata.cc
test/core/end2end/tests/retry_recv_message.cc
test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
@ -1597,6 +1599,7 @@ add_library(grpc
src/core/ext/transport/chttp2/transport/bin_decoder.cc
src/core/ext/transport/chttp2/transport/bin_encoder.cc
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
src/core/ext/transport/chttp2/transport/chttp2_transport.cc
src/core/ext/transport/chttp2/transport/context_list.cc
src/core/ext/transport/chttp2/transport/flow_control.cc
@ -1832,7 +1835,6 @@ add_library(grpc
src/core/lib/debug/trace.cc
src/core/lib/event_engine/endpoint_config.cc
src/core/lib/event_engine/event_engine.cc
src/core/lib/event_engine/slice_allocator.cc
src/core/lib/event_engine/sockaddr.cc
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
@ -2417,6 +2419,7 @@ add_library(grpc_unsecure
src/core/ext/transport/chttp2/transport/bin_decoder.cc
src/core/ext/transport/chttp2/transport/bin_encoder.cc
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
src/core/ext/transport/chttp2/transport/chttp2_transport.cc
src/core/ext/transport/chttp2/transport/context_list.cc
src/core/ext/transport/chttp2/transport/flow_control.cc
@ -2469,7 +2472,6 @@ add_library(grpc_unsecure
src/core/lib/debug/trace.cc
src/core/lib/event_engine/endpoint_config.cc
src/core/lib/event_engine/event_engine.cc
src/core/lib/event_engine/slice_allocator.cc
src/core/lib/event_engine/sockaddr.cc
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc

@ -1136,6 +1136,7 @@ LIBGRPC_SRC = \
src/core/ext/transport/chttp2/transport/bin_decoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/context_list.cc \
src/core/ext/transport/chttp2/transport/flow_control.cc \
@ -1371,7 +1372,6 @@ LIBGRPC_SRC = \
src/core/lib/debug/trace.cc \
src/core/lib/event_engine/endpoint_config.cc \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
@ -1807,6 +1807,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/transport/chttp2/transport/bin_decoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/context_list.cc \
src/core/ext/transport/chttp2/transport/flow_control.cc \
@ -1859,7 +1860,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/debug/trace.cc \
src/core/lib/event_engine/endpoint_config.cc \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \

@ -94,6 +94,7 @@ libs:
- test/core/end2end/tests/retry_non_retriable_status.cc
- test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
- test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
- test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc
- test/core/end2end/tests/retry_recv_initial_metadata.cc
- test/core/end2end/tests/retry_recv_message.cc
- test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
@ -207,6 +208,7 @@ libs:
- test/core/end2end/tests/retry_non_retriable_status.cc
- test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
- test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
- test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc
- test/core/end2end/tests/retry_recv_initial_metadata.cc
- test/core/end2end/tests/retry_recv_message.cc
- test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
@ -487,6 +489,7 @@ libs:
- src/core/ext/transport/chttp2/server/chttp2_server.h
- src/core/ext/transport/chttp2/transport/bin_decoder.h
- src/core/ext/transport/chttp2/transport/bin_encoder.h
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h
- src/core/ext/transport/chttp2/transport/chttp2_transport.h
- src/core/ext/transport/chttp2/transport/context_list.h
- src/core/ext/transport/chttp2/transport/flow_control.h
@ -1014,6 +1017,7 @@ libs:
- src/core/ext/transport/chttp2/transport/bin_decoder.cc
- src/core/ext/transport/chttp2/transport/bin_encoder.cc
- src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
- src/core/ext/transport/chttp2/transport/chttp2_transport.cc
- src/core/ext/transport/chttp2/transport/context_list.cc
- src/core/ext/transport/chttp2/transport/flow_control.cc
@ -1249,7 +1253,6 @@ libs:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/endpoint_config.cc
- src/core/lib/event_engine/event_engine.cc
- src/core/lib/event_engine/slice_allocator.cc
- src/core/lib/event_engine/sockaddr.cc
- src/core/lib/http/format_request.cc
- src/core/lib/http/httpcli.cc
@ -1697,6 +1700,7 @@ libs:
- src/core/ext/transport/chttp2/server/chttp2_server.h
- src/core/ext/transport/chttp2/transport/bin_decoder.h
- src/core/ext/transport/chttp2/transport/bin_encoder.h
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h
- src/core/ext/transport/chttp2/transport/chttp2_transport.h
- src/core/ext/transport/chttp2/transport/context_list.h
- src/core/ext/transport/chttp2/transport/flow_control.h
@ -1958,6 +1962,7 @@ libs:
- src/core/ext/transport/chttp2/transport/bin_decoder.cc
- src/core/ext/transport/chttp2/transport/bin_encoder.cc
- src/core/ext/transport/chttp2/transport/chttp2_plugin.cc
- src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc
- src/core/ext/transport/chttp2/transport/chttp2_transport.cc
- src/core/ext/transport/chttp2/transport/context_list.cc
- src/core/ext/transport/chttp2/transport/flow_control.cc
@ -2010,7 +2015,6 @@ libs:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/endpoint_config.cc
- src/core/lib/event_engine/event_engine.cc
- src/core/lib/event_engine/slice_allocator.cc
- src/core/lib/event_engine/sockaddr.cc
- src/core/lib/http/format_request.cc
- src/core/lib/http/httpcli.cc

@ -130,6 +130,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/transport/chttp2/transport/bin_decoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/context_list.cc \
src/core/ext/transport/chttp2/transport/flow_control.cc \
@ -378,7 +379,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/debug/trace.cc \
src/core/lib/event_engine/endpoint_config.cc \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/gpr/alloc.cc \
src/core/lib/gpr/atm.cc \

@ -96,6 +96,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\transport\\chttp2\\transport\\bin_decoder.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\bin_encoder.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_plugin.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_slice_allocator.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\chttp2_transport.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\context_list.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\flow_control.cc " +
@ -344,7 +345,6 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\debug\\trace.cc " +
"src\\core\\lib\\event_engine\\endpoint_config.cc " +
"src\\core\\lib\\event_engine\\event_engine.cc " +
"src\\core\\lib\\event_engine\\slice_allocator.cc " +
"src\\core\\lib\\event_engine\\sockaddr.cc " +
"src\\core\\lib\\gpr\\alloc.cc " +
"src\\core\\lib\\gpr\\atm.cc " +

@ -269,6 +269,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',
@ -922,6 +923,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',

@ -350,6 +350,8 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.cc',
@ -847,7 +849,6 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/endpoint_config.cc',
'src/core/lib/event_engine/endpoint_config_internal.h',
'src/core/lib/event_engine/event_engine.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/event_engine/sockaddr.h',
'src/core/lib/gpr/alloc.cc',
@ -1502,6 +1503,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/server/chttp2_server.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
'src/core/ext/transport/chttp2/transport/bin_encoder.h',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h',
'src/core/ext/transport/chttp2/transport/chttp2_transport.h',
'src/core/ext/transport/chttp2/transport/context_list.h',
'src/core/ext/transport/chttp2/transport/flow_control.h',
@ -2139,6 +2141,7 @@ Pod::Spec.new do |s|
'test/core/end2end/tests/retry_non_retriable_status.cc',
'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc',
'test/core/end2end/tests/retry_recv_initial_metadata.cc',
'test/core/end2end/tests/retry_recv_message.cc',
'test/core/end2end/tests/retry_recv_trailing_metadata_error.cc',

@ -265,6 +265,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/transport/chttp2/transport/bin_encoder.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/bin_encoder.h )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_plugin.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_transport.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/chttp2_transport.h )
s.files += %w( src/core/ext/transport/chttp2/transport/context_list.cc )
@ -762,7 +764,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/endpoint_config.cc )
s.files += %w( src/core/lib/event_engine/endpoint_config_internal.h )
s.files += %w( src/core/lib/event_engine/event_engine.cc )
s.files += %w( src/core/lib/event_engine/slice_allocator.cc )
s.files += %w( src/core/lib/event_engine/sockaddr.cc )
s.files += %w( src/core/lib/event_engine/sockaddr.h )
s.files += %w( src/core/lib/gpr/alloc.cc )

@ -248,6 +248,7 @@
'test/core/end2end/tests/retry_non_retriable_status.cc',
'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc',
'test/core/end2end/tests/retry_recv_initial_metadata.cc',
'test/core/end2end/tests/retry_recv_message.cc',
'test/core/end2end/tests/retry_recv_trailing_metadata_error.cc',
@ -354,6 +355,7 @@
'test/core/end2end/tests/retry_non_retriable_status.cc',
'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout.cc',
'test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc',
'test/core/end2end/tests/retry_recv_initial_metadata.cc',
'test/core/end2end/tests/retry_recv_message.cc',
'test/core/end2end/tests/retry_recv_trailing_metadata_error.cc',
@ -561,6 +563,7 @@
'src/core/ext/transport/chttp2/transport/bin_decoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/context_list.cc',
'src/core/ext/transport/chttp2/transport/flow_control.cc',
@ -796,7 +799,6 @@
'src/core/lib/debug/trace.cc',
'src/core/lib/event_engine/endpoint_config.cc',
'src/core/lib/event_engine/event_engine.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
@ -1204,6 +1206,7 @@
'src/core/ext/transport/chttp2/transport/bin_decoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/context_list.cc',
'src/core/ext/transport/chttp2/transport/flow_control.cc',
@ -1256,7 +1259,6 @@
'src/core/lib/debug/trace.cc',
'src/core/lib/event_engine/endpoint_config.cc',
'src/core/lib/event_engine/event_engine.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',

@ -80,7 +80,7 @@ class EventEngine {
using Callback = std::function<void(absl::Status)>;
/// A callback handle, used to cancel a callback.
struct TaskHandle {
intptr_t key;
intptr_t keys[2];
};
/// A thin wrapper around a platform-specific sockaddr type. A sockaddr struct
/// exists on all platforms that gRPC supports.
@ -127,10 +127,8 @@ class EventEngine {
///
/// For failed read operations, implementations should pass the appropriate
/// statuses to \a on_read. For example, callbacks might expect to receive
/// DEADLINE_EXCEEDED when the deadline is exceeded, and CANCELLED on
/// endpoint shutdown.
virtual void Read(Callback on_read, SliceBuffer* buffer,
absl::Time deadline) = 0;
/// CANCELLED on endpoint shutdown.
virtual void Read(Callback on_read, SliceBuffer* buffer) = 0;
/// Write data out on the connection.
///
/// \a on_writable is called when the connection is ready for more data. The
@ -140,15 +138,13 @@ class EventEngine {
///
/// For failed write operations, implementations should pass the appropriate
/// statuses to \a on_writable. For example, callbacks might expect to
/// receive DEADLINE_EXCEEDED when the deadline is exceeded, and CANCELLED
/// on endpoint shutdown.
virtual void Write(Callback on_writable, SliceBuffer* data,
absl::Time deadline) = 0;
/// receive CANCELLED on endpoint shutdown.
virtual void Write(Callback on_writable, SliceBuffer* data) = 0;
/// These methods return an address in the format described in DNSResolver.
/// The returned values are owned by the Endpoint and are expected to remain
/// valid for the life of the Endpoint.
virtual const ResolvedAddress* GetPeerAddress() const = 0;
virtual const ResolvedAddress* GetLocalAddress() const = 0;
virtual const ResolvedAddress& GetPeerAddress() const = 0;
virtual const ResolvedAddress& GetLocalAddress() const = 0;
};
/// Called when a new connection is established.
@ -197,7 +193,7 @@ class EventEngine {
virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept, Callback on_shutdown,
const EndpointConfig& args,
SliceAllocatorFactory slice_allocator_factory) = 0;
std::unique_ptr<SliceAllocatorFactory> slice_allocator_factory) = 0;
/// Creates a client network connection to a remote network listener.
///
/// \a Connect may return an error status immediately if there was a failure
@ -214,7 +210,7 @@ class EventEngine {
virtual absl::Status Connect(OnConnectCallback on_connect,
const ResolvedAddress& addr,
const EndpointConfig& args,
SliceAllocator slice_allocator,
std::unique_ptr<SliceAllocator> slice_allocator,
absl::Time deadline) = 0;
/// The DNSResolver that provides asynchronous resolution.
@ -222,7 +218,7 @@ class EventEngine {
public:
/// A task handle for DNS Resolution requests.
struct LookupTaskHandle {
intptr_t key;
intptr_t key[2];
};
/// A DNS SRV record type.
struct SRVRecord {

@ -37,18 +37,8 @@ class SliceBuffer {
class SliceAllocator {
public:
// gRPC-internal constructor
explicit SliceAllocator(grpc_resource_user* user);
// Not copyable
SliceAllocator(SliceAllocator& other) = delete;
SliceAllocator& operator=(const SliceAllocator& other) = delete;
// Moveable
SliceAllocator(SliceAllocator&& other) noexcept;
SliceAllocator& operator=(SliceAllocator&& other) noexcept;
~SliceAllocator();
using AllocateCallback =
std::function<void(absl::Status, SliceBuffer* buffer)>;
using AllocateCallback = std::function<void(absl::Status)>;
virtual ~SliceAllocator() = default;
/// Requests \a size bytes from gRPC, and populates \a dest with the allocated
/// slices. Ownership of the \a SliceBuffer is not transferred.
///
@ -57,32 +47,17 @@ class SliceAllocator {
/// interrupted to attempt to reclaim memory from participating gRPC
/// internals. When there is sufficient memory available, slice allocation
/// proceeds as normal.
absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb);
private:
grpc_resource_user* resource_user_;
virtual absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) = 0;
};
class SliceAllocatorFactory {
public:
// gRPC-internal constructor
explicit SliceAllocatorFactory(grpc_resource_quota* quota);
// Not copyable
SliceAllocatorFactory(SliceAllocatorFactory& other) = delete;
SliceAllocatorFactory& operator=(const SliceAllocatorFactory& other) = delete;
// Moveable
SliceAllocatorFactory(SliceAllocatorFactory&& other) noexcept;
SliceAllocatorFactory& operator=(SliceAllocatorFactory&& other) noexcept;
~SliceAllocatorFactory();
virtual ~SliceAllocatorFactory() = default;
/// On Endpoint creation, call \a CreateSliceAllocator with the name of the
/// endpoint peer (a URI string, most likely). Note: \a peer_name must outlive
/// the Endpoint.
SliceAllocator CreateSliceAllocator(absl::string_view peer_name);
private:
grpc_resource_quota* resource_quota_;
/// endpoint peer (a URI string, most likely).
virtual std::unique_ptr<SliceAllocator> CreateSliceAllocator(
absl::string_view peer_name) = 0;
};
} // namespace experimental

@ -245,6 +245,8 @@
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_encoder.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/bin_encoder.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_transport.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/chttp2_transport.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/context_list.cc" role="src" />
@ -742,7 +744,6 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/endpoint_config.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/endpoint_config_internal.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/event_engine.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/slice_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/sockaddr.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/sockaddr.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/alloc.cc" role="src" />

@ -1365,7 +1365,7 @@ void PrintHeaderService(grpc_generator::Printer* printer,
}
PrintHeaderClientMethodCallbackInterfacesEnd(printer, vars);
printer->Outdent();
printer->Print("private:\n");
printer->Print(" private:\n");
printer->Indent();
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethodInterfaces(printer, service->method(i).get(), vars,

@ -257,7 +257,7 @@ class RetryFilter::CallData {
// Adds retriable recv_trailing_metadata op.
void AddRetriableRecvTrailingMetadataOp();
// Adds cancel_stream op.
void AddCancelStreamOp();
void AddCancelStreamOp(const char* reason);
private:
// Frees cached send ops that were completed by the completed batch in
@ -390,7 +390,7 @@ class RetryFilter::CallData {
// Cancels the call attempt. Unrefs any deferred batches.
// Adds a batch to closures to cancel this call attempt.
void Cancel(CallCombinerClosureList* closures);
void Cancel(const char* reason, CallCombinerClosureList* closures);
static void OnPerAttemptRecvTimer(void* arg, grpc_error_handle error);
static void OnPerAttemptRecvTimerLocked(void* arg, grpc_error_handle error);
@ -459,7 +459,7 @@ class RetryFilter::CallData {
// save space but will also result in a data race because compiler
// will generate a 2 byte store which overwrites the meta-data
// fields upon setting this field.
bool cancelled_ : 1;
bool abandoned_ : 1;
};
CallData(RetryFilter* chand, const grpc_call_element_args& args);
@ -654,7 +654,7 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
started_recv_trailing_metadata_(false),
completed_recv_trailing_metadata_(false),
seen_recv_trailing_metadata_from_surface_(false),
cancelled_(false) {
abandoned_(false) {
lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: create lb_call=%p",
@ -742,8 +742,8 @@ void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {
// If we're not yet committed, we can't switch yet.
// TODO(roth): As part of implementing hedging, this logic needs to
// check that *this* call attempt is the one that we've committed to.
// Might need to replace cancelled_ with an enum indicating whether we're
// in flight, cancelled, or the winning call attempt.
// Might need to replace abandoned_ with an enum indicating whether we're
// in flight, abandoned, or the winning call attempt.
if (!calld_->retry_committed_) return;
// If we've already switched to fast path, there's nothing to do here.
if (calld_->committed_call_ != nullptr) return;
@ -1131,9 +1131,7 @@ bool RetryFilter::CallData::CallAttempt::ShouldRetry(
}
void RetryFilter::CallData::CallAttempt::Cancel(
CallCombinerClosureList* closures) {
// Record that this attempt has been cancelled.
cancelled_ = true;
const char* reason, CallCombinerClosureList* closures) {
// Unref batches for deferred completion callbacks that will now never
// be invoked.
if (started_recv_trailing_metadata_ &&
@ -1162,7 +1160,7 @@ void RetryFilter::CallData::CallAttempt::Cancel(
// transport knows that this call should be cleaned up, even if it
// hasn't received any ops.
BatchData* cancel_batch_data = CreateBatch(1, /*set_on_complete=*/true);
cancel_batch_data->AddCancelStreamOp();
cancel_batch_data->AddCancelStreamOp(reason);
AddClosureForBatch(cancel_batch_data->batch(),
"start cancellation batch on call attempt", closures);
}
@ -1196,11 +1194,13 @@ void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
// Cancel this attempt.
// TODO(roth): When implementing hedging, we should not cancel the
// current attempt.
call_attempt->Cancel(&closures);
call_attempt->Cancel("retry perAttemptRecvTimeout exceeded", &closures);
// Check whether we should retry.
if (call_attempt->ShouldRetry(
/*status=*/absl::nullopt, /*is_lb_drop=*/false,
/*server_pushback_md=*/nullptr, /*server_pushback_ms=*/nullptr)) {
// Mark current attempt as abandoned.
call_attempt->abandoned_ = true;
// We are retrying. Start backoff timer.
calld->StartRetryTimer(/*server_pushback_ms=*/-1);
} else {
@ -1350,11 +1350,12 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
grpc_error_std_string(error).c_str());
}
call_attempt->completed_recv_initial_metadata_ = true;
// If this attempt has been cancelled, then we're not going to use the
// If this attempt has been abandoned, then we're not going to use the
// result of this recv_initial_metadata op, so do nothing.
if (call_attempt->cancelled_) {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"recv_initial_metadata_ready after cancellation");
if (call_attempt->abandoned_) {
GRPC_CALL_COMBINER_STOP(
calld->call_combiner_,
"recv_initial_metadata_ready for abandoned attempt");
return;
}
// Cancel per-attempt recv timer, if any.
@ -1443,11 +1444,11 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
grpc_error_std_string(error).c_str());
}
++call_attempt->completed_recv_message_count_;
// If this attempt has been cancelled, then we're not going to use the
// If this attempt has been abandoned, then we're not going to use the
// result of this recv_message op, so do nothing.
if (call_attempt->cancelled_) {
if (call_attempt->abandoned_) {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"recv_message_ready after cancellation");
"recv_message_ready for abandoned attempt");
return;
}
// Cancel per-attempt recv timer, if any.
@ -1641,11 +1642,12 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
grpc_error_std_string(error).c_str());
}
call_attempt->completed_recv_trailing_metadata_ = true;
// If this attempt has been cancelled, then we're not going to use the
// If this attempt has been abandoned, then we're not going to use the
// result of this recv_trailing_metadata op, so do nothing.
if (call_attempt->cancelled_) {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"recv_trailing_metadata_ready after cancellation");
if (call_attempt->abandoned_) {
GRPC_CALL_COMBINER_STOP(
calld->call_combiner_,
"recv_trailing_metadata_ready for abandoned attempt");
return;
}
// Cancel per-attempt recv timer, if any.
@ -1673,7 +1675,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
calld->StartRetryTimer(server_pushback_ms);
// Cancel call attempt.
CallCombinerClosureList closures;
call_attempt->Cancel(&closures);
call_attempt->Cancel("call attempt failed", &closures);
// Record that this attempt has been abandoned.
call_attempt->abandoned_ = true;
// Yields call combiner.
closures.RunClosures(calld->call_combiner_);
return;
@ -1758,11 +1762,11 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
grpc_error_std_string(error).c_str(),
grpc_transport_stream_op_batch_string(&batch_data->batch_).c_str());
}
// If this attempt has been cancelled, then we're not going to propagate
// If this attempt has been abandoned, then we're not going to propagate
// the completion of this batch, so do nothing.
if (call_attempt->cancelled_) {
if (call_attempt->abandoned_) {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"on_complete after cancellation");
"on_complete for abandoned attempt");
return;
}
// If we got an error and have not yet gotten the
@ -1956,10 +1960,12 @@ void RetryFilter::CallData::CallAttempt::BatchData::
&call_attempt_->recv_trailing_metadata_ready_;
}
void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp() {
void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp(
const char* reason) {
batch_.cancel_stream = true;
batch_.payload->cancel_stream.cancel_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("retry attempt abandoned");
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(reason),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED);
}
//
@ -2439,7 +2445,7 @@ void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) {
// retry attempt is started, in which case we'll just pass the real
// call dispatch controller down into the LB call, and it won't be
// our problem anymore.
if (call_attempt_->lb_call_committed()) {
if (call_attempt->lb_call_committed()) {
auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
service_config_call_data->call_dispatch_controller()->Commit();

@ -17,71 +17,48 @@
#include <functional>
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h"
#include "src/core/lib/iomgr/resource_quota.h"
namespace grpc_event_engine {
namespace experimental {
SliceAllocator::SliceAllocator(grpc_resource_user* user)
: resource_user_(user) {
grpc_resource_user_ref(resource_user_);
};
Chttp2SliceAllocator::Chttp2SliceAllocator(grpc_resource_user* user)
: resource_user_(user) {}
SliceAllocator::~SliceAllocator() {
Chttp2SliceAllocator::~Chttp2SliceAllocator() {
if (resource_user_ != nullptr) {
grpc_resource_user_unref(resource_user_);
}
};
SliceAllocator::SliceAllocator(SliceAllocator&& other) noexcept
: resource_user_(other.resource_user_) {
other.resource_user_ = nullptr;
}
SliceAllocator& SliceAllocator::operator=(SliceAllocator&& other) noexcept {
resource_user_ = other.resource_user_;
other.resource_user_ = nullptr;
return *this;
}
absl::Status SliceAllocator::Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) {
absl::Status Chttp2SliceAllocator::Allocate(
size_t size, SliceBuffer* dest, SliceAllocator::AllocateCallback cb) {
// TODO(hork): merge the implementation from the uv-ee branch.
(void)size;
(void)dest;
(void)cb;
return absl::OkStatus();
};
}
SliceAllocatorFactory::SliceAllocatorFactory(grpc_resource_quota* quota)
Chttp2SliceAllocatorFactory::Chttp2SliceAllocatorFactory(
grpc_resource_quota* quota)
: resource_quota_(quota) {
grpc_resource_quota_ref_internal(resource_quota_);
};
}
SliceAllocatorFactory::~SliceAllocatorFactory() {
Chttp2SliceAllocatorFactory::~Chttp2SliceAllocatorFactory() {
if (resource_quota_ != nullptr) {
grpc_resource_quota_unref_internal(resource_quota_);
}
}
SliceAllocatorFactory::SliceAllocatorFactory(
SliceAllocatorFactory&& other) noexcept
: resource_quota_(other.resource_quota_) {
other.resource_quota_ = nullptr;
}
SliceAllocatorFactory& SliceAllocatorFactory::operator=(
SliceAllocatorFactory&& other) noexcept {
resource_quota_ = other.resource_quota_;
other.resource_quota_ = nullptr;
return *this;
}
SliceAllocator SliceAllocatorFactory::CreateSliceAllocator(
absl::string_view peer_name) {
return SliceAllocator(
std::unique_ptr<SliceAllocator>
Chttp2SliceAllocatorFactory::CreateSliceAllocator(absl::string_view peer_name) {
return absl::make_unique<Chttp2SliceAllocator>(
grpc_resource_user_create(resource_quota_, peer_name.data()));
}

@ -0,0 +1,74 @@
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_SLICE_ALLOCATOR_H
#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_SLICE_ALLOCATOR_H
#include <grpc/support/port_platform.h>
#include "grpc/event_engine/slice_allocator.h"
#include <functional>
#include "absl/status/status.h"
#include "src/core/lib/iomgr/resource_quota.h"
namespace grpc_event_engine {
namespace experimental {
class Chttp2SliceAllocator
: public grpc_event_engine::experimental::SliceAllocator {
public:
/// gRPC-internal constructor. Takes ownership of a resource_user ref from the
/// caller.
explicit Chttp2SliceAllocator(grpc_resource_user* user);
// Not copyable
Chttp2SliceAllocator(Chttp2SliceAllocator& other) = delete;
Chttp2SliceAllocator& operator=(const Chttp2SliceAllocator& other) = delete;
// Not Moveable
Chttp2SliceAllocator(Chttp2SliceAllocator&& other) = delete;
Chttp2SliceAllocator& operator=(Chttp2SliceAllocator&& other) = delete;
~Chttp2SliceAllocator() override;
absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) override;
private:
grpc_resource_user* resource_user_;
};
class Chttp2SliceAllocatorFactory
: public grpc_event_engine::experimental::SliceAllocatorFactory {
public:
// gRPC-internal constructor
explicit Chttp2SliceAllocatorFactory(grpc_resource_quota* quota);
// Not copyable
Chttp2SliceAllocatorFactory(Chttp2SliceAllocatorFactory& other) = delete;
Chttp2SliceAllocatorFactory& operator=(
const Chttp2SliceAllocatorFactory& other) = delete;
// Not Moveable
Chttp2SliceAllocatorFactory(Chttp2SliceAllocatorFactory&& other) = delete;
Chttp2SliceAllocatorFactory& operator=(Chttp2SliceAllocatorFactory&& other) =
delete;
~Chttp2SliceAllocatorFactory() override;
std::unique_ptr<SliceAllocator> CreateSliceAllocator(
absl::string_view peer_name) override;
private:
grpc_resource_quota* resource_quota_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_SLICE_ALLOCATOR_H

@ -59,7 +59,7 @@ void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
exec_ctx.Flush();
grpc_pollset_ee_broadcast_event();
},
read_buffer, absl::InfiniteFuture());
read_buffer);
}
void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
@ -83,7 +83,7 @@ void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
exec_ctx.Flush();
grpc_pollset_ee_broadcast_event();
},
write_buffer, absl::InfiniteFuture());
write_buffer);
}
void endpoint_add_to_pollset(grpc_endpoint* /* ep */,
grpc_pollset* /* pollset */) {}
@ -123,9 +123,8 @@ absl::string_view endpoint_get_peer(grpc_endpoint* ep) {
return "";
}
if (eeep->peer_address.empty()) {
const EventEngine::ResolvedAddress* addr = eeep->endpoint->GetPeerAddress();
GPR_ASSERT(addr != nullptr);
eeep->peer_address = ResolvedAddressToURI(*addr);
const EventEngine::ResolvedAddress& addr = eeep->endpoint->GetPeerAddress();
eeep->peer_address = ResolvedAddressToURI(addr);
}
return eeep->peer_address;
}
@ -136,10 +135,9 @@ absl::string_view endpoint_get_local_address(grpc_endpoint* ep) {
return "";
}
if (eeep->local_address.empty()) {
const EventEngine::ResolvedAddress* addr =
const EventEngine::ResolvedAddress& addr =
eeep->endpoint->GetLocalAddress();
GPR_ASSERT(addr != nullptr);
eeep->local_address = ResolvedAddressToURI(*addr);
eeep->local_address = ResolvedAddressToURI(addr);
}
return eeep->local_address;
}

@ -38,20 +38,36 @@ using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GrpcClosureToCallback;
using ::grpc_event_engine::experimental::SliceAllocator;
using ::grpc_event_engine::experimental::SliceAllocatorFactory;
using ::grpc_event_engine::experimental::SliceBuffer;
} // namespace
// TODO(hork): remove these classes in PR #26643, when the iomgr APIs change to
// accept SliceAllocators and SliceAllocatorFactory(ie)s. In the meantime, the
// libuv work has temporary implementations as well.
class NoopSliceAllocator : public SliceAllocator {
public:
absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) {
return absl::OkStatus();
}
};
class NoopSliceAllocatorFactory : public SliceAllocatorFactory {
public:
std::unique_ptr<SliceAllocator> CreateSliceAllocator(
absl::string_view peer_name) {
return absl::make_unique<NoopSliceAllocator>();
};
};
struct grpc_tcp_server {
grpc_tcp_server(std::unique_ptr<EventEngine::Listener> listener,
grpc_resource_quota* rq)
explicit grpc_tcp_server(std::unique_ptr<EventEngine::Listener> listener)
: refcount(1, GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace) ? "tcp" : nullptr),
listener(std::move(listener)),
resource_quota(rq) {
listener(std::move(listener)) {
shutdown_starting.head = nullptr;
shutdown_starting.tail = nullptr;
};
~grpc_tcp_server() {
// TODO(nnoble): see if we can handle this in ~SliceAllocatorFactory
grpc_resource_quota_unref_internal(resource_quota);
grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &shutdown_starting);
grpc_core::ExecCtx::Get()->Flush();
}
@ -59,7 +75,6 @@ struct grpc_tcp_server {
grpc_core::Mutex mu;
std::unique_ptr<EventEngine::Listener> listener;
grpc_closure_list shutdown_starting ABSL_GUARDED_BY(mu);
grpc_resource_quota* resource_quota;
grpc_tcp_server_cb on_accept_internal;
void* on_accept_internal_arg;
};
@ -99,7 +114,9 @@ void tcp_connect(grpc_closure* on_connect, grpc_endpoint** endpoint,
*endpoint = &ee_endpoint->base;
EventEngine::OnConnectCallback ee_on_connect =
GrpcClosureToOnConnectCallback(on_connect, endpoint);
SliceAllocator sa(ee_endpoint->ru);
// TODO(hork): tcp_connect will change to accept a SliceAllocator. This is
// temporary.
auto sa = absl::make_unique<NoopSliceAllocator>();
EventEngine::ResolvedAddress ra(reinterpret_cast<const sockaddr*>(addr->addr),
addr->len);
absl::Time ee_deadline = grpc_core::ToAbslTime(
@ -124,6 +141,9 @@ grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
if (rq == nullptr) {
rq = grpc_resource_quota_create(nullptr);
}
// TODO(hork): tcp_server_create will change to accept a
// SliceAllocatorFactory. This is temporary.
auto saf = absl::make_unique<NoopSliceAllocatorFactory>();
EventEngine* event_engine = grpc_iomgr_event_engine();
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> listener =
event_engine->CreateListener(
@ -144,11 +164,11 @@ grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
grpc_pollset_ee_broadcast_event();
},
GrpcClosureToCallback(shutdown_complete, GRPC_ERROR_NONE),
endpoint_config, SliceAllocatorFactory(rq));
endpoint_config, std::move(saf));
if (!listener.ok()) {
return absl_status_to_grpc_error(listener.status());
}
*server = new grpc_tcp_server(std::move(*listener), rq);
*server = new grpc_tcp_server(std::move(*listener));
return GRPC_ERROR_NONE;
}

@ -805,6 +805,8 @@ grpc_resource_user* grpc_resource_user_create(
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_user->links[i].next = resource_user->links[i].prev = nullptr;
}
// TODO(hork): the RU should own a copy of the name. See Craig's comments on
// the EventEngine gRFC for justification.
if (name != nullptr) {
resource_user->name = name;
} else {

@ -105,6 +105,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/transport/bin_decoder.cc',
'src/core/ext/transport/chttp2/transport/bin_encoder.cc',
'src/core/ext/transport/chttp2/transport/chttp2_plugin.cc',
'src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc',
'src/core/ext/transport/chttp2/transport/chttp2_transport.cc',
'src/core/ext/transport/chttp2/transport/context_list.cc',
'src/core/ext/transport/chttp2/transport/flow_control.cc',
@ -353,7 +354,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/debug/trace.cc',
'src/core/lib/event_engine/endpoint_config.cc',
'src/core/lib/event_engine/event_engine.cc',
'src/core/lib/event_engine/slice_allocator.cc',
'src/core/lib/event_engine/sockaddr.cc',
'src/core/lib/gpr/alloc.cc',
'src/core/lib/gpr/atm.cc',

@ -43,6 +43,7 @@ void grpc_end2end_tests_pre_init(void) {
% endfor
}
// NOLINTNEXTLINE(readability-function-size)
void grpc_end2end_tests(int argc, char **argv,
grpc_end2end_test_config config) {
int i;

@ -145,6 +145,8 @@ extern void retry_non_retriable_status_before_recv_trailing_metadata_started(grp
extern void retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init(void);
extern void retry_per_attempt_recv_timeout(grpc_end2end_test_config config);
extern void retry_per_attempt_recv_timeout_pre_init(void);
extern void retry_per_attempt_recv_timeout_on_last_attempt(grpc_end2end_test_config config);
extern void retry_per_attempt_recv_timeout_on_last_attempt_pre_init(void);
extern void retry_recv_initial_metadata(grpc_end2end_test_config config);
extern void retry_recv_initial_metadata_pre_init(void);
extern void retry_recv_message(grpc_end2end_test_config config);
@ -261,6 +263,7 @@ void grpc_end2end_tests_pre_init(void) {
retry_non_retriable_status_pre_init();
retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init();
retry_per_attempt_recv_timeout_pre_init();
retry_per_attempt_recv_timeout_on_last_attempt_pre_init();
retry_recv_initial_metadata_pre_init();
retry_recv_message_pre_init();
retry_recv_trailing_metadata_error_pre_init();
@ -290,6 +293,7 @@ void grpc_end2end_tests_pre_init(void) {
write_buffering_at_end_pre_init();
}
// NOLINTNEXTLINE(readability-function-size)
void grpc_end2end_tests(int argc, char **argv,
grpc_end2end_test_config config) {
int i;
@ -355,6 +359,7 @@ void grpc_end2end_tests(int argc, char **argv,
retry_non_retriable_status(config);
retry_non_retriable_status_before_recv_trailing_metadata_started(config);
retry_per_attempt_recv_timeout(config);
retry_per_attempt_recv_timeout_on_last_attempt(config);
retry_recv_initial_metadata(config);
retry_recv_message(config);
retry_recv_trailing_metadata_error(config);
@ -618,6 +623,10 @@ void grpc_end2end_tests(int argc, char **argv,
retry_per_attempt_recv_timeout(config);
continue;
}
if (0 == strcmp("retry_per_attempt_recv_timeout_on_last_attempt", argv[i])) {
retry_per_attempt_recv_timeout_on_last_attempt(config);
continue;
}
if (0 == strcmp("retry_recv_initial_metadata", argv[i])) {
retry_recv_initial_metadata(config);
continue;

@ -147,6 +147,8 @@ extern void retry_non_retriable_status_before_recv_trailing_metadata_started(grp
extern void retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init(void);
extern void retry_per_attempt_recv_timeout(grpc_end2end_test_config config);
extern void retry_per_attempt_recv_timeout_pre_init(void);
extern void retry_per_attempt_recv_timeout_on_last_attempt(grpc_end2end_test_config config);
extern void retry_per_attempt_recv_timeout_on_last_attempt_pre_init(void);
extern void retry_recv_initial_metadata(grpc_end2end_test_config config);
extern void retry_recv_initial_metadata_pre_init(void);
extern void retry_recv_message(grpc_end2end_test_config config);
@ -264,6 +266,7 @@ void grpc_end2end_tests_pre_init(void) {
retry_non_retriable_status_pre_init();
retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init();
retry_per_attempt_recv_timeout_pre_init();
retry_per_attempt_recv_timeout_on_last_attempt_pre_init();
retry_recv_initial_metadata_pre_init();
retry_recv_message_pre_init();
retry_recv_trailing_metadata_error_pre_init();
@ -293,6 +296,7 @@ void grpc_end2end_tests_pre_init(void) {
write_buffering_at_end_pre_init();
}
// NOLINTNEXTLINE(readability-function-size)
void grpc_end2end_tests(int argc, char **argv,
grpc_end2end_test_config config) {
int i;
@ -359,6 +363,7 @@ void grpc_end2end_tests(int argc, char **argv,
retry_non_retriable_status(config);
retry_non_retriable_status_before_recv_trailing_metadata_started(config);
retry_per_attempt_recv_timeout(config);
retry_per_attempt_recv_timeout_on_last_attempt(config);
retry_recv_initial_metadata(config);
retry_recv_message(config);
retry_recv_trailing_metadata_error(config);
@ -626,6 +631,10 @@ void grpc_end2end_tests(int argc, char **argv,
retry_per_attempt_recv_timeout(config);
continue;
}
if (0 == strcmp("retry_per_attempt_recv_timeout_on_last_attempt", argv[i])) {
retry_per_attempt_recv_timeout_on_last_attempt(config);
continue;
}
if (0 == strcmp("retry_recv_initial_metadata", argv[i])) {
retry_recv_initial_metadata(config);
continue;

@ -302,6 +302,12 @@ END2END_TESTS = {
"retry_per_attempt_recv_timeout": _test_options(
needs_client_channel = True,
),
"retry_per_attempt_recv_timeout_on_last_attempt": _test_options(
needs_client_channel = True,
# TODO(jtattermusch): too long bazel test name makes the test flaky on Windows RBE
# See b/151617965
short_name = "retry_per_attempt_recv_timeout2",
),
"retry_recv_initial_metadata": _test_options(needs_client_channel = True),
"retry_recv_message": _test_options(needs_client_channel = True),
"retry_recv_trailing_metadata_error": _test_options(

@ -0,0 +1,263 @@
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "test/core/end2end/end2end_tests.h"
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/static_metadata.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/tests/cancel_test_helpers.h"
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Tests perAttemptRecvTimeout:
// - 1 retry allowed for ABORTED status
// - both attempts do not receive a response until after perAttemptRecvTimeout
static void test_retry_per_attempt_recv_timeout_on_last_attempt(
grpc_end2end_test_config config) {
grpc_call* c;
grpc_call* s;
grpc_call* s0;
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer* response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
grpc_byte_buffer* request_payload_recv = nullptr;
grpc_byte_buffer* response_payload_recv = nullptr;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
grpc_arg args[] = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 1),
grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
const_cast<char*>(
"{\n"
" \"methodConfig\": [ {\n"
" \"name\": [\n"
" { \"service\": \"service\", \"method\": \"method\" }\n"
" ],\n"
" \"retryPolicy\": {\n"
" \"maxAttempts\": 2,\n"
" \"initialBackoff\": \"1s\",\n"
" \"maxBackoff\": \"120s\",\n"
" \"backoffMultiplier\": 1.6,\n"
" \"perAttemptRecvTimeout\": \"2s\",\n"
" \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
" }\n"
" } ]\n"
"}")),
};
grpc_channel_args client_args = {GPR_ARRAY_SIZE(args), args};
grpc_end2end_test_fixture f =
begin_test(config, "retry", &client_args, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
gpr_timespec deadline = n_seconds_from_now(10);
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Server gets a call but does not respond to the call.
error =
grpc_server_request_call(f.server, &s0, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), true);
cq_verify(cqv);
// Make sure the "grpc-previous-rpc-attempts" header was not sent in the
// initial attempt.
for (size_t i = 0; i < request_metadata_recv.count; ++i) {
GPR_ASSERT(!grpc_slice_eq(request_metadata_recv.metadata[i].key,
GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS));
}
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_details_init(&call_details);
// Server gets a second call, which it also does not respond to.
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(201));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(201), true);
cq_verify(cqv);
// Now we can unref the first call.
grpc_call_unref(s0);
// Make sure the "grpc-previous-rpc-attempts" header was sent in the retry.
bool found_retry_header = false;
for (size_t i = 0; i < request_metadata_recv.count; ++i) {
if (grpc_slice_eq(request_metadata_recv.metadata[i].key,
GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS)) {
GPR_ASSERT(
grpc_slice_eq(request_metadata_recv.metadata[i].value, GRPC_MDSTR_1));
found_retry_header = true;
break;
}
}
GPR_ASSERT(found_retry_header);
// Client sees call completion.
CQ_EXPECT_COMPLETION(cqv, tag(1), true);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_CANCELLED);
GPR_ASSERT(
0 == grpc_slice_str_cmp(details, "retry perAttemptRecvTimeout exceeded"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_byte_buffer_destroy(response_payload_recv);
grpc_call_unref(c);
grpc_call_unref(s);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
void retry_per_attempt_recv_timeout_on_last_attempt(
grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
test_retry_per_attempt_recv_timeout_on_last_attempt(config);
}
void retry_per_attempt_recv_timeout_on_last_attempt_pre_init(void) {}

@ -1202,6 +1202,8 @@ src/core/ext/transport/chttp2/transport/bin_decoder.h \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.h \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.h \
src/core/ext/transport/chttp2/transport/context_list.cc \
@ -1695,7 +1697,6 @@ src/core/lib/debug/trace.h \
src/core/lib/event_engine/endpoint_config.cc \
src/core/lib/event_engine/endpoint_config_internal.h \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/event_engine/sockaddr.h \
src/core/lib/gpr/alloc.cc \

@ -1037,6 +1037,8 @@ src/core/ext/transport/chttp2/transport/bin_decoder.h \
src/core/ext/transport/chttp2/transport/bin_encoder.cc \
src/core/ext/transport/chttp2/transport/bin_encoder.h \
src/core/ext/transport/chttp2/transport/chttp2_plugin.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.cc \
src/core/ext/transport/chttp2/transport/chttp2_slice_allocator.h \
src/core/ext/transport/chttp2/transport/chttp2_transport.cc \
src/core/ext/transport/chttp2/transport/chttp2_transport.h \
src/core/ext/transport/chttp2/transport/context_list.cc \
@ -1532,7 +1534,6 @@ src/core/lib/debug/trace.h \
src/core/lib/event_engine/endpoint_config.cc \
src/core/lib/event_engine/endpoint_config_internal.h \
src/core/lib/event_engine/event_engine.cc \
src/core/lib/event_engine/slice_allocator.cc \
src/core/lib/event_engine/sockaddr.cc \
src/core/lib/event_engine/sockaddr.h \
src/core/lib/gpr/README.md \

@ -17,7 +17,7 @@
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/macos/grpc_build_artifacts.sh"
gfile_resources: "/bigstore/grpc-testing-secrets/gcp_credentials/GrpcTesting-d0eeee2db331.json"
timeout_mins: 150
timeout_mins: 240
action {
define_artifacts {
regex: "**/*sponge_log.*"

@ -198,14 +198,6 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
logging.info('GcpResourceManager: skipping setup for strategy [%s]',
self.strategy)
return
# Construct UrlMap from test classes
# This is the step that mostly likely to go wrong. Lifting it to be the
# first task ensures fail fast.
aggregator = _UrlMapChangeAggregator(
url_map_name=self.td.make_resource_name(self.td.URL_MAP_NAME))
for test_case_class in test_case_classes:
aggregator.apply_change(test_case_class)
final_url_map = aggregator.get_map()
# Clean up debris from previous runs
self._pre_cleanup()
# Start creating GCP resources
@ -219,6 +211,12 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
# Backend Services
self.td.create_backend_service()
self.td.create_alternative_backend_service()
# Construct UrlMap from test classes
aggregator = _UrlMapChangeAggregator(
url_map_name=self.td.make_resource_name(self.td.URL_MAP_NAME))
for test_case_class in test_case_classes:
aggregator.apply_change(test_case_class)
final_url_map = aggregator.get_map()
# UrlMap
self.td.create_url_map_with_content(final_url_map)
# Target Proxy
@ -266,10 +264,11 @@ class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
@functools.lru_cache(None)
def default_backend_service(self) -> str:
"""Returns default backend service URL."""
return self.td.make_resource_name(self.td.BACKEND_SERVICE_NAME)
self.td.load_backend_service()
return self.td.backend_service.url
@functools.lru_cache(None)
def alternative_backend_service(self) -> str:
"""Returns alternative backend service URL."""
return self.td.make_resource_name(
self.td.ALTERNATIVE_BACKEND_SERVICE_NAME)
self.td.load_alternative_backend_service()
return self.td.alternative_backend_service.url

Loading…
Cancel
Save