Fathom TCP level changes. TracedBuffer for keeping track of all buffers

to be traced. Adding tests for Fathom and TracedBuffer. A lot more.
Please read PR description.
reviewable/pr15941/r1
Yash Tibrewal 7 years ago
parent 31c7ab1aaf
commit f0397933b0
  1. 4
      BUILD
  2. 46
      CMakeLists.txt
  3. 48
      Makefile
  4. 18
      build.yaml
  5. 2
      config.m4
  6. 2
      config.w32
  7. 4
      gRPC-C++.podspec
  8. 6
      gRPC-Core.podspec
  9. 4
      grpc.gemspec
  10. 8
      grpc.gyp
  11. 4
      package.xml
  12. 2
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  13. 2
      src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
  14. 2
      src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
  15. 3
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  16. 2
      src/core/lib/http/httpcli.cc
  17. 143
      src/core/lib/iomgr/buffer_list.cc
  18. 81
      src/core/lib/iomgr/buffer_list.h
  19. 4
      src/core/lib/iomgr/endpoint.cc
  20. 6
      src/core/lib/iomgr/endpoint.h
  21. 2
      src/core/lib/iomgr/endpoint_cfstream.cc
  22. 4
      src/core/lib/iomgr/endpoint_pair_posix.cc
  23. 18
      src/core/lib/iomgr/ev_epoll1_linux.cc
  24. 18
      src/core/lib/iomgr/ev_epollex_linux.cc
  25. 18
      src/core/lib/iomgr/ev_epollsig_linux.cc
  26. 4
      src/core/lib/iomgr/ev_posix.cc
  27. 40
      src/core/lib/iomgr/internal_errqueue.cc
  28. 76
      src/core/lib/iomgr/internal_errqueue.h
  29. 1
      src/core/lib/iomgr/port.h
  30. 2
      src/core/lib/iomgr/tcp_client_posix.cc
  31. 2
      src/core/lib/iomgr/tcp_custom.cc
  32. 287
      src/core/lib/iomgr/tcp_posix.cc
  33. 8
      src/core/lib/iomgr/tcp_posix.h
  34. 4
      src/core/lib/iomgr/tcp_server_posix.cc
  35. 2
      src/core/lib/iomgr/tcp_server_utils_posix_common.cc
  36. 2
      src/core/lib/iomgr/tcp_windows.cc
  37. 2
      src/core/lib/iomgr/udp_server.cc
  38. 4
      src/core/lib/security/transport/secure_endpoint.cc
  39. 2
      src/core/lib/security/transport/security_handshaker.cc
  40. 2
      src/python/grpcio/grpc_core_dependencies.py
  41. 2
      test/core/bad_client/bad_client.cc
  42. 2
      test/core/end2end/bad_server_response_test.cc
  43. 10
      test/core/end2end/fixtures/http_proxy_fixture.cc
  44. 13
      test/core/iomgr/BUILD
  45. 111
      test/core/iomgr/buffer_list_test.cc
  46. 7
      test/core/iomgr/endpoint_tests.cc
  47. 126
      test/core/iomgr/tcp_posix_test.cc
  48. 2
      test/core/util/mock_endpoint.cc
  49. 2
      test/core/util/passthru_endpoint.cc
  50. 5
      test/core/util/trickle_endpoint.cc
  51. 2
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  52. 2
      tools/doxygen/Doxyfile.c++.internal
  53. 4
      tools/doxygen/Doxyfile.core.internal
  54. 23
      tools/run_tests/generated/sources_and_headers.json
  55. 20
      tools/run_tests/generated/tests.json

