[chttp2] Eliminate grpc_chttp2_stream_map (#33503)

No need for a bespoke type anymore... and a step along the path to
C++ification.

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/33610/head
Craig Tiller 2 years ago committed by GitHub
parent c0889a4f23
commit c5bb43ab61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      BUILD
  2. 52
      CMakeLists.txt
  3. 2
      Makefile
  4. 38
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 2
      gRPC-C++.podspec
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 2
      grpc.gyp
  11. 2
      package.xml
  12. 91
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  13. 5
      src/core/ext/transport/chttp2/transport/frame_ping.cc
  14. 10
      src/core/ext/transport/chttp2/transport/internal.h
  15. 4
      src/core/ext/transport/chttp2/transport/parsing.cc
  16. 177
      src/core/ext/transport/chttp2/transport/stream_map.cc
  17. 68
      src/core/ext/transport/chttp2/transport/stream_map.h
  18. 5
      src/core/ext/transport/chttp2/transport/writing.cc
  19. 1
      src/python/grpcio/grpc_core_dependencies.py
  20. 13
      test/core/transport/chttp2/BUILD
  21. 196
      test/core/transport/chttp2/stream_map_test.cc
  22. 2
      tools/doxygen/Doxyfile.c++.internal
  23. 2
      tools/doxygen/Doxyfile.core.internal
  24. 24
      tools/run_tests/generated/tests.json

@ -3883,7 +3883,6 @@ grpc_cc_library(
"//src/core:ext/transport/chttp2/transport/frame_window_update.cc", "//src/core:ext/transport/chttp2/transport/frame_window_update.cc",
"//src/core:ext/transport/chttp2/transport/parsing.cc", "//src/core:ext/transport/chttp2/transport/parsing.cc",
"//src/core:ext/transport/chttp2/transport/stream_lists.cc", "//src/core:ext/transport/chttp2/transport/stream_lists.cc",
"//src/core:ext/transport/chttp2/transport/stream_map.cc",
"//src/core:ext/transport/chttp2/transport/writing.cc", "//src/core:ext/transport/chttp2/transport/writing.cc",
], ],
hdrs = [ hdrs = [
@ -3896,10 +3895,12 @@ grpc_cc_library(
"//src/core:ext/transport/chttp2/transport/frame_settings.h", "//src/core:ext/transport/chttp2/transport/frame_settings.h",
"//src/core:ext/transport/chttp2/transport/frame_window_update.h", "//src/core:ext/transport/chttp2/transport/frame_window_update.h",
"//src/core:ext/transport/chttp2/transport/internal.h", "//src/core:ext/transport/chttp2/transport/internal.h",
"//src/core:ext/transport/chttp2/transport/stream_map.h",
], ],
external_deps = [ external_deps = [
"absl/base:core_headers", "absl/base:core_headers",
"absl/container:flat_hash_map",
"absl/hash",
"absl/meta:type_traits",
"absl/status", "absl/status",
"absl/strings", "absl/strings",
"absl/strings:cord", "absl/strings:cord",

52
CMakeLists.txt generated

@ -1302,7 +1302,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx stranded_event_test) add_dependencies(buildtests_cxx stranded_event_test)
endif() endif()
add_dependencies(buildtests_cxx stream_leak_with_queued_flow_control_update_test) add_dependencies(buildtests_cxx stream_leak_with_queued_flow_control_update_test)
add_dependencies(buildtests_cxx stream_map_test)
add_dependencies(buildtests_cxx streaming_error_response_test) add_dependencies(buildtests_cxx streaming_error_response_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx streaming_throughput_test) add_dependencies(buildtests_cxx streaming_throughput_test)
@ -1783,7 +1782,6 @@ add_library(grpc
src/core/ext/transport/chttp2/transport/huffsyms.cc src/core/ext/transport/chttp2/transport/huffsyms.cc
src/core/ext/transport/chttp2/transport/parsing.cc src/core/ext/transport/chttp2/transport/parsing.cc
src/core/ext/transport/chttp2/transport/stream_lists.cc src/core/ext/transport/chttp2/transport/stream_lists.cc
src/core/ext/transport/chttp2/transport/stream_map.cc
src/core/ext/transport/chttp2/transport/varint.cc src/core/ext/transport/chttp2/transport/varint.cc
src/core/ext/transport/chttp2/transport/writing.cc src/core/ext/transport/chttp2/transport/writing.cc
src/core/ext/transport/inproc/inproc_plugin.cc src/core/ext/transport/inproc/inproc_plugin.cc
@ -2809,7 +2807,6 @@ add_library(grpc_unsecure
src/core/ext/transport/chttp2/transport/huffsyms.cc src/core/ext/transport/chttp2/transport/huffsyms.cc
src/core/ext/transport/chttp2/transport/parsing.cc src/core/ext/transport/chttp2/transport/parsing.cc
src/core/ext/transport/chttp2/transport/stream_lists.cc src/core/ext/transport/chttp2/transport/stream_lists.cc
src/core/ext/transport/chttp2/transport/stream_map.cc
src/core/ext/transport/chttp2/transport/varint.cc src/core/ext/transport/chttp2/transport/varint.cc
src/core/ext/transport/chttp2/transport/writing.cc src/core/ext/transport/chttp2/transport/writing.cc
src/core/ext/transport/inproc/inproc_plugin.cc src/core/ext/transport/inproc/inproc_plugin.cc
@ -24174,55 +24171,6 @@ target_link_libraries(stream_leak_with_queued_flow_control_update_test
) )
endif()
if(gRPC_BUILD_TESTS)
add_executable(stream_map_test
test/core/transport/chttp2/stream_map_test.cc
test/core/util/cmdline.cc
test/core/util/fuzzer_util.cc
test/core/util/grpc_profiler.cc
test/core/util/histogram.cc
test/core/util/mock_endpoint.cc
test/core/util/parse_hexstring.cc
test/core/util/passthru_endpoint.cc
test/core/util/resolve_localhost_ip46.cc
test/core/util/slice_splitter.cc
test/core/util/subprocess_posix.cc
test/core/util/subprocess_windows.cc
test/core/util/tracer_util.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_compile_features(stream_map_test PUBLIC cxx_std_14)
target_include_directories(stream_map_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(stream_map_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif() endif()
if(gRPC_BUILD_TESTS) if(gRPC_BUILD_TESTS)

2
Makefile generated

@ -1066,7 +1066,6 @@ LIBGRPC_SRC = \
src/core/ext/transport/chttp2/transport/huffsyms.cc \ src/core/ext/transport/chttp2/transport/huffsyms.cc \
src/core/ext/transport/chttp2/transport/parsing.cc \ src/core/ext/transport/chttp2/transport/parsing.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \ src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/stream_map.cc \
src/core/ext/transport/chttp2/transport/varint.cc \ src/core/ext/transport/chttp2/transport/varint.cc \
src/core/ext/transport/chttp2/transport/writing.cc \ src/core/ext/transport/chttp2/transport/writing.cc \
src/core/ext/transport/inproc/inproc_plugin.cc \ src/core/ext/transport/inproc/inproc_plugin.cc \
@ -1945,7 +1944,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/transport/chttp2/transport/huffsyms.cc \ src/core/ext/transport/chttp2/transport/huffsyms.cc \
src/core/ext/transport/chttp2/transport/parsing.cc \ src/core/ext/transport/chttp2/transport/parsing.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \ src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/stream_map.cc \
src/core/ext/transport/chttp2/transport/varint.cc \ src/core/ext/transport/chttp2/transport/varint.cc \
src/core/ext/transport/chttp2/transport/writing.cc \ src/core/ext/transport/chttp2/transport/writing.cc \
src/core/ext/transport/inproc/inproc_plugin.cc \ src/core/ext/transport/inproc/inproc_plugin.cc \

@ -306,7 +306,6 @@ libs:
- src/core/ext/transport/chttp2/transport/http_trace.h - src/core/ext/transport/chttp2/transport/http_trace.h
- src/core/ext/transport/chttp2/transport/huffsyms.h - src/core/ext/transport/chttp2/transport/huffsyms.h
- src/core/ext/transport/chttp2/transport/internal.h - src/core/ext/transport/chttp2/transport/internal.h
- src/core/ext/transport/chttp2/transport/stream_map.h
- src/core/ext/transport/chttp2/transport/varint.h - src/core/ext/transport/chttp2/transport/varint.h
- src/core/ext/transport/inproc/inproc_transport.h - src/core/ext/transport/inproc/inproc_transport.h
- src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h - src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h
@ -1123,7 +1122,6 @@ libs:
- src/core/ext/transport/chttp2/transport/huffsyms.cc - src/core/ext/transport/chttp2/transport/huffsyms.cc
- src/core/ext/transport/chttp2/transport/parsing.cc - src/core/ext/transport/chttp2/transport/parsing.cc
- src/core/ext/transport/chttp2/transport/stream_lists.cc - src/core/ext/transport/chttp2/transport/stream_lists.cc
- src/core/ext/transport/chttp2/transport/stream_map.cc
- src/core/ext/transport/chttp2/transport/varint.cc - src/core/ext/transport/chttp2/transport/varint.cc
- src/core/ext/transport/chttp2/transport/writing.cc - src/core/ext/transport/chttp2/transport/writing.cc
- src/core/ext/transport/inproc/inproc_plugin.cc - src/core/ext/transport/inproc/inproc_plugin.cc
@ -2021,7 +2019,6 @@ libs:
- src/core/ext/transport/chttp2/transport/http_trace.h - src/core/ext/transport/chttp2/transport/http_trace.h
- src/core/ext/transport/chttp2/transport/huffsyms.h - src/core/ext/transport/chttp2/transport/huffsyms.h
- src/core/ext/transport/chttp2/transport/internal.h - src/core/ext/transport/chttp2/transport/internal.h
- src/core/ext/transport/chttp2/transport/stream_map.h
- src/core/ext/transport/chttp2/transport/varint.h - src/core/ext/transport/chttp2/transport/varint.h
- src/core/ext/transport/inproc/inproc_transport.h - src/core/ext/transport/inproc/inproc_transport.h
- src/core/ext/upb-generated/google/api/annotations.upb.h - src/core/ext/upb-generated/google/api/annotations.upb.h
@ -2441,7 +2438,6 @@ libs:
- src/core/ext/transport/chttp2/transport/huffsyms.cc - src/core/ext/transport/chttp2/transport/huffsyms.cc
- src/core/ext/transport/chttp2/transport/parsing.cc - src/core/ext/transport/chttp2/transport/parsing.cc
- src/core/ext/transport/chttp2/transport/stream_lists.cc - src/core/ext/transport/chttp2/transport/stream_lists.cc
- src/core/ext/transport/chttp2/transport/stream_map.cc
- src/core/ext/transport/chttp2/transport/varint.cc - src/core/ext/transport/chttp2/transport/varint.cc
- src/core/ext/transport/chttp2/transport/writing.cc - src/core/ext/transport/chttp2/transport/writing.cc
- src/core/ext/transport/inproc/inproc_plugin.cc - src/core/ext/transport/inproc/inproc_plugin.cc
@ -14418,40 +14414,6 @@ targets:
- test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc - test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc
deps: deps:
- grpc_test_util - grpc_test_util
- name: stream_map_test
gtest: true
build: test
language: c++
headers:
- test/core/util/cmdline.h
- test/core/util/evaluate_args_test_util.h
- test/core/util/fuzzer_util.h
- test/core/util/grpc_profiler.h
- test/core/util/histogram.h
- test/core/util/mock_authorization_endpoint.h
- test/core/util/mock_endpoint.h
- test/core/util/parse_hexstring.h
- test/core/util/passthru_endpoint.h
- test/core/util/resolve_localhost_ip46.h
- test/core/util/slice_splitter.h
- test/core/util/subprocess.h
- test/core/util/tracer_util.h
src:
- test/core/transport/chttp2/stream_map_test.cc
- test/core/util/cmdline.cc
- test/core/util/fuzzer_util.cc
- test/core/util/grpc_profiler.cc
- test/core/util/histogram.cc
- test/core/util/mock_endpoint.cc
- test/core/util/parse_hexstring.cc
- test/core/util/passthru_endpoint.cc
- test/core/util/resolve_localhost_ip46.cc
- test/core/util/slice_splitter.cc
- test/core/util/subprocess_posix.cc
- test/core/util/subprocess_windows.cc
- test/core/util/tracer_util.cc
deps:
- grpc_test_util
- name: streaming_error_response_test - name: streaming_error_response_test
gtest: true gtest: true
build: test build: test

1
config.m4 generated

@ -145,7 +145,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/transport/chttp2/transport/huffsyms.cc \ src/core/ext/transport/chttp2/transport/huffsyms.cc \
src/core/ext/transport/chttp2/transport/parsing.cc \ src/core/ext/transport/chttp2/transport/parsing.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \ src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/stream_map.cc \
src/core/ext/transport/chttp2/transport/varint.cc \ src/core/ext/transport/chttp2/transport/varint.cc \
src/core/ext/transport/chttp2/transport/writing.cc \ src/core/ext/transport/chttp2/transport/writing.cc \
src/core/ext/transport/inproc/inproc_plugin.cc \ src/core/ext/transport/inproc/inproc_plugin.cc \

1
config.w32 generated

@ -110,7 +110,6 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\transport\\chttp2\\transport\\huffsyms.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\huffsyms.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\parsing.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\parsing.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\stream_lists.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\stream_lists.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\stream_map.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\varint.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\varint.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\writing.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\writing.cc " +
"src\\core\\ext\\transport\\inproc\\inproc_plugin.cc " + "src\\core\\ext\\transport\\inproc\\inproc_plugin.cc " +

2
gRPC-C++.podspec generated

@ -376,7 +376,6 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/http_trace.h', 'src/core/ext/transport/chttp2/transport/http_trace.h',
'src/core/ext/transport/chttp2/transport/huffsyms.h', 'src/core/ext/transport/chttp2/transport/huffsyms.h',
'src/core/ext/transport/chttp2/transport/internal.h', 'src/core/ext/transport/chttp2/transport/internal.h',
'src/core/ext/transport/chttp2/transport/stream_map.h',
'src/core/ext/transport/chttp2/transport/varint.h', 'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/inproc/inproc_transport.h', 'src/core/ext/transport/inproc/inproc_transport.h',
'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h', 'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h',
@ -1422,7 +1421,6 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/http_trace.h', 'src/core/ext/transport/chttp2/transport/http_trace.h',
'src/core/ext/transport/chttp2/transport/huffsyms.h', 'src/core/ext/transport/chttp2/transport/huffsyms.h',
'src/core/ext/transport/chttp2/transport/internal.h', 'src/core/ext/transport/chttp2/transport/internal.h',
'src/core/ext/transport/chttp2/transport/stream_map.h',
'src/core/ext/transport/chttp2/transport/varint.h', 'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/inproc/inproc_transport.h', 'src/core/ext/transport/inproc/inproc_transport.h',
'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h', 'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h',

3
gRPC-Core.podspec generated

@ -410,8 +410,6 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/internal.h', 'src/core/ext/transport/chttp2/transport/internal.h',
'src/core/ext/transport/chttp2/transport/parsing.cc', 'src/core/ext/transport/chttp2/transport/parsing.cc',
'src/core/ext/transport/chttp2/transport/stream_lists.cc', 'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/stream_map.cc',
'src/core/ext/transport/chttp2/transport/stream_map.h',
'src/core/ext/transport/chttp2/transport/varint.cc', 'src/core/ext/transport/chttp2/transport/varint.cc',
'src/core/ext/transport/chttp2/transport/varint.h', 'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/chttp2/transport/writing.cc', 'src/core/ext/transport/chttp2/transport/writing.cc',
@ -2154,7 +2152,6 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/http_trace.h', 'src/core/ext/transport/chttp2/transport/http_trace.h',
'src/core/ext/transport/chttp2/transport/huffsyms.h', 'src/core/ext/transport/chttp2/transport/huffsyms.h',
'src/core/ext/transport/chttp2/transport/internal.h', 'src/core/ext/transport/chttp2/transport/internal.h',
'src/core/ext/transport/chttp2/transport/stream_map.h',
'src/core/ext/transport/chttp2/transport/varint.h', 'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/inproc/inproc_transport.h', 'src/core/ext/transport/inproc/inproc_transport.h',
'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h', 'src/core/ext/upb-generated/envoy/admin/v3/certs.upb.h',

2
grpc.gemspec generated

@ -315,8 +315,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/transport/chttp2/transport/internal.h ) s.files += %w( src/core/ext/transport/chttp2/transport/internal.h )
s.files += %w( src/core/ext/transport/chttp2/transport/parsing.cc ) s.files += %w( src/core/ext/transport/chttp2/transport/parsing.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/stream_lists.cc ) s.files += %w( src/core/ext/transport/chttp2/transport/stream_lists.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/stream_map.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/stream_map.h )
s.files += %w( src/core/ext/transport/chttp2/transport/varint.cc ) s.files += %w( src/core/ext/transport/chttp2/transport/varint.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/varint.h ) s.files += %w( src/core/ext/transport/chttp2/transport/varint.h )
s.files += %w( src/core/ext/transport/chttp2/transport/writing.cc ) s.files += %w( src/core/ext/transport/chttp2/transport/writing.cc )

2
grpc.gyp generated

@ -370,7 +370,6 @@
'src/core/ext/transport/chttp2/transport/huffsyms.cc', 'src/core/ext/transport/chttp2/transport/huffsyms.cc',
'src/core/ext/transport/chttp2/transport/parsing.cc', 'src/core/ext/transport/chttp2/transport/parsing.cc',
'src/core/ext/transport/chttp2/transport/stream_lists.cc', 'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/stream_map.cc',
'src/core/ext/transport/chttp2/transport/varint.cc', 'src/core/ext/transport/chttp2/transport/varint.cc',
'src/core/ext/transport/chttp2/transport/writing.cc', 'src/core/ext/transport/chttp2/transport/writing.cc',
'src/core/ext/transport/inproc/inproc_plugin.cc', 'src/core/ext/transport/inproc/inproc_plugin.cc',
@ -1189,7 +1188,6 @@
'src/core/ext/transport/chttp2/transport/huffsyms.cc', 'src/core/ext/transport/chttp2/transport/huffsyms.cc',
'src/core/ext/transport/chttp2/transport/parsing.cc', 'src/core/ext/transport/chttp2/transport/parsing.cc',
'src/core/ext/transport/chttp2/transport/stream_lists.cc', 'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/stream_map.cc',
'src/core/ext/transport/chttp2/transport/varint.cc', 'src/core/ext/transport/chttp2/transport/varint.cc',
'src/core/ext/transport/chttp2/transport/writing.cc', 'src/core/ext/transport/chttp2/transport/writing.cc',
'src/core/ext/transport/inproc/inproc_plugin.cc', 'src/core/ext/transport/inproc/inproc_plugin.cc',

2
package.xml generated

@ -297,8 +297,6 @@
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/internal.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/internal.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/parsing.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/parsing.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/stream_lists.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/stream_lists.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/stream_map.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/stream_map.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/varint.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/varint.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/varint.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/varint.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/writing.cc" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/writing.cc" role="src" />

@ -32,6 +32,9 @@
#include <vector> #include <vector>
#include "absl/base/attributes.h" #include "absl/base/attributes.h"
#include "absl/container/flat_hash_map.h"
#include "absl/hash/hash.h"
#include "absl/meta/type_traits.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/strings/cord.h" #include "absl/strings/cord.h"
#include "absl/strings/str_cat.h" #include "absl/strings/str_cat.h"
@ -58,7 +61,6 @@
#include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/http_trace.h" #include "src/core/ext/transport/chttp2/transport/http_trace.h"
#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
@ -315,10 +317,7 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
GPR_ASSERT(lists[i].tail == nullptr); GPR_ASSERT(lists[i].tail == nullptr);
} }
GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0); GPR_ASSERT(stream_map.empty());
grpc_chttp2_stream_map_destroy(&stream_map);
GRPC_COMBINER_UNREF(combiner, "chttp2_transport"); GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed")); cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed"));
@ -598,12 +597,6 @@ grpc_chttp2_transport::grpc_chttp2_transport(
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
base.vtable = get_vtable(); base.vtable = get_vtable();
// 8 is a random stab in the dark as to a good initial size: it's small enough
// that it shouldn't waste memory for infrequently used connections, yet
// large enough that the exponential growth should happen nicely when it's
// needed.
// TODO(ctiller): tune this
grpc_chttp2_stream_map_init(&stream_map, 8);
grpc_slice_buffer_init(&read_buffer); grpc_slice_buffer_init(&read_buffer);
grpc_slice_buffer_init(&outbuf); grpc_slice_buffer_init(&outbuf);
@ -813,7 +806,7 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
this, id, server_data); this, id, server_data);
} }
*t->accepting_stream = this; *t->accepting_stream = this;
grpc_chttp2_stream_map_add(&t->stream_map, id, this); t->stream_map.emplace(id, this);
post_destructive_reclaimer(t); post_destructive_reclaimer(t);
} }
@ -835,7 +828,7 @@ grpc_chttp2_stream::~grpc_chttp2_stream() {
GPR_ASSERT((write_closed && read_closed) || id == 0); GPR_ASSERT((write_closed && read_closed) || id == 0);
if (id != 0) { if (id != 0) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); GPR_ASSERT(t->stream_map.count(id) == 0);
} }
grpc_slice_buffer_destroy(&frame_storage); grpc_slice_buffer_destroy(&frame_storage);
@ -1076,7 +1069,7 @@ static void write_action_end_locked(void* tp, grpc_error_handle error) {
if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) { if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) {
t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT; t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT;
closed = true; closed = true;
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { if (t->stream_map.empty()) {
close_transport_locked(t, GRPC_ERROR_CREATE("goaway sent")); close_transport_locked(t, GRPC_ERROR_CREATE("goaway sent"));
} }
} }
@ -1169,19 +1162,18 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
if (t->is_client) { if (t->is_client) {
cancel_unstarted_streams(t, t->goaway_error); cancel_unstarted_streams(t, t->goaway_error);
// Cancel all unseen streams // Cancel all unseen streams
grpc_chttp2_stream_map_for_each( std::vector<grpc_chttp2_stream*> to_cancel;
&t->stream_map, for (auto id_stream : t->stream_map) {
[](void* user_data, uint32_t /* key */, void* stream) { if (id_stream.first > last_stream_id) {
uint32_t last_stream_id = *(static_cast<uint32_t*>(user_data)); to_cancel.push_back(id_stream.second);
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream); }
if (s->id > last_stream_id) { }
s->trailing_metadata_buffer.Set( for (auto s : to_cancel) {
grpc_core::GrpcStreamNetworkState(), s->trailing_metadata_buffer.Set(
grpc_core::GrpcStreamNetworkState::kNotSeenByServer); grpc_core::GrpcStreamNetworkState(),
grpc_chttp2_cancel_stream(s->t, s, s->t->goaway_error); grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
} grpc_chttp2_cancel_stream(s->t, s, s->t->goaway_error);
}, }
&last_stream_id);
} }
absl::Status status = grpc_error_to_absl_status(t->goaway_error); absl::Status status = grpc_error_to_absl_status(t->goaway_error);
// When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
@ -1225,7 +1217,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
// start streams where we have free grpc_chttp2_stream ids and free // start streams where we have free grpc_chttp2_stream ids and free
// * concurrency // * concurrency
while (t->next_stream_id <= MAX_CLIENT_STREAM_ID && while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
grpc_chttp2_stream_map_size(&t->stream_map) < t->stream_map.size() <
t->settings[GRPC_PEER_SETTINGS] t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) { grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
@ -1246,7 +1238,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
"no_more_stream_ids"); "no_more_stream_ids");
} }
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); t->stream_map.emplace(s->id, s);
post_destructive_reclaimer(t); post_destructive_reclaimer(t);
grpc_chttp2_mark_stream_writable(t, s); grpc_chttp2_mark_stream_writable(t, s);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
@ -2095,15 +2087,14 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
static void remove_stream(grpc_chttp2_transport* t, uint32_t id, static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
grpc_error_handle error) { grpc_error_handle error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>( grpc_chttp2_stream* s = t->stream_map.extract(id).mapped();
grpc_chttp2_stream_map_delete(&t->stream_map, id));
GPR_DEBUG_ASSERT(s); GPR_DEBUG_ASSERT(s);
if (t->incoming_stream == s) { if (t->incoming_stream == s) {
t->incoming_stream = nullptr; t->incoming_stream = nullptr;
grpc_chttp2_parsing_become_skip_parser(t); grpc_chttp2_parsing_become_skip_parser(t);
} }
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { if (t->stream_map.empty()) {
post_benign_reclaimer(t); post_benign_reclaimer(t);
if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) { if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) {
close_transport_locked( close_transport_locked(
@ -2454,17 +2445,6 @@ static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
} }
struct cancel_stream_cb_args {
grpc_error_handle error;
grpc_chttp2_transport* t;
};
static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) {
cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
grpc_chttp2_cancel_stream(args->t, s, args->error);
}
static void end_all_the_calls(grpc_chttp2_transport* t, static void end_all_the_calls(grpc_chttp2_transport* t,
grpc_error_handle error) { grpc_error_handle error) {
intptr_t http2_error; intptr_t http2_error;
@ -2476,8 +2456,13 @@ static void end_all_the_calls(grpc_chttp2_transport* t,
GRPC_STATUS_UNAVAILABLE); GRPC_STATUS_UNAVAILABLE);
} }
cancel_unstarted_streams(t, error); cancel_unstarted_streams(t, error);
cancel_stream_cb_args args = {error, t}; std::vector<grpc_chttp2_stream*> to_cancel;
grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args); for (auto id_stream : t->stream_map) {
to_cancel.push_back(id_stream.second);
}
for (auto s : to_cancel) {
grpc_chttp2_cancel_stream(t, s, error);
}
} }
// //
@ -2825,8 +2810,7 @@ static void init_keepalive_ping_locked(void* arg,
if (t->destroying || !t->closed_with_error.ok()) { if (t->destroying || !t->closed_with_error.ok()) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
} else { } else {
if (t->keepalive_permit_without_calls || if (t->keepalive_permit_without_calls || !t->stream_map.empty()) {
grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
send_keepalive_ping_locked(t); send_keepalive_ping_locked(t);
@ -3048,7 +3032,7 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
static void benign_reclaimer_locked(void* arg, grpc_error_handle error) { static void benign_reclaimer_locked(void* arg, grpc_error_handle error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
if (error.ok() && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { if (error.ok() && t->stream_map.empty()) {
// Channel with no active streams: send a goaway to try and make it // Channel with no active streams: send a goaway to try and make it
// disconnect cleanly // disconnect cleanly
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
@ -3065,7 +3049,7 @@ static void benign_reclaimer_locked(void* arg, grpc_error_handle error) {
"HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
" streams", " streams",
std::string(t->peer_string.as_string_view()).c_str(), std::string(t->peer_string.as_string_view()).c_str(),
grpc_chttp2_stream_map_size(&t->stream_map)); t->stream_map.size());
} }
t->benign_reclaimer_registered = false; t->benign_reclaimer_registered = false;
if (error != absl::CancelledError()) { if (error != absl::CancelledError()) {
@ -3076,11 +3060,10 @@ static void benign_reclaimer_locked(void* arg, grpc_error_handle error) {
static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) { static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
t->destructive_reclaimer_registered = false; t->destructive_reclaimer_registered = false;
if (error.ok() && n > 0) { if (error.ok() && !t->stream_map.empty()) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>( // As stream_map is a hash map, this selects effectively a random stream.
grpc_chttp2_stream_map_rand(&t->stream_map)); grpc_chttp2_stream* s = t->stream_map.begin()->second;
if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d",
std::string(t->peer_string.as_string_view()).c_str(), s->id); std::string(t->peer_string.as_string_view()).c_str(), s->id);
@ -3090,7 +3073,7 @@ static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) {
grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"), grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
grpc_core::StatusIntProperty::kHttp2Error, grpc_core::StatusIntProperty::kHttp2Error,
GRPC_HTTP2_ENHANCE_YOUR_CALM)); GRPC_HTTP2_ENHANCE_YOUR_CALM));
if (n > 1) { if (!t->stream_map.empty()) {
// Since we cancel one stream per destructive reclamation, if // Since we cancel one stream per destructive reclamation, if
// there are more streams left, we can immediately post a new // there are more streams left, we can immediately post a new
// reclaimer in case the resource quota needs to free more // reclaimer in case the resource quota needs to free more

@ -25,6 +25,7 @@
#include <algorithm> #include <algorithm>
#include <initializer_list> #include <initializer_list>
#include "absl/container/flat_hash_map.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/strings/str_format.h" #include "absl/strings/str_format.h"
@ -32,7 +33,6 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/time.h"
grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes) { grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes) {
@ -99,8 +99,7 @@ grpc_error_handle grpc_chttp2_ping_parser_parse(void* parser,
t->ping_recv_state.last_ping_recv_time + t->ping_recv_state.last_ping_recv_time +
t->ping_policy.min_recv_ping_interval_without_data; t->ping_policy.min_recv_ping_interval_without_data;
if (t->keepalive_permit_without_calls == 0 && if (t->keepalive_permit_without_calls == 0 && t->stream_map.empty()) {
grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
// According to RFC1122, the interval of TCP Keep-Alive is default to // According to RFC1122, the interval of TCP Keep-Alive is default to
// no less than two hours. When there is no outstanding streams, we // no less than two hours. When there is no outstanding streams, we
// restrict the number of PINGS equivalent to TCP Keep-Alive. // restrict the number of PINGS equivalent to TCP Keep-Alive.

@ -26,6 +26,8 @@
#include <memory> #include <memory>
#include "absl/container/flat_hash_map.h"
#include "absl/meta/type_traits.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
@ -46,7 +48,6 @@
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
@ -278,7 +279,7 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {}; grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};
/// maps stream id to grpc_chttp2_stream objects /// maps stream id to grpc_chttp2_stream objects
grpc_chttp2_stream_map stream_map; absl::flat_hash_map<uint32_t, grpc_chttp2_stream*> stream_map;
grpc_closure write_action_begin_locked; grpc_closure write_action_begin_locked;
grpc_closure write_action; grpc_closure write_action;
@ -679,8 +680,9 @@ void grpc_chttp2_act_on_flowctl_action(
inline grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream( inline grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport* t, uint32_t id) { grpc_chttp2_transport* t, uint32_t id) {
return static_cast<grpc_chttp2_stream*>( auto it = t->stream_map.find(id);
grpc_chttp2_stream_map_find(&t->stream_map, id)); if (it == t->stream_map.end()) return nullptr;
return it->second;
} }
grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
uint32_t id); uint32_t id);

@ -25,6 +25,7 @@
#include <string> #include <string>
#include "absl/base/attributes.h" #include "absl/base/attributes.h"
#include "absl/container/flat_hash_map.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/strings/str_cat.h" #include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h" #include "absl/strings/str_format.h"
@ -46,7 +47,6 @@
#include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/http_trace.h" #include "src/core/ext/transport/chttp2/transport/http_trace.h"
#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -623,7 +623,7 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
t->incoming_stream_id)); t->incoming_stream_id));
return init_header_skip_frame_parser(t, priority_type, is_eoh); return init_header_skip_frame_parser(t, priority_type, is_eoh);
} else if (GPR_UNLIKELY( } else if (GPR_UNLIKELY(
grpc_chttp2_stream_map_size(&t->stream_map) >= t->stream_map.size() >=
t->settings[GRPC_ACKED_SETTINGS] t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS])) { [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS])) {
return GRPC_ERROR_CREATE("Max stream count exceeded"); return GRPC_ERROR_CREATE("Max stream count exceeded");

@ -1,177 +0,0 @@
//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include <stdlib.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
void grpc_chttp2_stream_map_init(grpc_chttp2_stream_map* map,
size_t initial_capacity) {
GPR_DEBUG_ASSERT(initial_capacity > 1);
map->keys =
static_cast<uint32_t*>(gpr_malloc(sizeof(uint32_t) * initial_capacity));
map->values =
static_cast<void**>(gpr_malloc(sizeof(void*) * initial_capacity));
map->count = 0;
map->free = 0;
map->capacity = initial_capacity;
}
void grpc_chttp2_stream_map_destroy(grpc_chttp2_stream_map* map) {
gpr_free(map->keys);
gpr_free(map->values);
}
static size_t compact(uint32_t* keys, void** values, size_t count) {
size_t i, out;
for (i = 0, out = 0; i < count; i++) {
if (values[i]) {
keys[out] = keys[i];
values[out] = values[i];
out++;
}
}
return out;
}
void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map* map, uint32_t key,
void* value) {
size_t count = map->count;
size_t capacity = map->capacity;
uint32_t* keys = map->keys;
void** values = map->values;
// The first assertion ensures that the table is monotonically increasing.
GPR_ASSERT(count == 0 || keys[count - 1] < key);
GPR_DEBUG_ASSERT(value);
// Asserting that the key is not already in the map can be a debug assertion.
// Why: we're already checking that the map elements are monotonically
// increasing. If we re-add a key, i.e. if the key is already present, then
// either it is the most recently added key in the map (in which case the
// first assertion fails due to key == last_key) or there is a more recently
// added (larger) key at the end of the map: in which case the first assertion
// still fails due to key < last_key.
GPR_DEBUG_ASSERT(grpc_chttp2_stream_map_find(map, key) == nullptr);
if (count == capacity) {
if (map->free > capacity / 4) {
count = compact(keys, values, count);
map->free = 0;
} else {
// resize when less than 25% of the table is free, because compaction
// won't help much
map->capacity = capacity = 2 * capacity;
map->keys = keys = static_cast<uint32_t*>(
gpr_realloc(keys, capacity * sizeof(uint32_t)));
map->values = values =
static_cast<void**>(gpr_realloc(values, capacity * sizeof(void*)));
}
}
keys[count] = key;
values[count] = value;
map->count = count + 1;
}
template <bool strict_find>
static void** find(grpc_chttp2_stream_map* map, uint32_t key) {
size_t min_idx = 0;
size_t max_idx = map->count;
size_t mid_idx;
uint32_t* keys = map->keys;
void** values = map->values;
uint32_t mid_key;
GPR_DEBUG_ASSERT(!strict_find || max_idx > 0);
if (!strict_find && max_idx == 0) return nullptr;
while (min_idx < max_idx) {
// find the midpoint, avoiding overflow
mid_idx = min_idx + ((max_idx - min_idx) / 2);
mid_key = keys[mid_idx];
if (mid_key < key) {
min_idx = mid_idx + 1;
} else if (mid_key > key) {
max_idx = mid_idx;
} else // mid_key == key
{
return &values[mid_idx];
}
}
GPR_DEBUG_ASSERT(!strict_find);
return nullptr;
}
void* grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map* map, uint32_t key) {
void** pvalue = find<true>(map, key);
GPR_DEBUG_ASSERT(pvalue != nullptr);
void* out = *pvalue;
GPR_DEBUG_ASSERT(out != nullptr);
*pvalue = nullptr;
map->free++;
// recognize complete emptyness and ensure we can skip
// defragmentation later
if (map->free == map->count) {
map->free = map->count = 0;
}
GPR_DEBUG_ASSERT(grpc_chttp2_stream_map_find(map, key) == nullptr);
return out;
}
void* grpc_chttp2_stream_map_find(grpc_chttp2_stream_map* map, uint32_t key) {
void** pvalue = find<false>(map, key);
return pvalue != nullptr ? *pvalue : nullptr;
}
size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map* map) {
return map->count - map->free;
}
void* grpc_chttp2_stream_map_rand(grpc_chttp2_stream_map* map) {
if (map->count == map->free) {
return nullptr;
}
if (map->free != 0) {
map->count = compact(map->keys, map->values, map->count);
map->free = 0;
GPR_ASSERT(map->count > 0);
}
return map->values[(static_cast<size_t>(rand())) % map->count];
}
void grpc_chttp2_stream_map_for_each(grpc_chttp2_stream_map* map,
void (*f)(void* user_data, uint32_t key,
void* value),
void* user_data) {
size_t i;
for (i = 0; i < map->count; i++) {
if (map->values[i]) {
f(user_data, map->keys[i], map->values[i]);
}
}
}

@ -1,68 +0,0 @@
//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STREAM_MAP_H
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STREAM_MAP_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
// Data structure to map a uint32_t to a data object (represented by a void*)
// Represented as a sorted array of keys, and a corresponding array of values.
// Lookups are performed with binary search.
// Adds are restricted to strictly higher keys than previously seen (this is
// guaranteed by http2).
struct grpc_chttp2_stream_map {
uint32_t* keys;
void** values;
size_t count;
size_t free;
size_t capacity;
};
void grpc_chttp2_stream_map_init(grpc_chttp2_stream_map* map,
size_t initial_capacity);
void grpc_chttp2_stream_map_destroy(grpc_chttp2_stream_map* map);
// Add a new key: given http2 semantics, new keys must always be greater than
// existing keys - this is asserted
void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map* map, uint32_t key,
void* value);
// Delete an existing key - returns the previous value of the key if it existed,
// or NULL otherwise
void* grpc_chttp2_stream_map_delete(grpc_chttp2_stream_map* map, uint32_t key);
// Return an existing key, or NULL if it does not exist
void* grpc_chttp2_stream_map_find(grpc_chttp2_stream_map* map, uint32_t key);
// Return a random entry
void* grpc_chttp2_stream_map_rand(grpc_chttp2_stream_map* map);
// How many (populated) entries are in the stream map?
size_t grpc_chttp2_stream_map_size(grpc_chttp2_stream_map* map);
// Callback on each stream
void grpc_chttp2_stream_map_for_each(grpc_chttp2_stream_map* map,
void (*f)(void* user_data, uint32_t key,
void* value),
void* user_data);
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STREAM_MAP_H