@ -695,6 +695,7 @@ grpc_cc_library(
"src/core/lib/http/format_request.cc",
"src/core/lib/http/httpcli.cc",
"src/core/lib/http/parser.cc",
"src/core/lib/iomgr/buffer_list.cc",
"src/core/lib/iomgr/call_combiner.cc",
"src/core/lib/iomgr/combiner.cc",
"src/core/lib/iomgr/endpoint.cc",
@ -715,6 +716,7 @@ grpc_cc_library(
"src/core/lib/iomgr/gethostname_fallback.cc",
"src/core/lib/iomgr/gethostname_host_name_max.cc",
"src/core/lib/iomgr/gethostname_sysconf.cc",
"src/core/lib/iomgr/internal_errqueue.cc",
"src/core/lib/iomgr/iocp_windows.cc",
"src/core/lib/iomgr/iomgr.cc",
"src/core/lib/iomgr/iomgr_custom.cc",
@ -844,6 +846,7 @@ grpc_cc_library(
"src/core/lib/http/format_request.h",
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/buffer_list.h",
"src/core/lib/iomgr/block_annotate.h",
"src/core/lib/iomgr/call_combiner.h",
"src/core/lib/iomgr/closure.h",
@ -861,6 +864,7 @@ grpc_cc_library(
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/gethostname.h",
"src/core/lib/iomgr/gevent_util.h",
"src/core/lib/iomgr/internal_errqueue.h",
"src/core/lib/iomgr/iocp_windows.h",
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_custom.h",

@ -228,6 +228,9 @@ add_dependencies(buildtests_c avl_test)
add_dependencies(buildtests_c bad_server_response_test)
add_dependencies(buildtests_c bin_decoder_test)
add_dependencies(buildtests_c bin_encoder_test)
if(_gRPC_PLATFORM_LINUX)
add_dependencies(buildtests_c buffer_list_test)
endif()
add_dependencies(buildtests_c channel_create_test)
add_dependencies(buildtests_c chttp2_hpack_encoder_test)
add_dependencies(buildtests_c chttp2_stream_map_test)
@ -963,6 +966,7 @@ add_library(grpc
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@ -983,6 +987,7 @@ add_library(grpc
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@ -1364,6 +1369,7 @@ add_library(grpc_cronet
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@ -1384,6 +1390,7 @@ add_library(grpc_cronet
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@ -1754,6 +1761,7 @@ add_library(grpc_test_util
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@ -1774,6 +1782,7 @@ add_library(grpc_test_util
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@ -2062,6 +2071,7 @@ add_library(grpc_test_util_unsecure
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@ -2082,6 +2092,7 @@ add_library(grpc_test_util_unsecure
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@ -2349,6 +2360,7 @@ add_library(grpc_unsecure
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@ -2369,6 +2381,7 @@ add_library(grpc_unsecure
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@ -3186,6 +3199,7 @@ add_library(grpc++_cronet
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@ -3206,6 +3220,7 @@ add_library(grpc++_cronet
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@ -5761,6 +5776,37 @@ target_link_libraries(bin_encoder_test
grpc
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX)
add_executable(buffer_list_test
test/core/iomgr/buffer_list_test.cc
)
target_include_directories(buffer_list_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
)
target_link_libraries(buffer_list_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr_test_util
gpr
)
endif()
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)

@ -969,6 +969,7 @@ avl_test: $(BINDIR)/$(CONFIG)/avl_test
bad_server_response_test: $(BINDIR)/$(CONFIG)/bad_server_response_test
bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test
bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test
buffer_list_test: $(BINDIR)/$(CONFIG)/buffer_list_test
channel_create_test: $(BINDIR)/$(CONFIG)/channel_create_test
check_epollexclusive: $(BINDIR)/$(CONFIG)/check_epollexclusive
chttp2_hpack_encoder_test: $(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test
@ -1419,6 +1420,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/bad_server_response_test \
$(BINDIR)/$(CONFIG)/bin_decoder_test \
$(BINDIR)/$(CONFIG)/bin_encoder_test \
$(BINDIR)/$(CONFIG)/buffer_list_test \
$(BINDIR)/$(CONFIG)/channel_create_test \
$(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test \
$(BINDIR)/$(CONFIG)/chttp2_stream_map_test \
@ -1929,6 +1931,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 )
$(E) "[RUN] Testing bin_encoder_test"
$(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 )
$(E) "[RUN] Testing buffer_list_test"
$(Q) $(BINDIR)/$(CONFIG)/buffer_list_test || ( echo test buffer_list_test failed ; exit 1 )
$(E) "[RUN] Testing channel_create_test"
$(Q) $(BINDIR)/$(CONFIG)/channel_create_test || ( echo test channel_create_test failed ; exit 1 )
$(E) "[RUN] Testing chttp2_hpack_encoder_test"
@ -3409,6 +3413,7 @@ LIBGRPC_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@ -3429,6 +3434,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@ -3809,6 +3815,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@ -3829,6 +3836,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@ -4197,6 +4205,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@ -4217,6 +4226,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@ -4496,6 +4506,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@ -4516,6 +4527,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@ -4761,6 +4773,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@ -4781,6 +4794,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@ -5586,6 +5600,7 @@ LIBGRPC++_CRONET_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@ -5606,6 +5621,7 @@ LIBGRPC++_CRONET_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@ -10565,6 +10581,38 @@ endif
endif
BUFFER_LIST_TEST_SRC = \
test/core/iomgr/buffer_list_test.cc \
BUFFER_LIST_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BUFFER_LIST_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/buffer_list_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/buffer_list_test: $(BUFFER_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) $(BUFFER_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/buffer_list_test
endif
$(OBJDIR)/$(CONFIG)/test/core/iomgr/buffer_list_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_buffer_list_test: $(BUFFER_LIST_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(BUFFER_LIST_TEST_OBJS:.o=.dep)
endif
endif
CHANNEL_CREATE_TEST_SRC = \
test/core/surface/channel_create_test.cc \

@ -254,6 +254,7 @@ filegroups:
- src/core/lib/http/format_request.cc
- src/core/lib/http/httpcli.cc
- src/core/lib/http/parser.cc
- src/core/lib/iomgr/buffer_list.cc
- src/core/lib/iomgr/call_combiner.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/endpoint.cc
@ -274,6 +275,7 @@ filegroups:
- src/core/lib/iomgr/gethostname_fallback.cc
- src/core/lib/iomgr/gethostname_host_name_max.cc
- src/core/lib/iomgr/gethostname_sysconf.cc
- src/core/lib/iomgr/internal_errqueue.cc
- src/core/lib/iomgr/iocp_windows.cc
- src/core/lib/iomgr/iomgr.cc
- src/core/lib/iomgr/iomgr_custom.cc
@ -432,6 +434,7 @@ filegroups:
- src/core/lib/http/httpcli.h
- src/core/lib/http/parser.h
- src/core/lib/iomgr/block_annotate.h
- src/core/lib/iomgr/buffer_list.h
- src/core/lib/iomgr/call_combiner.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
@ -447,6 +450,7 @@ filegroups:
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/gethostname.h
- src/core/lib/iomgr/internal_errqueue.h
- src/core/lib/iomgr/iocp_windows.h
- src/core/lib/iomgr/iomgr.h
- src/core/lib/iomgr/iomgr_custom.h
@ -2107,6 +2111,20 @@ targets:
- grpc_test_util
- grpc
uses_polling: false
- name: buffer_list_test
build: test
language: c
src:
- test/core/iomgr/buffer_list_test.cc
deps:
- grpc_test_util
- grpc
- gpr_test_util
- gpr
exclude_iomgrs:
- uv
platforms:
- linux
- name: channel_create_test
build: test
language: c

@ -108,6 +108,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@ -128,6 +129,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \

@ -83,6 +83,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\http\\format_request.cc " +
"src\\core\\lib\\http\\httpcli.cc " +
"src\\core\\lib\\http\\parser.cc " +
"src\\core\\lib\\iomgr\\buffer_list.cc " +
"src\\core\\lib\\iomgr\\call_combiner.cc " +
"src\\core\\lib\\iomgr\\combiner.cc " +
"src\\core\\lib\\iomgr\\endpoint.cc " +
@ -103,6 +104,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\iomgr\\gethostname_fallback.cc " +
"src\\core\\lib\\iomgr\\gethostname_host_name_max.cc " +
"src\\core\\lib\\iomgr\\gethostname_sysconf.cc " +
"src\\core\\lib\\iomgr\\internal_errqueue.cc " +
"src\\core\\lib\\iomgr\\iocp_windows.cc " +
"src\\core\\lib\\iomgr\\iomgr.cc " +
"src\\core\\lib\\iomgr\\iomgr_custom.cc " +

@ -378,6 +378,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/block_annotate.h',
'src/core/lib/iomgr/buffer_list.h',
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
@ -393,6 +394,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/internal_errqueue.h',
'src/core/lib/iomgr/iocp_windows.h',
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_custom.h',
@ -565,6 +567,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/block_annotate.h',
'src/core/lib/iomgr/buffer_list.h',
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
@ -580,6 +583,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/internal_errqueue.h',
'src/core/lib/iomgr/iocp_windows.h',
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_custom.h',

@ -389,6 +389,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/block_annotate.h',
'src/core/lib/iomgr/buffer_list.h',
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
@ -404,6 +405,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/internal_errqueue.h',
'src/core/lib/iomgr/iocp_windows.h',
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_custom.h',
@ -533,6 +535,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@ -553,6 +556,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',
@ -979,6 +983,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/block_annotate.h',
'src/core/lib/iomgr/buffer_list.h',
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
@ -994,6 +999,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',
'src/core/lib/iomgr/internal_errqueue.h',
'src/core/lib/iomgr/iocp_windows.h',
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_custom.h',

@ -326,6 +326,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/http/httpcli.h )
s.files += %w( src/core/lib/http/parser.h )
s.files += %w( src/core/lib/iomgr/block_annotate.h )
s.files += %w( src/core/lib/iomgr/buffer_list.h )
s.files += %w( src/core/lib/iomgr/call_combiner.h )
s.files += %w( src/core/lib/iomgr/closure.h )
s.files += %w( src/core/lib/iomgr/combiner.h )
@ -341,6 +342,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/exec_ctx.h )
s.files += %w( src/core/lib/iomgr/executor.h )
s.files += %w( src/core/lib/iomgr/gethostname.h )
s.files += %w( src/core/lib/iomgr/internal_errqueue.h )
s.files += %w( src/core/lib/iomgr/iocp_windows.h )
s.files += %w( src/core/lib/iomgr/iomgr.h )
s.files += %w( src/core/lib/iomgr/iomgr_custom.h )
@ -470,6 +472,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/http/format_request.cc )
s.files += %w( src/core/lib/http/httpcli.cc )
s.files += %w( src/core/lib/http/parser.cc )
s.files += %w( src/core/lib/iomgr/buffer_list.cc )
s.files += %w( src/core/lib/iomgr/call_combiner.cc )
s.files += %w( src/core/lib/iomgr/combiner.cc )
s.files += %w( src/core/lib/iomgr/endpoint.cc )
@ -490,6 +493,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/gethostname_fallback.cc )
s.files += %w( src/core/lib/iomgr/gethostname_host_name_max.cc )
s.files += %w( src/core/lib/iomgr/gethostname_sysconf.cc )
s.files += %w( src/core/lib/iomgr/internal_errqueue.cc )
s.files += %w( src/core/lib/iomgr/iocp_windows.cc )
s.files += %w( src/core/lib/iomgr/iomgr.cc )
s.files += %w( src/core/lib/iomgr/iomgr_custom.cc )

@ -300,6 +300,7 @@
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@ -320,6 +321,7 @@
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',
@ -655,6 +657,7 @@
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@ -675,6 +678,7 @@
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',
@ -888,6 +892,7 @@
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@ -908,6 +913,7 @@
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',
@ -1099,6 +1105,7 @@
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@ -1119,6 +1126,7 @@
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',

@ -331,6 +331,7 @@
<file baseinstalldir="/" name="src/core/lib/http/httpcli.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/http/parser.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/block_annotate.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/buffer_list.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/call_combiner.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/closure.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/combiner.h" role="src" />
@ -346,6 +347,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/gethostname.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/internal_errqueue.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iocp_windows.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr_custom.h" role="src" />
@ -475,6 +477,7 @@
<file baseinstalldir="/" name="src/core/lib/http/format_request.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/http/httpcli.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/http/parser.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/buffer_list.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/call_combiner.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/combiner.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.cc" role="src" />
@ -495,6 +498,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/gethostname_fallback.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/gethostname_host_name_max.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/gethostname_sysconf.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/internal_errqueue.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iocp_windows.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr_custom.cc" role="src" />

@ -320,7 +320,7 @@ static void http_connect_handshaker_do_handshake(
// Take a new ref to be held by the write callback.
gpr_ref(&handshaker->refcount);
grpc_endpoint_write(args->endpoint, &handshaker->write_buffer,
&handshaker->request_done_closure);
&handshaker->request_done_closure, nullptr);
gpr_mu_unlock(&handshaker->mu);
}

@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint* client = grpc_tcp_client_create_from_fd(
grpc_fd_create(fd, "client", false), args, "fd-client");
grpc_fd_create(fd, "client", true), args, "fd-client");
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);

@ -44,7 +44,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
gpr_asprintf(&name, "fd:%d", fd);
grpc_endpoint* server_endpoint =
grpc_tcp_create(grpc_fd_create(fd, name, false),
grpc_tcp_create(grpc_fd_create(fd, name, true),
grpc_server_get_channel_args(server), name);
gpr_free(name);

@ -1007,7 +1007,8 @@ static void write_action(void* gt, grpc_error* error) {
grpc_endpoint_write(
t->ep, &t->outbuf,
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
grpc_combiner_scheduler(t->combiner)));
grpc_combiner_scheduler(t->combiner)),
nullptr);
}
static void write_action_end_locked(void* tp, grpc_error* error) {

@ -163,7 +163,7 @@ static void done_write(void* arg, grpc_error* error) {
static void start_write(internal_request* req) {
grpc_slice_ref_internal(req->request_text);
grpc_slice_buffer_add(&req->outgoing, req->request_text);
grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write);
grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write, nullptr);
}
static void on_handshake_done(void* arg, grpc_endpoint* ep) {

@ -0,0 +1,143 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/port.h"
#include <grpc/support/log.h>
#ifdef GRPC_LINUX_ERRQUEUE
#include <time.h>
#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
void* arg) {
gpr_log(GPR_INFO, "Adding new entry %u", seq_no);
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
/* Store the current time as the sendmsg time. */
new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME);
if (*head == nullptr) {
*head = new_elem;
gpr_log(GPR_INFO, "returning");
return;
}
/* Append at the end. */
TracedBuffer* ptr = *head;
while (ptr->next_ != nullptr) {
ptr = ptr->next_;
}
ptr->next_ = new_elem;
gpr_log(GPR_INFO, "returning");
}
namespace {
void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) {
gts->tv_sec = ts->tv_sec;
gts->tv_nsec = static_cast<int32_t>(ts->tv_nsec);
gts->clock_type = GPR_CLOCK_REALTIME;
}
void (*timestamps_callback)(void*, grpc_core::Timestamps*,
grpc_error* shutdown_err);
} /* namespace */
void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
struct sock_extended_err* serr,
struct scm_timestamping* tss) {
gpr_log(GPR_INFO, "Got timestamp %d", serr->ee_data);
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* elem = *head;
TracedBuffer* next = nullptr;
while (elem != nullptr) {
gpr_log(GPR_INFO, "looping");
/* The byte number refers to the sequence number of the last byte which this
* timestamp relates to. For scheduled and send, we are interested in the
* timestamp for the first byte, whereas for ack, we are interested in the
* last */
if (serr->ee_data >= elem->seq_no_) {
switch (serr->ee_info) {
case SCM_TSTAMP_SCHED:
gpr_log(GPR_INFO, "type sched\n");
fill_gpr_from_timestamp(&(elem->ts_.scheduled_time), &(tss->ts[0]));
elem = elem->next_;
break;
case SCM_TSTAMP_SND:
gpr_log(GPR_INFO, "type send\n");
fill_gpr_from_timestamp(&(elem->ts_.sent_time), &(tss->ts[0]));
elem = elem->next_;
break;
case SCM_TSTAMP_ACK:
gpr_log(GPR_INFO, "type ack\n");
if (serr->ee_data >= elem->seq_no_) {
fill_gpr_from_timestamp(&(elem->ts_.acked_time), &(tss->ts[0]));
/* Got all timestamps. Do the callback and free this TracedBuffer.
* The thing below can be passed by value if we don't want the
* restriction on the lifetime. */
timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE);
next = elem->next_;
Delete<TracedBuffer>(elem);
*head = elem = next;
break;
default:
abort();
}
}
} else {
break;
}
}
}
void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) {
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* elem = *head;
while (elem != nullptr) {
if (timestamps_callback) {
timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
}
auto* next = elem->next_;
Delete<TracedBuffer>(elem);
elem = next;
}
*head = nullptr;
GRPC_ERROR_UNREF(shutdown_err);
}
void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
grpc_core::Timestamps*,
grpc_error* error)) {
timestamps_callback = fn;
}
} /* namespace grpc_core */
#else /* GRPC_LINUX_ERRQUEUE */
namespace grpc_core {
void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
grpc_core::Timestamps*,
grpc_error* error)) {
gpr_log(GPR_DEBUG, "Timestamps callback is not enabled for this platform");
}
} /* namespace grpc_core */
#endif /* GRPC_LINUX_ERRQUEUE */

@ -0,0 +1,81 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H
#define GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/internal_errqueue.h"
namespace grpc_core {
struct Timestamps {
gpr_timespec sendmsg_time;
gpr_timespec scheduled_time;
gpr_timespec sent_time;
gpr_timespec acked_time;
};
#ifdef GRPC_LINUX_ERRQUEUE
class TracedBuffer {
public:
/** Add a new entry in the TracedBuffer list pointed to by head */
static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no,
void* arg);
/** Processes a timestamp received */
static void ProcessTimestamp(grpc_core::TracedBuffer** head,
struct sock_extended_err* serr,
struct scm_timestamping* tss);
/** Calls the callback for each traced buffer in the list with timestamps that
* it has. */
static void Shutdown(grpc_core::TracedBuffer** head,
grpc_error* shutdown_err);
private:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
TracedBuffer(int seq_no, void* arg)
: seq_no_(seq_no), arg_(arg), next_(nullptr) {
gpr_log(GPR_INFO, "seq_no %d", seq_no_);
}
uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
void* arg_; /* The arg to pass to timestamps_callback */
grpc_core::Timestamps ts_;
grpc_core::TracedBuffer* next_;
};
#else /* GRPC_LINUX_ERRQUEUE */
class TracedBuffer {};
#endif /* GRPC_LINUX_ERRQUEUE */
/** Sets the timestamp callback */
void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
grpc_core::Timestamps*,
grpc_error* error));
}; // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H */