@ -25,6 +25,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "absl/container/flat_hash_map.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
@ -50,7 +51,6 @@
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h" #include "src/core/lib/debug/stats_data.h"
@ -123,8 +123,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
grpc_core::Duration next_allowed_ping_interval = grpc_core::Duration::Zero(); grpc_core::Duration next_allowed_ping_interval = grpc_core::Duration::Zero();
if (t->is_client) { if (t->is_client) {
next_allowed_ping_interval = next_allowed_ping_interval =
(t->keepalive_permit_without_calls == 0 && (t->keepalive_permit_without_calls == 0 && t->stream_map.empty())
grpc_chttp2_stream_map_size(&t->stream_map) == 0)
? grpc_core::Duration::Hours(2) ? grpc_core::Duration::Hours(2)
: grpc_core::Duration::Seconds( : grpc_core::Duration::Seconds(
1); // A second is added to deal with 1); // A second is added to deal with

@ -119,7 +119,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/transport/huffsyms.cc', 'src/core/ext/transport/chttp2/transport/huffsyms.cc',
'src/core/ext/transport/chttp2/transport/parsing.cc', 'src/core/ext/transport/chttp2/transport/parsing.cc',
'src/core/ext/transport/chttp2/transport/stream_lists.cc', 'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/stream_map.cc',
'src/core/ext/transport/chttp2/transport/varint.cc', 'src/core/ext/transport/chttp2/transport/varint.cc',
'src/core/ext/transport/chttp2/transport/writing.cc', 'src/core/ext/transport/chttp2/transport/writing.cc',
'src/core/ext/transport/inproc/inproc_plugin.cc', 'src/core/ext/transport/inproc/inproc_plugin.cc',

@ -240,19 +240,6 @@ grpc_cc_test(
], ],
) )
grpc_cc_test(
name = "stream_map_test",
srcs = ["stream_map_test.cc"],
external_deps = ["gtest"],
language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
],
)
grpc_cc_test( grpc_cc_test(
name = "streams_not_seen_test", name = "streams_not_seen_test",
srcs = ["streams_not_seen_test.cc"], srcs = ["streams_not_seen_test.cc"],

@ -1,196 +0,0 @@
//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "gtest/gtest.h"
#include <grpc/support/log.h>
#include "test/core/util/test_config.h"
#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
// test creation & destruction
static void test_no_op(void) {
grpc_chttp2_stream_map map;
LOG_TEST("test_no_op");
grpc_chttp2_stream_map_init(&map, 8);
grpc_chttp2_stream_map_destroy(&map);
}
// test lookup on an empty map
static void test_empty_find(void) {
grpc_chttp2_stream_map map;
LOG_TEST("test_empty_find");
grpc_chttp2_stream_map_init(&map, 8);
ASSERT_EQ(nullptr, grpc_chttp2_stream_map_find(&map, 39128));
grpc_chttp2_stream_map_destroy(&map);
}
// test add & lookup
static void test_basic_add_find(uint32_t n) {
grpc_chttp2_stream_map map;
uint32_t i;
size_t got;
LOG_TEST("test_basic_add_find");
gpr_log(GPR_INFO, "n = %d", n);
grpc_chttp2_stream_map_init(&map, 8);
ASSERT_EQ(0, grpc_chttp2_stream_map_size(&map));
for (i = 1; i <= n; i++) {
grpc_chttp2_stream_map_add(&map, i, reinterpret_cast<void*>(i));
}
ASSERT_EQ(n, grpc_chttp2_stream_map_size(&map));
ASSERT_EQ(nullptr, grpc_chttp2_stream_map_find(&map, 0));
ASSERT_EQ(nullptr, grpc_chttp2_stream_map_find(&map, n + 1));
for (i = 1; i <= n; i++) {
got = reinterpret_cast<uintptr_t>(grpc_chttp2_stream_map_find(&map, i));
ASSERT_EQ(i, got);
}
grpc_chttp2_stream_map_destroy(&map);
}
// verify that for_each gets the right values during test_delete_evens_XXX
static void verify_for_each(void* user_data, uint32_t stream_id, void* ptr) {
uint32_t* for_each_check = static_cast<uint32_t*>(user_data);
ASSERT_TRUE(ptr);
ASSERT_EQ(*for_each_check, stream_id);
*for_each_check += 2;
}
static void check_delete_evens(grpc_chttp2_stream_map* map, uint32_t n) {
uint32_t for_each_check = 1;
uint32_t i;
size_t got;
ASSERT_EQ(nullptr, grpc_chttp2_stream_map_find(map, 0));
ASSERT_EQ(nullptr, grpc_chttp2_stream_map_find(map, n + 1));
for (i = 1; i <= n; i++) {
if (i & 1) {
got = reinterpret_cast<uintptr_t>(grpc_chttp2_stream_map_find(map, i));
ASSERT_EQ(i, got);
} else {
ASSERT_EQ(nullptr, grpc_chttp2_stream_map_find(map, i));
}
}
grpc_chttp2_stream_map_for_each(map, verify_for_each, &for_each_check);
if (n & 1) {
ASSERT_EQ(for_each_check, n + 2);
} else {
ASSERT_EQ(for_each_check, n + 1);
}
}
// add a bunch of keys, delete the even ones, and make sure the map is
// consistent
static void test_delete_evens_sweep(uint32_t n) {
grpc_chttp2_stream_map map;
uint32_t i;
LOG_TEST("test_delete_evens_sweep");
gpr_log(GPR_INFO, "n = %d", n);
grpc_chttp2_stream_map_init(&map, 8);
for (i = 1; i <= n; i++) {
grpc_chttp2_stream_map_add(&map, i, reinterpret_cast<void*>(i));
}
for (i = 1; i <= n; i++) {
if ((i & 1) == 0) {
ASSERT_EQ((void*)(uintptr_t)i, grpc_chttp2_stream_map_delete(&map, i));
}
}
check_delete_evens(&map, n);
grpc_chttp2_stream_map_destroy(&map);
}
// add a bunch of keys, delete the even ones immediately, and make sure the map
// is consistent
static void test_delete_evens_incremental(uint32_t n) {
grpc_chttp2_stream_map map;
uint32_t i;
LOG_TEST("test_delete_evens_incremental");
gpr_log(GPR_INFO, "n = %d", n);
grpc_chttp2_stream_map_init(&map, 8);
for (i = 1; i <= n; i++) {
grpc_chttp2_stream_map_add(&map, i, reinterpret_cast<void*>(i));
if ((i & 1) == 0) {
grpc_chttp2_stream_map_delete(&map, i);
}
}
check_delete_evens(&map, n);
grpc_chttp2_stream_map_destroy(&map);
}
// add a bunch of keys, delete old ones after some time, ensure the
// backing array does not grow
static void test_periodic_compaction(uint32_t n) {
grpc_chttp2_stream_map map;
uint32_t i;
uint32_t del;
LOG_TEST("test_periodic_compaction");
gpr_log(GPR_INFO, "n = %d", n);
grpc_chttp2_stream_map_init(&map, 16);
ASSERT_EQ(map.capacity, 16);
for (i = 1; i <= n; i++) {
grpc_chttp2_stream_map_add(&map, i, reinterpret_cast<void*>(i));
if (i > 8) {
del = i - 8;
ASSERT_EQ((void*)(uintptr_t)del,
grpc_chttp2_stream_map_delete(&map, del));
}
}
ASSERT_EQ(map.capacity, 16);
grpc_chttp2_stream_map_destroy(&map);
}
TEST(StreamMapTest, MainTest) {
uint32_t n = 1;
uint32_t prev = 1;
uint32_t tmp;
test_no_op();
test_empty_find();
while (n < 100000) {
test_basic_add_find(n);
test_delete_evens_sweep(n);
test_delete_evens_incremental(n);
test_periodic_compaction(n);
tmp = n;
n += prev;
prev = tmp;
}
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -1312,8 +1312,6 @@ src/core/ext/transport/chttp2/transport/huffsyms.h \
src/core/ext/transport/chttp2/transport/internal.h \ src/core/ext/transport/chttp2/transport/internal.h \
src/core/ext/transport/chttp2/transport/parsing.cc \ src/core/ext/transport/chttp2/transport/parsing.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \ src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/stream_map.cc \
src/core/ext/transport/chttp2/transport/stream_map.h \
src/core/ext/transport/chttp2/transport/varint.cc \ src/core/ext/transport/chttp2/transport/varint.cc \
src/core/ext/transport/chttp2/transport/varint.h \ src/core/ext/transport/chttp2/transport/varint.h \
src/core/ext/transport/chttp2/transport/writing.cc \ src/core/ext/transport/chttp2/transport/writing.cc \

@ -1088,8 +1088,6 @@ src/core/ext/transport/chttp2/transport/huffsyms.h \
src/core/ext/transport/chttp2/transport/internal.h \ src/core/ext/transport/chttp2/transport/internal.h \
src/core/ext/transport/chttp2/transport/parsing.cc \ src/core/ext/transport/chttp2/transport/parsing.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \ src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/stream_map.cc \
src/core/ext/transport/chttp2/transport/stream_map.h \
src/core/ext/transport/chttp2/transport/varint.cc \ src/core/ext/transport/chttp2/transport/varint.cc \
src/core/ext/transport/chttp2/transport/varint.h \ src/core/ext/transport/chttp2/transport/varint.h \
src/core/ext/transport/chttp2/transport/writing.cc \ src/core/ext/transport/chttp2/transport/writing.cc \

@ -9303,30 +9303,6 @@
], ],
"uses_polling": true "uses_polling": true
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "stream_map_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{ {
"args": [], "args": [],
"benchmark": false, "benchmark": false,

Loading…
Cancel
Save