@ -28,8 +28,8 @@ void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
ep->vtable->write(ep, slices, cb);
grpc_closure* cb, void* arg) {
ep->vtable->write(ep, slices, cb, arg);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {

@ -33,10 +33,12 @@
typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
class Timestamps;
struct grpc_endpoint_vtable {
void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
void* arg);
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
@ -72,7 +74,7 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep);
it is a valid slice buffer.
*/
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb);
grpc_closure* cb, void* arg);
/* Causes any pending and future read/write callbacks to run immediately with
success==0 */

@ -268,7 +268,7 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
grpc_closure* cb, void *arg) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",

@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
grpc_core::ExecCtx exec_ctx;
gpr_asprintf(&final_name, "%s:client", name);
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args,
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args,
p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args,
"socketpair-client");
gpr_free(final_name);

@ -386,15 +386,27 @@ static bool fd_is_shutdown(grpc_fd* fd) {
}
static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
fd->read_closure->NotifyOn(closure);
if (closure != nullptr) {
fd->read_closure->NotifyOn(closure);
} else {
fd->read_closure->SetReady();
}
}
static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
if (closure != nullptr) {
fd->write_closure->NotifyOn(closure);
} else {
fd->write_closure->SetReady();
}
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
if (closure != nullptr) {
fd->error_closure->NotifyOn(closure);
} else {
fd->error_closure->SetReady();
}
}
static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {

@ -539,15 +539,27 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
}
static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
fd->read_closure->NotifyOn(closure);
if (closure != nullptr) {
fd->read_closure->NotifyOn(closure);
} else {
fd->read_closure->SetReady();
}
}
static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
if (closure != nullptr) {
fd->write_closure->NotifyOn(closure);
} else {
fd->write_closure->SetReady();
}
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
if (closure != nullptr) {
fd->error_closure->NotifyOn(closure);
} else {
fd->error_closure->SetReady();
}
}
/*******************************************************************************

@ -947,15 +947,27 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
}
static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
fd->read_closure->NotifyOn(closure);
if (closure != nullptr) {
fd->read_closure->NotifyOn(closure);
} else {
fd->read_closure->SetReady();
}
}
static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
if (closure != nullptr) {
fd->write_closure->NotifyOn(closure);
} else {
fd->write_closure->SetReady();
}
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
if (closure != nullptr) {
fd->error_closure->NotifyOn(closure);
} else {
fd->error_closure->SetReady();
}
}
/*******************************************************************************

@ -200,8 +200,8 @@ bool grpc_event_engine_can_track_errors(void) {
grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err);
return g_event_engine->fd_create(fd, name, track_err);
return g_event_engine->fd_create(fd, name,
track_err && g_event_engine->can_track_err);
}
int grpc_fd_wrapped_fd(grpc_fd* fd) {

@ -0,0 +1,40 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/internal_errqueue.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#ifdef GPR_LINUX
#include <linux/version.h>
#endif /* GPR_LINUX */
bool kernel_supports_errqueue() {
#ifdef LINUX_VERSION_CODE
#if LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0)
return true;
#endif /* LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0) */
#endif /* LINUX_VERSION_CODE */
return false;
}
#endif /* GRPC_POSIX_SOCKET_TCP */

@ -0,0 +1,76 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H
#define GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include <sys/types.h>
#include <time.h>
#ifdef GRPC_LINUX_ERRQUEUE
#include <linux/errqueue.h>
#include <linux/net_tstamp.h>
#include <sys/socket.h>
#endif /* GRPC_LINUX_ERRQUEUE */
namespace grpc_core {
/* Redefining scm_timestamping in the same way that <linux/errqueue.h> defines
* it, so that code compiles on systems that don't have it. */
struct scm_timestamping {
struct timespec ts[3];
};
/* Also redefine timestamp types */
/* The timestamp type for when the driver passed skb to NIC, or HW. */
constexpr int SCM_TSTAMP_SND = 0;
/* The timestamp type for when data entered the packet scheduler. */
constexpr int SCM_TSTAMP_SCHED = 1;
/* The timestamp type for when data acknowledged by peer. */
constexpr int SCM_TSTAMP_ACK = 2;
/* Redefine required constants from <linux/net_tstamp.h> */
constexpr uint32_t SOF_TIMESTAMPING_TX_SOFTWARE = 1u << 1;
constexpr uint32_t SOF_TIMESTAMPING_SOFTWARE = 1u << 4;
constexpr uint32_t SOF_TIMESTAMPING_OPT_ID = 1u << 7;
constexpr uint32_t SOF_TIMESTAMPING_TX_SCHED = 1u << 8;
constexpr uint32_t SOF_TIMESTAMPING_TX_ACK = 1u << 9;
constexpr uint32_t SOF_TIMESTAMPING_OPT_TSONLY = 1u << 11;
constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE |
SOF_TIMESTAMPING_OPT_ID |
SOF_TIMESTAMPING_OPT_TSONLY;
constexpr uint32_t kTimestampingRecordingOptions =
SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE |
SOF_TIMESTAMPING_TX_ACK;
/* Returns true if kernel is capable of supporting errqueue and timestamping.
* Currently allowing only linux kernels above 4.0.0
*/
bool kernel_supports_errqueue();
} // namespace grpc_core
#endif /* GRPC_POSIX_SOCKET_TCP */
#endif /* GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H */

@ -60,6 +60,7 @@
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
#define GRPC_HAVE_UNIX_SOCKET 1
#define GRPC_LINUX_ERRQUEUE 1
#define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_HOST_NAME_MAX 1

@ -279,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
}
addr_str = grpc_sockaddr_to_uri(mapped_addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
*fdobj = grpc_fd_create(fd, name, false);
*fdobj = grpc_fd_create(fd, name, true);
gpr_free(name);
gpr_free(addr_str);
return GRPC_ERROR_NONE;

@ -221,7 +221,7 @@ static void custom_write_callback(grpc_custom_socket* socket,
}
static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
grpc_closure* cb) {
grpc_closure* cb, void* arg) {
custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();

@ -26,7 +26,9 @@
#include "src/core/lib/iomgr/tcp_posix.h"
#include <errno.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
@ -45,6 +47,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
@ -96,17 +99,26 @@ struct grpc_tcp {
grpc_closure read_done_closure;
grpc_closure write_done_closure;
grpc_closure error_closure;
char* peer_string;
grpc_resource_user* resource_user;
grpc_resource_user_slice_allocator slice_allocator;
grpc_core::TracedBuffer* head;
gpr_mu traced_buffer_lock;
void* outgoing_buffer_arg;
int bytes_counter;
bool socket_ts_enabled;
gpr_atm stop_error_notification;
};
struct backup_poller {
gpr_mu* pollset_mu;
grpc_closure run_poller;
};
} // namespace
#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))
@ -301,6 +313,7 @@ static void tcp_free(grpc_tcp* tcp) {
grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string);
gpr_mu_destroy(&tcp->traced_buffer_lock);
gpr_free(tcp);
}
@ -346,6 +359,11 @@ static void tcp_destroy(grpc_endpoint* ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
if (grpc_event_engine_can_track_errors()) {
// gpr_log(GPR_INFO, "stop errors");
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
grpc_fd_notify_on_error(tcp->em_fd, nullptr);
}
TCP_UNREF(tcp, "destroy");
}
@ -512,6 +530,215 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
}
}
/** This is to be called if outgoing_buffer_arg is not null. On linux platforms,
* this will call sendmsg with socket options set to collect timestamps inside
* the kernel. On return, sent_length is set to the return value of the sendmsg
* call. Returns false if setting the socket options failed. This is not
* implemented for non-linux platforms currently, and crashes out.
*/
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
ssize_t* sent_length, grpc_error** error);
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
#ifdef GRPC_LINUX_ERRQUEUE
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
ssize_t* sent_length,
grpc_error** error) {
if (!tcp->socket_ts_enabled) {
// gpr_log(GPR_INFO, "setting options yo");
uint32_t opt = grpc_core::kTimestampingSocketOptions;
if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
static_cast<void*>(&opt), sizeof(opt)) != 0) {
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp);
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
gpr_log(GPR_INFO, "failed to set");
return false;
}
tcp->socket_ts_enabled = true;
}
union {
char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
struct cmsghdr align;
} u;
cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SO_TIMESTAMPING;
cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
*reinterpret_cast<int*>(CMSG_DATA(cmsg)) =
grpc_core::kTimestampingRecordingOptions;
msg->msg_control = u.cmsg_buf;
msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));
ssize_t length;
do {
GRPC_STATS_INC_SYSCALL_WRITE();
length = sendmsg(tcp->fd, msg, SENDMSG_FLAGS);
} while (length < 0 && errno == EINTR);
*sent_length = length;
/* Only save timestamps if all the bytes were taken by sendmsg. */
if (sending_length == static_cast<size_t>(length)) {
gpr_mu_lock(&tcp->traced_buffer_lock);
grpc_core::TracedBuffer::AddNewEntry(
&tcp->head, static_cast<int>(tcp->bytes_counter + length),
tcp->outgoing_buffer_arg);
gpr_mu_unlock(&tcp->traced_buffer_lock);
tcp->outgoing_buffer_arg = nullptr;
}
return true;
}
struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
struct cmsghdr* cmsg) {
auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
if (next_cmsg == nullptr) {
gpr_log(GPR_ERROR, "Received timestamp without extended error");
return cmsg;
}
if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
!(next_cmsg->cmsg_type == IP_RECVERR ||
next_cmsg->cmsg_type == IPV6_RECVERR)) {
gpr_log(GPR_ERROR, "Unexpected cmsg");
return cmsg;
}
auto tss =
reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg));
auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
if (serr->ee_errno != ENOMSG ||
serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
gpr_log(GPR_ERROR, "Unexpected cmsg");
return cmsg;
}
/* The error handling can potentially be done on another thread so we need
* to protect the traced buffer list. A lock free list might be better. Using
* a simple mutex for now. */
gpr_mu_lock(&tcp->traced_buffer_lock);
// gpr_log(GPR_INFO, "processing timestamp");
grpc_core::TracedBuffer::ProcessTimestamp(&tcp->head, serr, tss);
gpr_mu_unlock(&tcp->traced_buffer_lock);
return next_cmsg;
}
/** For linux platforms, reads the socket's error queue and processes error
* messages from the queue. Returns true if all the errors processed were
* timestamps. Returns false if the any of the errors were not timestamps. For
* non-linux platforms, error processing is not enabled currently, and hence
* crashes out.
*/
static bool process_errors(grpc_tcp* tcp) {
// gpr_log(GPR_INFO, "process errors");
while (true) {
// gpr_log(GPR_INFO, "looping");
struct iovec iov;
iov.iov_base = nullptr;
iov.iov_len = 0;
struct msghdr msg;
msg.msg_name = nullptr;
msg.msg_namelen = 0;
msg.msg_iov = &iov;
msg.msg_iovlen = 0;
msg.msg_flags = 0;
union {
char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) +
CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/];
struct cmsghdr align;
} aligned_buf;
memset(&aligned_buf, 0, sizeof(aligned_buf));
msg.msg_control = aligned_buf.rbuf;
msg.msg_controllen = sizeof(aligned_buf.rbuf);
int r, saved_errno;
do {
// gpr_log(GPR_INFO, "error recvmsg");
r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE);
saved_errno = errno;
} while (r < 0 && saved_errno == EINTR);
if (r == -1 && saved_errno == EAGAIN) {
// gpr_log(GPR_INFO, "here");
return true; /* No more errors to process */
}
if (r == -1) {
// gpr_log(GPR_INFO, "%d", saved_errno);
return false;
}
if ((msg.msg_flags & MSG_CTRUNC) == 1) {
gpr_log(GPR_INFO, "Error message was truncated.");
}
// gpr_log(GPR_INFO, "%d %lu", r, msg.msg_controllen);
if (msg.msg_controllen == 0) {
/* There was no control message read. Return now */
return true;
}
for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET ||
cmsg->cmsg_type != SCM_TIMESTAMPING) {
/* Got a weird one, not a timestamp */
gpr_log(GPR_INFO, "weird %d %d %d", r, cmsg->cmsg_level,
cmsg->cmsg_type);
continue;
}
process_timestamp(tcp, &msg, cmsg);
}
}
}
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
// gpr_log(GPR_INFO, "grpc_tcp_handle_error");
grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error));
}
if (error != GRPC_ERROR_NONE ||
static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
/* We aren't going to register to hear on error anymore, so it is safe to
* unref. */
// gpr_log(GPR_INFO, "%p %d", error,
// static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification)));
// gpr_log(GPR_INFO, "unref");
grpc_core::TracedBuffer::Shutdown(&tcp->head, GRPC_ERROR_REF(error));
TCP_UNREF(tcp, "error");
// gpr_log(GPR_INFO, "here");
} else {
if (!process_errors(tcp)) {
// gpr_log(GPR_INFO, "no timestamps");
/* This was not a timestamps error. This was an actual error. Set the
* read and write closures to be ready.
*/
grpc_fd_notify_on_read(tcp->em_fd, nullptr);
grpc_fd_notify_on_write(tcp->em_fd, nullptr);
}
GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
// gpr_log(GPR_INFO, "udhar se");
}
}
#else /* GRPC_LINUX_ERRQUEUE */
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
ssize_t* sent_length,
grpc_error** error) {
gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
GPR_ASSERT(0);
return false;
}
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
gpr_log(GPR_ERROR, "Error handling is not supported for this platform");
GPR_ASSERT(0);
}
#endif /* GRPC_LINUX_ERRQUEUE */
/* returns true if done, false if pending; if returning true, *error is set */
#define MAX_WRITE_IOVEC 1000
static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
@ -552,19 +779,26 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iov_size;
msg.msg_control = nullptr;
msg.msg_controllen = 0;
msg.msg_flags = 0;
GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
GPR_TIMER_SCOPE("sendmsg", 1);
do {
/* TODO(klempner): Cork if this is a partial write */
GRPC_STATS_INC_SYSCALL_WRITE();
sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
} while (sent_length < 0 && errno == EINTR);
if (tcp->outgoing_buffer_arg != nullptr) {
if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
error))
return true; /* something went wrong with timestamps */
} else {
msg.msg_control = nullptr;
msg.msg_controllen = 0;
GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
GPR_TIMER_SCOPE("sendmsg", 1);
do {
/* TODO(klempner): Cork if this is a partial write */
GRPC_STATS_INC_SYSCALL_WRITE();
sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
} while (sent_length < 0 && errno == EINTR);
}
// gpr_log(GPR_INFO, "sent length %ld", sent_length);
if (sent_length < 0) {
if (errno == EAGAIN) {
@ -588,6 +822,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
}
GPR_ASSERT(tcp->outgoing_byte_idx == 0);
tcp->bytes_counter += sent_length;
trailing = sending_length - static_cast<size_t>(sent_length);
while (trailing > 0) {
size_t slice_length;
@ -602,7 +837,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
trailing -= slice_length;
}
}
if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
*error = GRPC_ERROR_NONE;
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
@ -635,14 +869,14 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
// gpr_log(GPR_INFO, "scheduling callback");
GRPC_CLOSURE_SCHED(cb, error);
TCP_UNREF(tcp, "write");
}
}
static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
grpc_closure* cb) {
grpc_closure* cb, void* arg) {
GPR_TIMER_SCOPE("tcp_write", 0);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_error* error = GRPC_ERROR_NONE;
@ -670,6 +904,8 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
}
tcp->outgoing_buffer = buf;
tcp->outgoing_byte_idx = 0;
tcp->outgoing_buffer_arg = arg;
if (arg) GPR_ASSERT(grpc_event_engine_can_track_errors());
if (!tcp_flush(tcp, &error)) {
TCP_REF(tcp, "write");
@ -677,16 +913,20 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "write: delayed");
}
// gpr_log(GPR_INFO, "notify");
notify_on_write(tcp);
} else {
if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
// gpr_log(GPR_INFO, "sched");
GRPC_CLOSURE_SCHED(cb, error);
}
}
namespace {} /* namespace */
static void tcp_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_pollset_add_fd(pollset, tcp->em_fd);
@ -787,6 +1027,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->bytes_read_this_round = 0;
/* Will be set to false by the very first endpoint read function */
tcp->is_first_read = true;
tcp->bytes_counter = -1;
tcp->socket_ts_enabled = false;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
@ -798,6 +1040,16 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
grpc_resource_quota_unref_internal(resource_quota);
gpr_mu_init(&tcp->traced_buffer_lock);
tcp->head = nullptr;
/* Start being notified on errors if event engine can track errors. */
if (grpc_event_engine_can_track_errors()) {
TCP_REF(tcp, "error");
gpr_atm_rel_store(&tcp->stop_error_notification, 0);
GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
}
return &tcp->base;
}
@ -816,6 +1068,11 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
tcp->release_fd = fd;
tcp->release_fd_cb = done;
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
if (grpc_event_engine_can_track_errors()) {
// gpr_log(GPR_INFO, "stop errors");
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
grpc_fd_notify_on_error(tcp->em_fd, nullptr);
}
TCP_UNREF(tcp, "destroy");
}

@ -31,7 +31,10 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
@ -54,4 +57,9 @@ int grpc_tcp_fd(grpc_endpoint* ep);
void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
grpc_closure* done);
/** Sets the callback function to call when timestamps for a write are
* collected. */
void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
grpc_core::Timestamps*));
#endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */

@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) {
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
grpc_fd* fdobj = grpc_fd_create(fd, name, false);
grpc_fd* fdobj = grpc_fd_create(fd, name, true);
read_notifier_pollset =
sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
listener->sibling = sp;
sp->server = listener->server;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name, false);
sp->emfd = grpc_fd_create(fd, name, true);
memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = listener->port_index;

@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd,
s->tail = sp;
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name, false);
sp->emfd = grpc_fd_create(fd, name, true);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = port_index;

@ -296,7 +296,7 @@ static void on_write(void* tcpp, grpc_error* error) {
/* Initiates a write. */
static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
grpc_closure* cb, void* arg) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_winsocket* socket = tcp->socket;
grpc_winsocket_callback_info* info = &socket->write_info;

@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
emfd_ = grpc_fd_create(fd, name, false);
emfd_ = grpc_fd_create(fd, name, true);
memcpy(&addr_, addr, sizeof(grpc_resolved_address));
GPR_ASSERT(emfd_);
gpr_free(name);

@ -254,7 +254,7 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
}
static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
grpc_closure* cb, void* arg) {
GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0);
unsigned i;
@ -342,7 +342,7 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
return;
}
grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg);
}
static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) {

@ -259,7 +259,7 @@ static grpc_error* on_handshake_next_done_locked(
grpc_slice_buffer_reset_and_unref_internal(&h->outgoing);
grpc_slice_buffer_add(&h->outgoing, to_send);
grpc_endpoint_write(h->args->endpoint, &h->outgoing,
&h->on_handshake_data_sent_to_peer);
&h->on_handshake_data_sent_to_peer, nullptr);
} else if (handshaker_result == nullptr) {
// There is nothing to send, but need to read from peer.
grpc_endpoint_read(h->args->endpoint, h->args->read_buffer,

@ -82,6 +82,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@ -102,6 +103,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',

@ -115,7 +115,7 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags,
grpc_schedule_on_exec_ctx);
/* Write data */
grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure);
grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure, nullptr);
grpc_core::ExecCtx::Get()->Flush();
/* Await completion, unless the request is large and write may not finish

@ -104,7 +104,7 @@ static void handle_write() {
grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer);
grpc_slice_buffer_add(&state.outgoing_buffer, slice);
grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write);
grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr);
}
static void handle_read(void* arg, grpc_error* error) {

@ -201,7 +201,7 @@ static void on_client_write_done(void* arg, grpc_error* error) {
&conn->client_write_buffer);
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_client_write_done);
&conn->on_client_write_done, nullptr);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "write_done");
@ -226,7 +226,7 @@ static void on_server_write_done(void* arg, grpc_error* error) {
&conn->server_write_buffer);
conn->server_is_writing = true;
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
&conn->on_server_write_done);
&conn->on_server_write_done, nullptr);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "server_write");
@ -257,7 +257,7 @@ static void on_client_read_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "client_read");
conn->server_is_writing = true;
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
&conn->on_server_write_done);
&conn->on_server_write_done, nullptr);
}
// Read more data.
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
@ -288,7 +288,7 @@ static void on_server_read_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "server_read");
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_client_write_done);
&conn->on_client_write_done, nullptr);
}
// Read more data.
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
@ -340,7 +340,7 @@ static void on_server_connect_done(void* arg, grpc_error* error) {
grpc_slice_buffer_add(&conn->client_write_buffer, slice);
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_write_response_done);
&conn->on_write_response_done, nullptr);
}
/**

@ -233,6 +233,19 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "buffer_list_test",
srcs = ["buffer_list_test.cc"],
language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "tcp_server_posix_test",
srcs = ["tcp_server_posix_test.cc"],

@ -0,0 +1,111 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include <grpc/grpc.h>
#include "test/core/util/test_config.h"
#ifdef GRPC_LINUX_ERRQUEUE
static void TestShutdownFlushesListVerifier(void* arg,
grpc_core::Timestamps* ts,
grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(arg != nullptr);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
}
/** Tests that all TracedBuffer elements in the list are flushed out on
* shutdown.
* Also tests that arg is passed correctly.
*/
static void TestShutdownFlushesList() {
grpc_core::grpc_tcp_set_write_timestamps_callback(
TestShutdownFlushesListVerifier);
grpc_core::TracedBuffer* list = nullptr;
#define NUM_ELEM 5
gpr_atm verifier_called[NUM_ELEM];
for (auto i = 0; i < NUM_ELEM; i++) {
gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
grpc_core::TracedBuffer::AddNewEntry(
&list, i, static_cast<void*>(&verifier_called[i]));
}
grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
GPR_ASSERT(list == nullptr);
for (auto i = 0; i < NUM_ELEM; i++) {
GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) ==
static_cast<gpr_atm>(1));
}
}
static void TestVerifierCalledOnAckVerifier(void* arg,
grpc_core::Timestamps* ts,
grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(arg != nullptr);
GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
GPR_ASSERT(ts->acked_time.tv_sec == 123);
GPR_ASSERT(ts->acked_time.tv_nsec == 456);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
}
/** Tests that the timestamp verifier is called on an ACK timestamp.
*/
static void TestVerifierCalledOnAck() {
struct sock_extended_err serr;
serr.ee_data = 213;
serr.ee_info = grpc_core::SCM_TSTAMP_ACK;
struct grpc_core::scm_timestamping tss;
tss.ts[0].tv_sec = 123;
tss.ts[0].tv_nsec = 456;
grpc_core::grpc_tcp_set_write_timestamps_callback(
TestVerifierCalledOnAckVerifier);
grpc_core::TracedBuffer* list = nullptr;
gpr_atm verifier_called;
gpr_atm_rel_store(&verifier_called, static_cast<gpr_atm>(0));
grpc_core::TracedBuffer::AddNewEntry(&list, 213, &verifier_called);
grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss);
GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast<gpr_atm>(1));
GPR_ASSERT(list == nullptr);
grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
}
static void TestTcpBufferList() {
TestVerifierCalledOnAck();
TestShutdownFlushesList();
}
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
grpc_init();
TestTcpBufferList();
grpc_shutdown();
return 0;
}
#else /* GRPC_LINUX_ERRQUEUE */
int main(int argc, char** argv) { return 1; }
#endif /* GRPC_LINUX_ERRQUEUE */

@ -150,8 +150,8 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) {
&state->current_write_data);
grpc_slice_buffer_reset_and_unref(&state->outgoing);
grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
grpc_endpoint_write(state->write_ep, &state->outgoing,
&state->done_write);
grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
nullptr);
gpr_free(slices);
return;
}
@ -294,7 +294,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
grpc_endpoint_write(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx));
grpc_schedule_on_exec_ctx),
nullptr);
wait_for_fail_count(&fail_count, 3);
grpc_endpoint_shutdown(f.client_ep,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));

@ -36,6 +36,9 @@
#include <grpc/support/time.h>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/iomgr/endpoint_tests.h"
#include "test/core/util/test_config.h"
@ -68,6 +71,48 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}
static void create_inet_sockets(int sv[2]) {
gpr_log(GPR_INFO, "create sockets");
/* Prepare listening socket */
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
int sock = socket(AF_INET, SOCK_STREAM, 0);
GPR_ASSERT(sock);
GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0);
listen(sock, 1);
/* Prepare client socket and connect to server */
socklen_t len = sizeof(sockaddr_in);
GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0);
gpr_log(GPR_INFO, "%d\n", addr.sin_port);
char* addra = inet_ntoa(addr.sin_addr);
gpr_log(GPR_INFO, "%s\n", addra);
int client = socket(AF_INET, SOCK_STREAM, 0);
GPR_ASSERT(client);
int ret;
do {
ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in));
} while (ret == -1 && errno == EINTR);
/* Accept client connection */
len = sizeof(socklen_t);
int server;
do {
server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len);
} while (server == -1 && errno == EINTR);
GPR_ASSERT(server != -1);
sv[0] = server;
sv[1] = client;
int flags = fcntl(sv[0], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}
static ssize_t fill_socket(int fd) {
ssize_t write_bytes;
ssize_t total_bytes = 0;
@ -289,6 +334,7 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
static void write_done(void* user_data /* write_socket_state */,
grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
struct write_socket_state* state =
static_cast<struct write_socket_state*>(user_data);
gpr_log(GPR_INFO, "Write done callback called");
@ -314,17 +360,22 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
for (;;) {
grpc_pollset_worker* worker = nullptr;
gpr_log(GPR_INFO, "in loop");
gpr_mu_lock(g_mu);
gpr_log(GPR_INFO, "in locked polling");
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(g_pollset, &worker,
grpc_timespec_to_millis_round_up(
grpc_timeout_milliseconds_to_deadline(10)))));
gpr_log(GPR_INFO, "done locked polling");
gpr_mu_unlock(g_mu);
do {
gpr_log(GPR_INFO, "doing a read");
bytes_read =
read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
gpr_log(GPR_INFO, "done with read");
} while (bytes_read < 0 && errno == EINTR);
GPR_ASSERT(bytes_read >= 0);
for (i = 0; i < bytes_read; ++i) {
@ -340,10 +391,24 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
gpr_free(buf);
}
/* Verifier for timestamps callback for write_test */
void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(arg != nullptr);
GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME);
GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME);
GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
gpr_atm* done_timestamps = (gpr_atm*)arg;
gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1));
}
/* Write to a socket using the grpc_tcp API, then drain it directly.
Note that if the write does not complete immediately we need to drain the
socket in parallel with the read. */
static void write_test(size_t num_bytes, size_t slice_size) {
socket in parallel with the read. If collect_timestamps is true, it will
try to get timestamps for the write. */
static void write_test(size_t num_bytes, size_t slice_size,
bool collect_timestamps) {
int sv[2];
grpc_endpoint* ep;
struct write_socket_state state;
@ -356,19 +421,27 @@ static void write_test(size_t num_bytes, size_t slice_size) {
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
grpc_core::ExecCtx exec_ctx;
if (collect_timestamps && !grpc_event_engine_can_track_errors()) {
return;
}
gpr_log(GPR_INFO,
"Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
num_bytes, slice_size);
create_sockets(sv);
if (collect_timestamps) {
create_inet_sockets(sv);
} else {
create_sockets(sv);
}
grpc_arg a[1];
a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
a[0].type = GRPC_ARG_INTEGER,
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", false), &args,
"test");
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
&args, "test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
state.ep = ep;
@ -381,18 +454,28 @@ static void write_test(size_t num_bytes, size_t slice_size) {
GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
grpc_schedule_on_exec_ctx);
grpc_endpoint_write(ep, &outgoing, &write_done_closure);
gpr_atm done_timestamps;
gpr_atm_rel_store(&done_timestamps, static_cast<gpr_atm>(0));
grpc_endpoint_write(ep, &outgoing, &write_done_closure,
grpc_event_engine_can_track_errors() && collect_timestamps
? (void*)&done_timestamps
: nullptr);
gpr_log(GPR_INFO, "about to drain");
drain_socket_blocking(sv[0], num_bytes, num_bytes);
gpr_log(GPR_INFO, "done drain");
exec_ctx.Flush();
gpr_mu_lock(g_mu);
for (;;) {
grpc_pollset_worker* worker = nullptr;
if (state.write_done) {
if (state.write_done &&
(!(grpc_event_engine_can_track_errors() && collect_timestamps) ||
gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) {
break;
}
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
exec_ctx.Flush();
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
@ -488,6 +571,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
}
void run_tests(void) {
gpr_log(GPR_INFO, "run tests");
size_t i = 0;
read_test(100, 8192);
@ -496,15 +580,25 @@ void run_tests(void) {
read_test(10000, 1);
large_read_test(8192);
large_read_test(1);
write_test(100, 8192);
write_test(100, 1);
write_test(100000, 8192);
write_test(100000, 1);
write_test(100000, 137);
gpr_log(GPR_INFO, "done read tests");
write_test(100, 8192, false);
write_test(100, 1, false);
write_test(100000, 8192, false);
write_test(100000, 1, false);
write_test(100000, 137, false);
gpr_log(GPR_INFO, "done normal write tests");
write_test(100, 8192, true);
write_test(100, 1, true);
write_test(100000, 8192, true);
write_test(100000, 1, true);
write_test(100, 137, true);
gpr_log(GPR_INFO, "done super write tests");
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
write_test(40320, i);
write_test(40320, i, false);
write_test(40320, i, true);
}
release_fd_test(100, 8192);
@ -549,6 +643,8 @@ int main(int argc, char** argv) {
grpc_closure destroyed;
grpc_test_init(argc, argv);
grpc_init();
gpr_log(GPR_INFO, "here");
grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier);
{
grpc_core::ExecCtx exec_ctx;
g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));

@ -55,7 +55,7 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
grpc_closure* cb, void* arg) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
for (size_t i = 0; i < slices->count; i++) {
m->on_write(slices->slices[i]);

@ -76,7 +76,7 @@ static half* other_half(half* h) {
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
grpc_closure* cb, void* arg) {
half* m = other_half(reinterpret_cast<half*>(ep));
gpr_mu_lock(&m->parent->mu);
grpc_error* error = GRPC_ERROR_NONE;

@ -62,7 +62,7 @@ static void maybe_call_write_cb_locked(trickle_endpoint* te) {
}
static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
grpc_closure* cb, void* arg) {
trickle_endpoint* te = reinterpret_cast<trickle_endpoint*>(ep);
gpr_mu_lock(&te->mu);
GPR_ASSERT(te->write_cb == nullptr);
@ -186,7 +186,8 @@ size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) {
te->last_write = now;
grpc_endpoint_write(
te->wrapped, &te->writing_buffer,
GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx));
GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx),
nullptr);
maybe_call_write_cb_locked(te);
}
}

@ -96,7 +96,7 @@ class DummyEndpoint : public grpc_endpoint {
}
static void write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
grpc_closure* cb, void* arg) {
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
}

@ -1065,6 +1065,7 @@ src/core/lib/http/format_request.h \
src/core/lib/http/httpcli.h \
src/core/lib/http/parser.h \
src/core/lib/iomgr/block_annotate.h \
src/core/lib/iomgr/buffer_list.h \
src/core/lib/iomgr/call_combiner.h \
src/core/lib/iomgr/closure.h \
src/core/lib/iomgr/combiner.h \
@ -1080,6 +1081,7 @@ src/core/lib/iomgr/ev_posix.h \
src/core/lib/iomgr/exec_ctx.h \
src/core/lib/iomgr/executor.h \
src/core/lib/iomgr/gethostname.h \
src/core/lib/iomgr/internal_errqueue.h \
src/core/lib/iomgr/iocp_windows.h \
src/core/lib/iomgr/iomgr.h \
src/core/lib/iomgr/iomgr_custom.h \

@ -1153,6 +1153,8 @@ src/core/lib/http/parser.cc \
src/core/lib/http/parser.h \
src/core/lib/iomgr/README.md \
src/core/lib/iomgr/block_annotate.h \
src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/buffer_list.h \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/call_combiner.h \
src/core/lib/iomgr/closure.h \
@ -1188,6 +1190,8 @@ src/core/lib/iomgr/gethostname.h \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/internal_errqueue.h \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iocp_windows.h \
src/core/lib/iomgr/iomgr.cc \

@ -163,6 +163,23 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c",
"name": "buffer_list_test",
"src": [
"test/core/iomgr/buffer_list_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
@ -9385,6 +9402,7 @@
"src/core/lib/http/format_request.cc",
"src/core/lib/http/httpcli.cc",
"src/core/lib/http/parser.cc",
"src/core/lib/iomgr/buffer_list.cc",
"src/core/lib/iomgr/call_combiner.cc",
"src/core/lib/iomgr/combiner.cc",
"src/core/lib/iomgr/endpoint.cc",
@ -9405,6 +9423,7 @@
"src/core/lib/iomgr/gethostname_fallback.cc",
"src/core/lib/iomgr/gethostname_host_name_max.cc",
"src/core/lib/iomgr/gethostname_sysconf.cc",
"src/core/lib/iomgr/internal_errqueue.cc",
"src/core/lib/iomgr/iocp_windows.cc",
"src/core/lib/iomgr/iomgr.cc",
"src/core/lib/iomgr/iomgr_custom.cc",
@ -9564,6 +9583,7 @@
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/block_annotate.h",
"src/core/lib/iomgr/buffer_list.h",
"src/core/lib/iomgr/call_combiner.h",
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/combiner.h",
@ -9579,6 +9599,7 @@
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/gethostname.h",
"src/core/lib/iomgr/internal_errqueue.h",
"src/core/lib/iomgr/iocp_windows.h",
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_custom.h",
@ -9714,6 +9735,7 @@
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/block_annotate.h",
"src/core/lib/iomgr/buffer_list.h",
"src/core/lib/iomgr/call_combiner.h",
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/combiner.h",
@ -9729,6 +9751,7 @@
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/gethostname.h",
"src/core/lib/iomgr/internal_errqueue.h",
"src/core/lib/iomgr/iocp_windows.h",
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_custom.h",

@ -195,6 +195,26 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [
"uv"
],
"flaky": false,
"gtest": false,
"language": "c",
"name": "buffer_list_test",
"platforms": [
"linux"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save