diff --git a/BUILD b/BUILD
index e1b00a5cf0a..c01b971281d 100644
--- a/BUILD
+++ b/BUILD
@@ -696,6 +696,7 @@ grpc_cc_library(
"src/core/lib/http/format_request.cc",
"src/core/lib/http/httpcli.cc",
"src/core/lib/http/parser.cc",
+ "src/core/lib/iomgr/buffer_list.cc",
"src/core/lib/iomgr/call_combiner.cc",
"src/core/lib/iomgr/combiner.cc",
"src/core/lib/iomgr/endpoint.cc",
@@ -716,6 +717,7 @@ grpc_cc_library(
"src/core/lib/iomgr/gethostname_fallback.cc",
"src/core/lib/iomgr/gethostname_host_name_max.cc",
"src/core/lib/iomgr/gethostname_sysconf.cc",
+ "src/core/lib/iomgr/internal_errqueue.cc",
"src/core/lib/iomgr/iocp_windows.cc",
"src/core/lib/iomgr/iomgr.cc",
"src/core/lib/iomgr/iomgr_custom.cc",
@@ -845,6 +847,7 @@ grpc_cc_library(
"src/core/lib/http/format_request.h",
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
+ "src/core/lib/iomgr/buffer_list.h",
"src/core/lib/iomgr/block_annotate.h",
"src/core/lib/iomgr/call_combiner.h",
"src/core/lib/iomgr/closure.h",
@@ -862,6 +865,7 @@ grpc_cc_library(
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/gethostname.h",
"src/core/lib/iomgr/gevent_util.h",
+ "src/core/lib/iomgr/internal_errqueue.h",
"src/core/lib/iomgr/iocp_windows.h",
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_custom.h",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 32d7a9b94d3..3895d4c0f17 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -230,6 +230,9 @@ add_dependencies(buildtests_c avl_test)
add_dependencies(buildtests_c bad_server_response_test)
add_dependencies(buildtests_c bin_decoder_test)
add_dependencies(buildtests_c bin_encoder_test)
+if(_gRPC_PLATFORM_LINUX)
+add_dependencies(buildtests_c buffer_list_test)
+endif()
add_dependencies(buildtests_c channel_create_test)
add_dependencies(buildtests_c chttp2_hpack_encoder_test)
add_dependencies(buildtests_c chttp2_stream_map_test)
@@ -331,6 +334,7 @@ if(_gRPC_PLATFORM_LINUX)
add_dependencies(buildtests_c httpscli_test)
endif()
add_dependencies(buildtests_c init_test)
+add_dependencies(buildtests_c inproc_callback_test)
add_dependencies(buildtests_c invalid_call_argument_test)
add_dependencies(buildtests_c json_rewrite)
add_dependencies(buildtests_c json_rewrite_test)
@@ -958,6 +962,7 @@ add_library(grpc
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
+ src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@@ -978,6 +983,7 @@ add_library(grpc
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
+ src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@@ -1364,6 +1370,7 @@ add_library(grpc_cronet
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
+ src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@@ -1384,6 +1391,7 @@ add_library(grpc_cronet
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
+ src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@@ -1756,6 +1764,7 @@ add_library(grpc_test_util
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
+ src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@@ -1776,6 +1785,7 @@ add_library(grpc_test_util
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
+ src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@@ -2064,6 +2074,7 @@ add_library(grpc_test_util_unsecure
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
+ src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@@ -2084,6 +2095,7 @@ add_library(grpc_test_util_unsecure
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
+ src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@@ -2351,6 +2363,7 @@ add_library(grpc_unsecure
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
+ src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@@ -2371,6 +2384,7 @@ add_library(grpc_unsecure
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
+ src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@@ -3191,6 +3205,7 @@ add_library(grpc++_cronet
src/core/lib/http/format_request.cc
src/core/lib/http/httpcli.cc
src/core/lib/http/parser.cc
+ src/core/lib/iomgr/buffer_list.cc
src/core/lib/iomgr/call_combiner.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/endpoint.cc
@@ -3211,6 +3226,7 @@ add_library(grpc++_cronet
src/core/lib/iomgr/gethostname_fallback.cc
src/core/lib/iomgr/gethostname_host_name_max.cc
src/core/lib/iomgr/gethostname_sysconf.cc
+ src/core/lib/iomgr/internal_errqueue.cc
src/core/lib/iomgr/iocp_windows.cc
src/core/lib/iomgr/iomgr.cc
src/core/lib/iomgr/iomgr_custom.cc
@@ -5834,6 +5850,37 @@ target_link_libraries(bin_encoder_test
grpc
)
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+if(_gRPC_PLATFORM_LINUX)
+
+add_executable(buffer_list_test
+ test/core/iomgr/buffer_list_test.cc
+)
+
+
+target_include_directories(buffer_list_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
+ PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
+ PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
+ PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
+ PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
+)
+
+target_link_libraries(buffer_list_test
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ grpc
+ gpr_test_util
+ gpr
+)
+
+endif()
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
@@ -7893,6 +7940,35 @@ target_link_libraries(init_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
+add_executable(inproc_callback_test
+ test/core/end2end/inproc_callback_test.cc
+)
+
+
+target_include_directories(inproc_callback_test
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+ PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
+ PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
+ PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
+ PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
+ PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
+ PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
+)
+
+target_link_libraries(inproc_callback_test
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ grpc
+ gpr_test_util
+ gpr
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
add_executable(invalid_call_argument_test
test/core/end2end/invalid_call_argument_test.cc
)
diff --git a/Makefile b/Makefile
index 3454ef85870..7caf585ee9e 100644
--- a/Makefile
+++ b/Makefile
@@ -978,6 +978,7 @@ avl_test: $(BINDIR)/$(CONFIG)/avl_test
bad_server_response_test: $(BINDIR)/$(CONFIG)/bad_server_response_test
bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test
bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test
+buffer_list_test: $(BINDIR)/$(CONFIG)/buffer_list_test
channel_create_test: $(BINDIR)/$(CONFIG)/channel_create_test
check_epollexclusive: $(BINDIR)/$(CONFIG)/check_epollexclusive
chttp2_hpack_encoder_test: $(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test
@@ -1053,6 +1054,7 @@ httpcli_format_request_test: $(BINDIR)/$(CONFIG)/httpcli_format_request_test
httpcli_test: $(BINDIR)/$(CONFIG)/httpcli_test
httpscli_test: $(BINDIR)/$(CONFIG)/httpscli_test
init_test: $(BINDIR)/$(CONFIG)/init_test
+inproc_callback_test: $(BINDIR)/$(CONFIG)/inproc_callback_test
invalid_call_argument_test: $(BINDIR)/$(CONFIG)/invalid_call_argument_test
json_fuzzer_test: $(BINDIR)/$(CONFIG)/json_fuzzer_test
json_rewrite: $(BINDIR)/$(CONFIG)/json_rewrite
@@ -1433,6 +1435,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/bad_server_response_test \
$(BINDIR)/$(CONFIG)/bin_decoder_test \
$(BINDIR)/$(CONFIG)/bin_encoder_test \
+ $(BINDIR)/$(CONFIG)/buffer_list_test \
$(BINDIR)/$(CONFIG)/channel_create_test \
$(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test \
$(BINDIR)/$(CONFIG)/chttp2_stream_map_test \
@@ -1500,6 +1503,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/httpcli_test \
$(BINDIR)/$(CONFIG)/httpscli_test \
$(BINDIR)/$(CONFIG)/init_test \
+ $(BINDIR)/$(CONFIG)/inproc_callback_test \
$(BINDIR)/$(CONFIG)/invalid_call_argument_test \
$(BINDIR)/$(CONFIG)/json_rewrite \
$(BINDIR)/$(CONFIG)/json_rewrite_test \
@@ -1948,6 +1952,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 )
$(E) "[RUN] Testing bin_encoder_test"
$(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 )
+ $(E) "[RUN] Testing buffer_list_test"
+ $(Q) $(BINDIR)/$(CONFIG)/buffer_list_test || ( echo test buffer_list_test failed ; exit 1 )
$(E) "[RUN] Testing channel_create_test"
$(Q) $(BINDIR)/$(CONFIG)/channel_create_test || ( echo test channel_create_test failed ; exit 1 )
$(E) "[RUN] Testing chttp2_hpack_encoder_test"
@@ -2076,6 +2082,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/httpscli_test || ( echo test httpscli_test failed ; exit 1 )
$(E) "[RUN] Testing init_test"
$(Q) $(BINDIR)/$(CONFIG)/init_test || ( echo test init_test failed ; exit 1 )
+ $(E) "[RUN] Testing inproc_callback_test"
+ $(Q) $(BINDIR)/$(CONFIG)/inproc_callback_test || ( echo test inproc_callback_test failed ; exit 1 )
$(E) "[RUN] Testing invalid_call_argument_test"
$(Q) $(BINDIR)/$(CONFIG)/invalid_call_argument_test || ( echo test invalid_call_argument_test failed ; exit 1 )
$(E) "[RUN] Testing json_rewrite_test"
@@ -3456,6 +3464,7 @@ LIBGRPC_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
+ src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@@ -3476,6 +3485,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
+ src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@@ -3861,6 +3871,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
+ src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@@ -3881,6 +3892,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
+ src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@@ -4251,6 +4263,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
+ src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@@ -4271,6 +4284,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
+ src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@@ -4550,6 +4564,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
+ src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@@ -4570,6 +4585,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
+ src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@@ -4815,6 +4831,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
+ src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@@ -4835,6 +4852,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
+ src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@@ -5643,6 +5661,7 @@ LIBGRPC++_CRONET_SRC = \
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
+ src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@@ -5663,6 +5682,7 @@ LIBGRPC++_CRONET_SRC = \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
+ src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
@@ -10697,6 +10717,38 @@ endif
endif
+BUFFER_LIST_TEST_SRC = \
+ test/core/iomgr/buffer_list_test.cc \
+
+BUFFER_LIST_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BUFFER_LIST_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/buffer_list_test: openssl_dep_error
+
+else
+
+
+
+$(BINDIR)/$(CONFIG)/buffer_list_test: $(BUFFER_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LD) $(LDFLAGS) $(BUFFER_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/buffer_list_test
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/iomgr/buffer_list_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_buffer_list_test: $(BUFFER_LIST_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(BUFFER_LIST_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
CHANNEL_CREATE_TEST_SRC = \
test/core/surface/channel_create_test.cc \
@@ -13115,6 +13167,38 @@ endif
endif
+INPROC_CALLBACK_TEST_SRC = \
+ test/core/end2end/inproc_callback_test.cc \
+
+INPROC_CALLBACK_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(INPROC_CALLBACK_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/inproc_callback_test: openssl_dep_error
+
+else
+
+
+
+$(BINDIR)/$(CONFIG)/inproc_callback_test: $(INPROC_CALLBACK_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LD) $(LDFLAGS) $(INPROC_CALLBACK_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/inproc_callback_test
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/end2end/inproc_callback_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_inproc_callback_test: $(INPROC_CALLBACK_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(INPROC_CALLBACK_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
INVALID_CALL_ARGUMENT_TEST_SRC = \
test/core/end2end/invalid_call_argument_test.cc \
diff --git a/build.yaml b/build.yaml
index aef1954a0af..a7ebee7572e 100644
--- a/build.yaml
+++ b/build.yaml
@@ -256,6 +256,7 @@ filegroups:
- src/core/lib/http/format_request.cc
- src/core/lib/http/httpcli.cc
- src/core/lib/http/parser.cc
+ - src/core/lib/iomgr/buffer_list.cc
- src/core/lib/iomgr/call_combiner.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/endpoint.cc
@@ -276,6 +277,7 @@ filegroups:
- src/core/lib/iomgr/gethostname_fallback.cc
- src/core/lib/iomgr/gethostname_host_name_max.cc
- src/core/lib/iomgr/gethostname_sysconf.cc
+ - src/core/lib/iomgr/internal_errqueue.cc
- src/core/lib/iomgr/iocp_windows.cc
- src/core/lib/iomgr/iomgr.cc
- src/core/lib/iomgr/iomgr_custom.cc
@@ -434,6 +436,7 @@ filegroups:
- src/core/lib/http/httpcli.h
- src/core/lib/http/parser.h
- src/core/lib/iomgr/block_annotate.h
+ - src/core/lib/iomgr/buffer_list.h
- src/core/lib/iomgr/call_combiner.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
@@ -449,6 +452,7 @@ filegroups:
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/gethostname.h
+ - src/core/lib/iomgr/internal_errqueue.h
- src/core/lib/iomgr/iocp_windows.h
- src/core/lib/iomgr/iomgr.h
- src/core/lib/iomgr/iomgr_custom.h
@@ -2139,6 +2143,20 @@ targets:
- grpc_test_util
- grpc
uses_polling: false
+- name: buffer_list_test
+ build: test
+ language: c
+ src:
+ - test/core/iomgr/buffer_list_test.cc
+ deps:
+ - grpc_test_util
+ - grpc
+ - gpr_test_util
+ - gpr
+ exclude_iomgrs:
+ - uv
+ platforms:
+ - linux
- name: channel_create_test
build: test
language: c
@@ -3022,6 +3040,19 @@ targets:
- gpr_test_util
- gpr
uses_polling: false
+- name: inproc_callback_test
+ build: test
+ language: c
+ headers:
+ - test/core/end2end/end2end_tests.h
+ src:
+ - test/core/end2end/inproc_callback_test.cc
+ deps:
+ - grpc_test_util
+ - grpc
+ - gpr_test_util
+ - gpr
+ uses_polling: false
- name: invalid_call_argument_test
cpu_cost: 0.1
build: test
diff --git a/config.m4 b/config.m4
index 7825274eea9..af3624cdd1b 100644
--- a/config.m4
+++ b/config.m4
@@ -108,6 +108,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/http/format_request.cc \
src/core/lib/http/httpcli.cc \
src/core/lib/http/parser.cc \
+ src/core/lib/iomgr/buffer_list.cc \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/combiner.cc \
src/core/lib/iomgr/endpoint.cc \
@@ -128,6 +129,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
+ src/core/lib/iomgr/internal_errqueue.cc \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iomgr.cc \
src/core/lib/iomgr/iomgr_custom.cc \
diff --git a/config.w32 b/config.w32
index a9d1e6c9d01..ad91ee40bd7 100644
--- a/config.w32
+++ b/config.w32
@@ -83,6 +83,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\http\\format_request.cc " +
"src\\core\\lib\\http\\httpcli.cc " +
"src\\core\\lib\\http\\parser.cc " +
+ "src\\core\\lib\\iomgr\\buffer_list.cc " +
"src\\core\\lib\\iomgr\\call_combiner.cc " +
"src\\core\\lib\\iomgr\\combiner.cc " +
"src\\core\\lib\\iomgr\\endpoint.cc " +
@@ -103,6 +104,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\iomgr\\gethostname_fallback.cc " +
"src\\core\\lib\\iomgr\\gethostname_host_name_max.cc " +
"src\\core\\lib\\iomgr\\gethostname_sysconf.cc " +
+ "src\\core\\lib\\iomgr\\internal_errqueue.cc " +
"src\\core\\lib\\iomgr\\iocp_windows.cc " +
"src\\core\\lib\\iomgr\\iomgr.cc " +
"src\\core\\lib\\iomgr\\iomgr_custom.cc " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 207d7baa8f9..a387794f579 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -382,6 +382,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/block_annotate.h',
+ 'src/core/lib/iomgr/buffer_list.h',
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
@@ -397,6 +398,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',
+ 'src/core/lib/iomgr/internal_errqueue.h',
'src/core/lib/iomgr/iocp_windows.h',
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_custom.h',
@@ -570,6 +572,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/block_annotate.h',
+ 'src/core/lib/iomgr/buffer_list.h',
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
@@ -585,6 +588,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',
+ 'src/core/lib/iomgr/internal_errqueue.h',
'src/core/lib/iomgr/iocp_windows.h',
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_custom.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index f1a9bce7f2e..9832283e5c9 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -394,6 +394,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/block_annotate.h',
+ 'src/core/lib/iomgr/buffer_list.h',
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
@@ -409,6 +410,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',
+ 'src/core/lib/iomgr/internal_errqueue.h',
'src/core/lib/iomgr/iocp_windows.h',
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_custom.h',
@@ -538,6 +540,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
+ 'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@@ -558,6 +561,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
+ 'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',
@@ -993,6 +997,7 @@ Pod::Spec.new do |s|
'src/core/lib/http/httpcli.h',
'src/core/lib/http/parser.h',
'src/core/lib/iomgr/block_annotate.h',
+ 'src/core/lib/iomgr/buffer_list.h',
'src/core/lib/iomgr/call_combiner.h',
'src/core/lib/iomgr/closure.h',
'src/core/lib/iomgr/combiner.h',
@@ -1008,6 +1013,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',
+ 'src/core/lib/iomgr/internal_errqueue.h',
'src/core/lib/iomgr/iocp_windows.h',
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_custom.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 5c38071d15b..c8e58faec9e 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -330,6 +330,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/http/httpcli.h )
s.files += %w( src/core/lib/http/parser.h )
s.files += %w( src/core/lib/iomgr/block_annotate.h )
+ s.files += %w( src/core/lib/iomgr/buffer_list.h )
s.files += %w( src/core/lib/iomgr/call_combiner.h )
s.files += %w( src/core/lib/iomgr/closure.h )
s.files += %w( src/core/lib/iomgr/combiner.h )
@@ -345,6 +346,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/exec_ctx.h )
s.files += %w( src/core/lib/iomgr/executor.h )
s.files += %w( src/core/lib/iomgr/gethostname.h )
+ s.files += %w( src/core/lib/iomgr/internal_errqueue.h )
s.files += %w( src/core/lib/iomgr/iocp_windows.h )
s.files += %w( src/core/lib/iomgr/iomgr.h )
s.files += %w( src/core/lib/iomgr/iomgr_custom.h )
@@ -474,6 +476,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/http/format_request.cc )
s.files += %w( src/core/lib/http/httpcli.cc )
s.files += %w( src/core/lib/http/parser.cc )
+ s.files += %w( src/core/lib/iomgr/buffer_list.cc )
s.files += %w( src/core/lib/iomgr/call_combiner.cc )
s.files += %w( src/core/lib/iomgr/combiner.cc )
s.files += %w( src/core/lib/iomgr/endpoint.cc )
@@ -494,6 +497,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/gethostname_fallback.cc )
s.files += %w( src/core/lib/iomgr/gethostname_host_name_max.cc )
s.files += %w( src/core/lib/iomgr/gethostname_sysconf.cc )
+ s.files += %w( src/core/lib/iomgr/internal_errqueue.cc )
s.files += %w( src/core/lib/iomgr/iocp_windows.cc )
s.files += %w( src/core/lib/iomgr/iomgr.cc )
s.files += %w( src/core/lib/iomgr/iomgr_custom.cc )
diff --git a/grpc.gyp b/grpc.gyp
index a36998bcb3b..654a5310923 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -300,6 +300,7 @@
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
+ 'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@@ -320,6 +321,7 @@
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
+ 'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',
@@ -660,6 +662,7 @@
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
+ 'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@@ -680,6 +683,7 @@
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
+ 'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',
@@ -893,6 +897,7 @@
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
+ 'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@@ -913,6 +918,7 @@
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
+ 'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',
@@ -1104,6 +1110,7 @@
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
+ 'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@@ -1124,6 +1131,7 @@
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
+ 'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',
diff --git a/package.xml b/package.xml
index 1bdf127c36a..537c5404e7e 100644
--- a/package.xml
+++ b/package.xml
@@ -335,6 +335,7 @@
+
@@ -350,6 +351,7 @@
+
@@ -479,6 +481,7 @@
+
@@ -499,6 +502,7 @@
+
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
index 4e8b8b71dbd..7ce8da8c00e 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -320,7 +320,7 @@ static void http_connect_handshaker_do_handshake(
// Take a new ref to be held by the write callback.
gpr_ref(&handshaker->refcount);
grpc_endpoint_write(args->endpoint, &handshaker->write_buffer,
- &handshaker->request_done_closure);
+ &handshaker->request_done_closure, nullptr);
gpr_mu_unlock(&handshaker->mu);
}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index dfed824cd5b..5bdcb387c9b 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint* client = grpc_tcp_client_create_from_fd(
- grpc_fd_create(fd, "client", false), args, "fd-client");
+ grpc_fd_create(fd, "client", true), args, "fd-client");
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
index a0228785eeb..e4bd91d07ba 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
@@ -44,7 +44,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
gpr_asprintf(&name, "fd:%d", fd);
grpc_endpoint* server_endpoint =
- grpc_tcp_create(grpc_fd_create(fd, name, false),
+ grpc_tcp_create(grpc_fd_create(fd, name, true),
grpc_server_get_channel_args(server), name);
gpr_free(name);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 36511fa6088..027a57d606d 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1029,7 +1029,8 @@ static void write_action(void* gt, grpc_error* error) {
grpc_endpoint_write(
t->ep, &t->outbuf,
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
- grpc_combiner_scheduler(t->combiner)));
+ grpc_combiner_scheduler(t->combiner)),
+ nullptr);
}
/* Callback from the grpc_endpoint after bytes have been written by calling
diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc
index 12060074c53..3bd7a2ce590 100644
--- a/src/core/lib/http/httpcli.cc
+++ b/src/core/lib/http/httpcli.cc
@@ -163,7 +163,7 @@ static void done_write(void* arg, grpc_error* error) {
static void start_write(internal_request* req) {
grpc_slice_ref_internal(req->request_text);
grpc_slice_buffer_add(&req->outgoing, req->request_text);
- grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write);
+ grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write, nullptr);
}
static void on_handshake_done(void* arg, grpc_endpoint* ep) {
diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc
new file mode 100644
index 00000000000..6ada23db1c2
--- /dev/null
+++ b/src/core/lib/iomgr/buffer_list.cc
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include
+
+#include "src/core/lib/iomgr/buffer_list.h"
+#include "src/core/lib/iomgr/port.h"
+
+#include
+
+#ifdef GRPC_LINUX_ERRQUEUE
+#include
+
+#include "src/core/lib/gprpp/memory.h"
+
+namespace grpc_core {
+void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
+ void* arg) {
+ GPR_DEBUG_ASSERT(head != nullptr);
+ TracedBuffer* new_elem = New(seq_no, arg);
+ /* Store the current time as the sendmsg time. */
+ new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME);
+ if (*head == nullptr) {
+ *head = new_elem;
+ return;
+ }
+ /* Append at the end. */
+ TracedBuffer* ptr = *head;
+ while (ptr->next_ != nullptr) {
+ ptr = ptr->next_;
+ }
+ ptr->next_ = new_elem;
+}
+
+namespace {
+/** Fills gpr_timespec gts based on values from timespec ts */
+void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) {
+ gts->tv_sec = ts->tv_sec;
+ gts->tv_nsec = static_cast(ts->tv_nsec);
+ gts->clock_type = GPR_CLOCK_REALTIME;
+}
+
+/** The saved callback function that will be invoked when we get all the
+ * timestamps that we are going to get for a TracedBuffer. */
+void (*timestamps_callback)(void*, grpc_core::Timestamps*,
+ grpc_error* shutdown_err);
+} /* namespace */
+
+void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
+ struct sock_extended_err* serr,
+ struct scm_timestamping* tss) {
+ GPR_DEBUG_ASSERT(head != nullptr);
+ TracedBuffer* elem = *head;
+ TracedBuffer* next = nullptr;
+ while (elem != nullptr) {
+ /* The byte number refers to the sequence number of the last byte which this
+ * timestamp relates to. */
+ if (serr->ee_data >= elem->seq_no_) {
+ switch (serr->ee_info) {
+ case SCM_TSTAMP_SCHED:
+ fill_gpr_from_timestamp(&(elem->ts_.scheduled_time), &(tss->ts[0]));
+ elem = elem->next_;
+ break;
+ case SCM_TSTAMP_SND:
+ fill_gpr_from_timestamp(&(elem->ts_.sent_time), &(tss->ts[0]));
+ elem = elem->next_;
+ break;
+ case SCM_TSTAMP_ACK:
+ fill_gpr_from_timestamp(&(elem->ts_.acked_time), &(tss->ts[0]));
+ /* Got all timestamps. Do the callback and free this TracedBuffer.
+ * The thing below can be passed by value if we don't want the
+ * restriction on the lifetime. */
+ timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE);
+ next = elem->next_;
+ Delete(elem);
+ *head = elem = next;
+ break;
+ default:
+ abort();
+ }
+ } else {
+ break;
+ }
+ }
+}
+
+void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) {
+ GPR_DEBUG_ASSERT(head != nullptr);
+ TracedBuffer* elem = *head;
+ while (elem != nullptr) {
+ if (timestamps_callback) {
+ timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
+ }
+ auto* next = elem->next_;
+ Delete(elem);
+ elem = next;
+ }
+ *head = nullptr;
+ GRPC_ERROR_UNREF(shutdown_err);
+}
+
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+ grpc_core::Timestamps*,
+ grpc_error* error)) {
+ timestamps_callback = fn;
+}
+} /* namespace grpc_core */
+
+#else /* GRPC_LINUX_ERRQUEUE */
+
+namespace grpc_core {
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+ grpc_core::Timestamps*,
+ grpc_error* error)) {
+ gpr_log(GPR_DEBUG, "Timestamps callback is not enabled for this platform");
+}
+} /* namespace grpc_core */
+
+#endif /* GRPC_LINUX_ERRQUEUE */
diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h
new file mode 100644
index 00000000000..cbbf50a6575
--- /dev/null
+++ b/src/core/lib/iomgr/buffer_list.h
@@ -0,0 +1,96 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H
+#define GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H
+
+#include
+
+#include "src/core/lib/iomgr/port.h"
+
+#include
+
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/internal_errqueue.h"
+
+namespace grpc_core {
+struct Timestamps {
+ /* TODO(yashykt): This would also need to store OPTSTAT once support is added
+ */
+ gpr_timespec sendmsg_time;
+ gpr_timespec scheduled_time;
+ gpr_timespec sent_time;
+ gpr_timespec acked_time;
+};
+
+/** TracedBuffer is a class to keep track of timestamps for a specific buffer in
+ * the TCP layer. We are only tracking timestamps for Linux kernels and hence
+ * this class would only be used by Linux platforms. For all other platforms,
+ * TracedBuffer would be an empty class.
+ *
+ * The timestamps collected are according to grpc_core::Timestamps declared
+ * above.
+ *
+ * A TracedBuffer list is kept track of using the head element of the list. If
+ * the head element of the list is nullptr, then the list is empty.
+ */
+#ifdef GRPC_LINUX_ERRQUEUE
+class TracedBuffer {
+ public:
+ /** Add a new entry in the TracedBuffer list pointed to by head. Also saves
+ * sendmsg_time with the current timestamp. */
+ static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no,
+ void* arg);
+
+ /** Processes a received timestamp based on sock_extended_err and
+ * scm_timestamping structures. It will invoke the timestamps callback if the
+ * timestamp type is SCM_TSTAMP_ACK. */
+ static void ProcessTimestamp(grpc_core::TracedBuffer** head,
+ struct sock_extended_err* serr,
+ struct scm_timestamping* tss);
+
+ /** Cleans the list by calling the callback for each traced buffer in the list
+ * with timestamps that it has. */
+ static void Shutdown(grpc_core::TracedBuffer** head,
+ grpc_error* shutdown_err);
+
+ private:
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
+
+ TracedBuffer(int seq_no, void* arg)
+ : seq_no_(seq_no), arg_(arg), next_(nullptr) {}
+
+ uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
+ void* arg_; /* The arg to pass to timestamps_callback */
+ grpc_core::Timestamps ts_; /* The timestamps corresponding to this buffer */
+ grpc_core::TracedBuffer* next_; /* The next TracedBuffer in the list */
+};
+#else /* GRPC_LINUX_ERRQUEUE */
+class TracedBuffer {};
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+/** Sets the callback function to call when timestamps for a write are
+ * collected. The callback does not own a reference to error. */
+void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*,
+ grpc_core::Timestamps*,
+ grpc_error* error));
+
+}; /* namespace grpc_core */
+
+#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H */
diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc
index 92e79301114..44fb47e19d9 100644
--- a/src/core/lib/iomgr/endpoint.cc
+++ b/src/core/lib/iomgr/endpoint.cc
@@ -28,8 +28,8 @@ void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
- ep->vtable->write(ep, slices, cb);
+ grpc_closure* cb, void* arg) {
+ ep->vtable->write(ep, slices, cb, arg);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index 15db1649fa9..1f590a80cae 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -33,10 +33,12 @@
typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
+class Timestamps;
struct grpc_endpoint_vtable {
void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
- void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb);
+ void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb,
+ void* arg);
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
@@ -70,9 +72,11 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep);
\a slices may be mutated at will by the endpoint until cb is called.
No guarantee is made to the content of slices after a write EXCEPT that
it is a valid slice buffer.
+ \a arg is platform specific. It is currently only used by TCP on linux
+ platforms as an argument that would be forwarded to the timestamps callback.
*/
void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb);
+ grpc_closure* cb, void* arg);
/* Causes any pending and future read/write callbacks to run immediately with
success==0 */
diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc
index c3bc0cc8fd9..df2cf508c8f 100644
--- a/src/core/lib/iomgr/endpoint_cfstream.cc
+++ b/src/core/lib/iomgr/endpoint_cfstream.cc
@@ -268,7 +268,7 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
CFStreamEndpoint* ep_impl = reinterpret_cast(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc
index 5c5c246f998..3afbfd72543 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.cc
+++ b/src/core/lib/iomgr/endpoint_pair_posix.cc
@@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
grpc_core::ExecCtx exec_ctx;
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args,
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args,
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args,
"socketpair-client");
gpr_free(final_name);
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index fb4c71ef71b..16562538a63 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -60,6 +60,19 @@ typedef struct grpc_fd_watcher {
grpc_fd* fd;
} grpc_fd_watcher;
+typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+struct grpc_fork_fd_list {
+ /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
+ set to nullptr. */
+ grpc_fd* fd;
+ grpc_cached_wakeup_fd* cached_wakeup_fd;
+
+ grpc_fork_fd_list* next;
+ grpc_fork_fd_list* prev;
+};
+
struct grpc_fd {
int fd;
/* refst format:
@@ -108,8 +121,18 @@ struct grpc_fd {
grpc_closure* on_done_closure;
grpc_iomgr_object iomgr_object;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
};
+/* True when GRPC_ENABLE_FORK_SUPPORT=1. We do not support fork with poll-cv */
+static bool track_fds_for_fork = false;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+static grpc_fork_fd_list* fork_fd_list_head = nullptr;
+static gpr_mu fork_fd_list_mu;
+
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that
@@ -156,6 +179,9 @@ static void fd_unref(grpc_fd* fd);
typedef struct grpc_cached_wakeup_fd {
grpc_wakeup_fd fd;
struct grpc_cached_wakeup_fd* next;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
} grpc_cached_wakeup_fd;
struct grpc_pollset_worker {
@@ -281,9 +307,61 @@ poll_hash_table poll_cache;
grpc_cv_fd_table g_cvfds;
/*******************************************************************************
- * fd_posix.c
+ * functions to track opened fds. No-ops unless track_fds_for_fork is true.
*/
+static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
+ if (track_fds_for_fork) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ if (fork_fd_list_head == node) {
+ fork_fd_list_head = node->next;
+ }
+ if (node->prev != nullptr) {
+ node->prev->next = node->next;
+ }
+ if (node->next != nullptr) {
+ node->next->prev = node->prev;
+ }
+ gpr_free(node);
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
+static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ node->next = fork_fd_list_head;
+ node->prev = nullptr;
+ if (fork_fd_list_head != nullptr) {
+ fork_fd_list_head->prev = node;
+ }
+ fork_fd_list_head = node;
+ gpr_mu_unlock(&fork_fd_list_mu);
+}
+
+static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
+ if (track_fds_for_fork) {
+ fd->fork_fd_list =
+ static_cast(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->fd = fd;
+ fd->fork_fd_list->cached_wakeup_fd = nullptr;
+ fork_fd_list_add_node(fd->fork_fd_list);
+ }
+}
+
+static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
+ if (track_fds_for_fork) {
+ fd->fork_fd_list =
+ static_cast(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->cached_wakeup_fd = fd;
+ fd->fork_fd_list->fd = nullptr;
+ fork_fd_list_add_node(fd->fork_fd_list);
+ }
+}
+
+ /*******************************************************************************
+ * fd_posix.c
+ */
+
#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
@@ -319,6 +397,7 @@ static void unref_by(grpc_fd* fd, int n) {
if (old == n) {
gpr_mu_destroy(&fd->mu);
grpc_iomgr_unregister_object(&fd->iomgr_object);
+ fork_fd_list_remove_node(fd->fork_fd_list);
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_free(fd);
} else {
@@ -347,6 +426,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
gpr_asprintf(&name2, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&r->iomgr_object, name2);
gpr_free(name2);
+ fork_fd_list_add_grpc_fd(r);
return r;
}
@@ -822,6 +902,7 @@ static void pollset_destroy(grpc_pollset* pollset) {
GPR_ASSERT(!pollset_has_workers(pollset));
while (pollset->local_wakeup_cache) {
grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
+ fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
gpr_free(pollset->local_wakeup_cache);
pollset->local_wakeup_cache = next;
@@ -895,6 +976,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
worker.wakeup_fd = static_cast(
gpr_malloc(sizeof(*worker.wakeup_fd)));
error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
if (error != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
return error;
@@ -1705,6 +1787,10 @@ static void shutdown_engine(void) {
if (grpc_cv_wakeup_fds_enabled()) {
global_cv_fd_table_shutdown();
}
+ if (track_fds_for_fork) {
+ gpr_mu_destroy(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
+ }
}
static const grpc_event_engine_vtable vtable = {
@@ -1742,6 +1828,26 @@ static const grpc_event_engine_vtable vtable = {
shutdown_engine,
};
+/* Called by the child process's post-fork handler to close open fds, including
+ * worker wakeup fds. This allows gRPC to shutdown in the child process without
+ * interfering with connections or RPCs ongoing in the parent. */
+static void reset_event_manager_on_fork() {
+ gpr_mu_lock(&fork_fd_list_mu);
+ while (fork_fd_list_head != nullptr) {
+ if (fork_fd_list_head->fd != nullptr) {
+ close(fork_fd_list_head->fd->fd);
+ fork_fd_list_head->fd->fd = -1;
+ } else {
+ close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
+ fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
+ close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
+ fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
+ }
+ fork_fd_list_head = fork_fd_list_head->next;
+ }
+ gpr_mu_unlock(&fork_fd_list_mu);
+}
+
const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!grpc_has_wakeup_fd()) {
gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
@@ -1750,6 +1856,12 @@ const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
return nullptr;
}
+ if (grpc_core::Fork::Enabled()) {
+ track_fds_for_fork = true;
+ gpr_mu_init(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(
+ reset_event_manager_on_fork);
+ }
return &vtable;
}
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 0205363d5c4..d4377e2d504 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -237,14 +237,19 @@ void grpc_event_engine_shutdown(void) {
}
bool grpc_event_engine_can_track_errors(void) {
+/* Only track errors if platform supports errqueue. */
+#ifdef GRPC_LINUX_ERRQUEUE
return g_event_engine->can_track_err;
+#else
+ return false;
+#endif /* GRPC_LINUX_ERRQUEUE */
}
grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
- GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err);
- return g_event_engine->fd_create(fd, name, track_err);
+ return g_event_engine->fd_create(fd, name,
+ track_err && g_event_engine->can_track_err);
}
int grpc_fd_wrapped_fd(grpc_fd* fd) {
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index ac85c81de2d..e957bad73d3 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -58,6 +58,12 @@ void grpc_prefork() {
"environment variable GRPC_ENABLE_FORK_SUPPORT=1");
return;
}
+ if (strcmp(grpc_get_poll_strategy_name(), "epoll1") != 0 &&
+ strcmp(grpc_get_poll_strategy_name(), "poll") != 0) {
+ gpr_log(GPR_ERROR,
+ "Fork support is only compatible with the epoll1 and poll polling "
+ "strategies");
+ }
if (!grpc_core::Fork::BlockExecCtx()) {
gpr_log(GPR_INFO,
"Other threads are currently calling into gRPC, skipping fork() "
diff --git a/src/core/lib/iomgr/internal_errqueue.cc b/src/core/lib/iomgr/internal_errqueue.cc
new file mode 100644
index 00000000000..8823737e494
--- /dev/null
+++ b/src/core/lib/iomgr/internal_errqueue.cc
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include
+
+#include "src/core/lib/iomgr/port.h"
+
+#include "src/core/lib/iomgr/internal_errqueue.h"
+
+#ifdef GRPC_POSIX_SOCKET_TCP
+
+#ifdef GPR_LINUX
+#include
+#endif /* GPR_LINUX */
+
+bool kernel_supports_errqueue() {
+#ifdef LINUX_VERSION_CODE
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0)
+ return true;
+#endif /* LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0) */
+#endif /* LINUX_VERSION_CODE */
+ return false;
+}
+
+#endif /* GRPC_POSIX_SOCKET_TCP */
diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h
new file mode 100644
index 00000000000..fc11be9a6dd
--- /dev/null
+++ b/src/core/lib/iomgr/internal_errqueue.h
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* This file contains constants defined in and
+ * so as to allow collecting network timestamps in the
+ * kernel. This file allows tcp_posix.cc to compile on platforms that do not
+ * have and .
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H
+#define GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H
+
+#include
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_POSIX_SOCKET_TCP
+
+#include
+#include
+
+#ifdef GRPC_LINUX_ERRQUEUE
+#include
+#include
+#include
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+namespace grpc_core {
+
+#ifdef GRPC_LINUX_ERRQUEUE
+constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE |
+ SOF_TIMESTAMPING_OPT_ID |
+ SOF_TIMESTAMPING_OPT_TSONLY;
+constexpr uint32_t kTimestampingRecordingOptions =
+ SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE |
+ SOF_TIMESTAMPING_TX_ACK;
+#endif /* GRPC_LINUX_ERRQUEUE */
+
+/* Returns true if kernel is capable of supporting errqueue and timestamping.
+ * Currently allowing only linux kernels above 4.0.0
+ */
+bool kernel_supports_errqueue();
+} // namespace grpc_core
+
+#endif /* GRPC_POSIX_SOCKET_TCP */
+
+#endif /* GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 066417b93cc..a4688fd0efb 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -60,6 +60,12 @@
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
#define GRPC_HAVE_UNIX_SOCKET 1
+#include
+#ifdef LINUX_VERSION_CODE
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0)
+#define GRPC_LINUX_ERRQUEUE 1
+#endif /* LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) */
+#endif /* LINUX_VERSION_CODE */
#define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_HOST_NAME_MAX 1
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 296ee74311a..9c989b7dfee 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -279,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
}
addr_str = grpc_sockaddr_to_uri(mapped_addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
- *fdobj = grpc_fd_create(fd, name, false);
+ *fdobj = grpc_fd_create(fd, name, true);
gpr_free(name);
gpr_free(addr_str);
return GRPC_ERROR_NONE;
diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc
index 990e8d632b9..e02a1898f25 100644
--- a/src/core/lib/iomgr/tcp_custom.cc
+++ b/src/core/lib/iomgr/tcp_custom.cc
@@ -221,7 +221,7 @@ static void custom_write_callback(grpc_custom_socket* socket,
}
static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index b53ffbf01cf..1db2790265e 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -27,7 +27,9 @@
#include
#include
+#include
#include
+#include
#include
#include
#include
@@ -46,6 +48,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
@@ -97,17 +100,42 @@ struct grpc_tcp {
grpc_closure read_done_closure;
grpc_closure write_done_closure;
+ grpc_closure error_closure;
char* peer_string;
grpc_resource_user* resource_user;
grpc_resource_user_slice_allocator slice_allocator;
+
+ grpc_core::TracedBuffer* tb_head; /* List of traced buffers */
+ gpr_mu tb_mu; /* Lock for access to list of traced buffers */
+
+ /* grpc_endpoint_write takes an argument which if non-null means that the
+ * transport layer wants the TCP layer to collect timestamps for this write.
+ * This arg is forwarded to the timestamps callback function when the ACK
+ * timestamp is received from the kernel. This arg is a (void *) which allows
+ * users of this API to pass in a pointer to any kind of structure. This
+ * structure could actually be a tag or any book-keeping object that the user
+ * can use to distinguish between different traced writes. The only
+ * requirement from the TCP endpoint layer is that this arg should be non-null
+ * if the user wants timestamps for the write. */
+ void* outgoing_buffer_arg;
+ /* A counter which starts at 0. It is initialized the first time the socket
+ * options for collecting timestamps are set, and is incremented with each
+ * byte sent. */
+ int bytes_counter;
+ bool socket_ts_enabled; /* True if timestamping options are set on the socket
+ */
+ gpr_atm
+ stop_error_notification; /* Set to 1 if we do not want to be notified on
+ errors anymore */
};
struct backup_poller {
gpr_mu* pollset_mu;
grpc_closure run_poller;
};
+
} // namespace
#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1))
@@ -302,6 +330,7 @@ static void tcp_free(grpc_tcp* tcp) {
grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string);
+ gpr_mu_destroy(&tcp->tb_mu);
gpr_free(tcp);
}
@@ -347,6 +376,10 @@ static void tcp_destroy(grpc_endpoint* ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = reinterpret_cast(ep);
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+ if (grpc_event_engine_can_track_errors()) {
+ gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
+ grpc_fd_set_error(tcp->em_fd);
+ }
TCP_UNREF(tcp, "destroy");
}
@@ -513,6 +546,234 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
}
}
+/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
+ * of bytes sent. */
+ssize_t tcp_send(int fd, const struct msghdr* msg) {
+ GPR_TIMER_SCOPE("sendmsg", 1);
+ ssize_t sent_length;
+ do {
+ /* TODO(klempner): Cork if this is a partial write */
+ GRPC_STATS_INC_SYSCALL_WRITE();
+ sent_length = sendmsg(fd, msg, SENDMSG_FLAGS);
+ } while (sent_length < 0 && errno == EINTR);
+ return sent_length;
+}
+
+/** This is to be called if outgoing_buffer_arg is not null. On linux platforms,
+ * this will call sendmsg with socket options set to collect timestamps inside
+ * the kernel. On return, sent_length is set to the return value of the sendmsg
+ * call. Returns false if setting the socket options failed. This is not
+ * implemented for non-linux platforms currently, and crashes out.
+ */
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+ size_t sending_length,
+ ssize_t* sent_length, grpc_error** error);
+
+/** The callback function to be invoked when we get an error on the socket. */
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
+
+#ifdef GRPC_LINUX_ERRQUEUE
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+ size_t sending_length,
+ ssize_t* sent_length,
+ grpc_error** error) {
+ if (!tcp->socket_ts_enabled) {
+ uint32_t opt = grpc_core::kTimestampingSocketOptions;
+ if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
+ static_cast(&opt), sizeof(opt)) != 0) {
+ *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp);
+ grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket.");
+ }
+ return false;
+ }
+ tcp->bytes_counter = -1;
+ tcp->socket_ts_enabled = true;
+ }
+ /* Set control message to indicate that you want timestamps. */
+ union {
+ char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))];
+ struct cmsghdr align;
+ } u;
+ cmsghdr* cmsg = reinterpret_cast(u.cmsg_buf);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SO_TIMESTAMPING;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t));
+ *reinterpret_cast(CMSG_DATA(cmsg)) =
+ grpc_core::kTimestampingRecordingOptions;
+ msg->msg_control = u.cmsg_buf;
+ msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));
+
+ /* If there was an error on sendmsg the logic in tcp_flush will handle it. */
+ ssize_t length = tcp_send(tcp->fd, msg);
+ *sent_length = length;
+ /* Only save timestamps if all the bytes were taken by sendmsg. */
+ if (sending_length == static_cast(length)) {
+ gpr_mu_lock(&tcp->tb_mu);
+ grpc_core::TracedBuffer::AddNewEntry(
+ &tcp->tb_head, static_cast(tcp->bytes_counter + length),
+ tcp->outgoing_buffer_arg);
+ gpr_mu_unlock(&tcp->tb_mu);
+ tcp->outgoing_buffer_arg = nullptr;
+ }
+ return true;
+}
+
+/** Reads \a cmsg to derive timestamps from the control messages. If a valid
+ * timestamp is found, the traced buffer list is updated with this timestamp.
+ * The caller of this function should be looping on the control messages found
+ * in \a msg. \a cmsg should point to the control message that the caller wants
+ * processed.
+ * On return, a pointer to a control message is returned. On the next iteration,
+ * CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. */
+struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
+ struct cmsghdr* cmsg) {
+ auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
+ if (next_cmsg == nullptr) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Received timestamp without extended error");
+ }
+ return cmsg;
+ }
+
+ if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
+ !(next_cmsg->cmsg_type == IP_RECVERR ||
+ next_cmsg->cmsg_type == IPV6_RECVERR)) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Unexpected control message");
+ }
+ return cmsg;
+ }
+
+ auto tss = reinterpret_cast(CMSG_DATA(cmsg));
+ auto serr = reinterpret_cast(CMSG_DATA(next_cmsg));
+ if (serr->ee_errno != ENOMSG ||
+ serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
+ gpr_log(GPR_ERROR, "Unexpected control message");
+ return cmsg;
+ }
+ /* The error handling can potentially be done on another thread so we need
+ * to protect the traced buffer list. A lock free list might be better. Using
+ * a simple mutex for now. */
+ gpr_mu_lock(&tcp->tb_mu);
+ grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, tss);
+ gpr_mu_unlock(&tcp->tb_mu);
+ return next_cmsg;
+}
+
+/** For linux platforms, reads the socket's error queue and processes error
+ * messages from the queue. Returns true if all the errors processed were
+ * timestamps. Returns false if any of the errors were not timestamps. For
+ * non-linux platforms, error processing is not used/enabled currently.
+ */
+static bool process_errors(grpc_tcp* tcp) {
+ while (true) {
+ struct iovec iov;
+ iov.iov_base = nullptr;
+ iov.iov_len = 0;
+ struct msghdr msg;
+ msg.msg_name = nullptr;
+ msg.msg_namelen = 0;
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 0;
+ msg.msg_flags = 0;
+
+ union {
+ char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) +
+ CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/];
+ struct cmsghdr align;
+ } aligned_buf;
+ memset(&aligned_buf, 0, sizeof(aligned_buf));
+
+ msg.msg_control = aligned_buf.rbuf;
+ msg.msg_controllen = sizeof(aligned_buf.rbuf);
+
+ int r, saved_errno;
+ do {
+ r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE);
+ saved_errno = errno;
+ } while (r < 0 && saved_errno == EINTR);
+
+ if (r == -1 && saved_errno == EAGAIN) {
+ return true; /* No more errors to process */
+ }
+ if (r == -1) {
+ return false;
+ }
+ if (grpc_tcp_trace.enabled()) {
+ if ((msg.msg_flags & MSG_CTRUNC) == 1) {
+ gpr_log(GPR_INFO, "Error message was truncated.");
+ }
+ }
+
+ if (msg.msg_controllen == 0) {
+ /* There was no control message found. It was probably spurious. */
+ return true;
+ }
+ for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
+ cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ if (cmsg->cmsg_level != SOL_SOCKET ||
+ cmsg->cmsg_type != SCM_TIMESTAMPING) {
+ /* Got a control message that is not a timestamp. Don't know how to
+ * handle this. */
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "unknown control message cmsg_level:%d cmsg_type:%d",
+ cmsg->cmsg_level, cmsg->cmsg_type);
+ }
+ return false;
+ }
+ process_timestamp(tcp, &msg, cmsg);
+ }
+ }
+}
+
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
+ grpc_tcp* tcp = static_cast(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error));
+ }
+
+ if (error != GRPC_ERROR_NONE ||
+ static_cast(gpr_atm_acq_load(&tcp->stop_error_notification))) {
+ /* We aren't going to register to hear on error anymore, so it is safe to
+ * unref. */
+ grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error));
+ TCP_UNREF(tcp, "error-tracking");
+ return;
+ }
+
+ /* We are still interested in collecting timestamps, so let's try reading
+ * them. */
+ if (!process_errors(tcp)) {
+ /* This was not a timestamps error. This was an actual error. Set the
+ * read and write closures to be ready.
+ */
+ grpc_fd_set_readable(tcp->em_fd);
+ grpc_fd_set_writable(tcp->em_fd);
+ }
+ GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
+}
+
+#else /* GRPC_LINUX_ERRQUEUE */
+static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
+ size_t sending_length,
+ ssize_t* sent_length,
+ grpc_error** error) {
+ gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
+ GPR_ASSERT(0);
+ return false;
+}
+
+static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
+ gpr_log(GPR_ERROR, "Error handling is not supported for this platform");
+ GPR_ASSERT(0);
+}
+#endif /* GRPC_LINUX_ERRQUEUE */
+
/* returns true if done, false if pending; if returning true, *error is set */
#if defined(IOV_MAX) && IOV_MAX < 1000
#define MAX_WRITE_IOVEC IOV_MAX
@@ -557,19 +818,20 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iov_size;
- msg.msg_control = nullptr;
- msg.msg_controllen = 0;
msg.msg_flags = 0;
+ if (tcp->outgoing_buffer_arg != nullptr) {
+ if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
+ error))
+ return true; /* something went wrong with timestamps */
+ } else {
+ msg.msg_control = nullptr;
+ msg.msg_controllen = 0;
- GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
- GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
+ GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
+ GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
- GPR_TIMER_SCOPE("sendmsg", 1);
- do {
- /* TODO(klempner): Cork if this is a partial write */
- GRPC_STATS_INC_SYSCALL_WRITE();
- sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
- } while (sent_length < 0 && errno == EINTR);
+ sent_length = tcp_send(tcp->fd, &msg);
+ }
if (sent_length < 0) {
if (errno == EAGAIN) {
@@ -593,6 +855,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
}
GPR_ASSERT(tcp->outgoing_byte_idx == 0);
+ tcp->bytes_counter += sent_length;
trailing = sending_length - static_cast(sent_length);
while (trailing > 0) {
size_t slice_length;
@@ -607,7 +870,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
trailing -= slice_length;
}
}
-
if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
*error = GRPC_ERROR_NONE;
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
@@ -640,14 +902,13 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
-
GRPC_CLOSURE_SCHED(cb, error);
TCP_UNREF(tcp, "write");
}
}
static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
GPR_TIMER_SCOPE("tcp_write", 0);
grpc_tcp* tcp = reinterpret_cast(ep);
grpc_error* error = GRPC_ERROR_NONE;
@@ -675,6 +936,10 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
}
tcp->outgoing_buffer = buf;
tcp->outgoing_byte_idx = 0;
+ tcp->outgoing_buffer_arg = arg;
+ if (arg) {
+ GPR_ASSERT(grpc_event_engine_can_track_errors());
+ }
if (!tcp_flush(tcp, &error)) {
TCP_REF(tcp, "write");
@@ -792,6 +1057,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->bytes_read_this_round = 0;
/* Will be set to false by the very first endpoint read function */
tcp->is_first_read = true;
+ tcp->bytes_counter = -1;
+ tcp->socket_ts_enabled = false;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
@@ -803,6 +1070,19 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
grpc_resource_quota_unref_internal(resource_quota);
+ gpr_mu_init(&tcp->tb_mu);
+ tcp->tb_head = nullptr;
+ /* Start being notified on errors if event engine can track errors. */
+ if (grpc_event_engine_can_track_errors()) {
+ /* Grab a ref to tcp so that we can safely access the tcp struct when
+ * processing errors. We unref when we no longer want to track errors
+ * separately. */
+ TCP_REF(tcp, "error-tracking");
+ gpr_atm_rel_store(&tcp->stop_error_notification, 0);
+ GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
+ }
return &tcp->base;
}
@@ -821,6 +1101,11 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
tcp->release_fd = fd;
tcp->release_fd_cb = done;
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+ if (grpc_event_engine_can_track_errors()) {
+ /* Stop errors notification. */
+ gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
+ grpc_fd_set_error(tcp->em_fd);
+ }
TCP_UNREF(tcp, "destroy");
}
diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h
index af89bd24db1..eff825cb925 100644
--- a/src/core/lib/iomgr/tcp_posix.h
+++ b/src/core/lib/iomgr/tcp_posix.h
@@ -31,7 +31,10 @@
#include
+#include "src/core/lib/iomgr/port.h"
+
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 8ddf684feac..824db07fbfd 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) {
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- grpc_fd* fdobj = grpc_fd_create(fd, name, false);
+ grpc_fd* fdobj = grpc_fd_create(fd, name, true);
read_notifier_pollset =
sp->server->pollsets[static_cast(gpr_atm_no_barrier_fetch_add(
@@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
listener->sibling = sp;
sp->server = listener->server;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name, false);
+ sp->emfd = grpc_fd_create(fd, name, true);
memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = listener->port_index;
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index b9f81455729..9595c028ce0 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd,
s->tail = sp;
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name, false);
+ sp->emfd = grpc_fd_create(fd, name, true);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = port_index;
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index b3cb442f18f..64c4a56ae95 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -296,7 +296,7 @@ static void on_write(void* tcpp, grpc_error* error) {
/* Initiates a write. */
static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_winsocket* socket = tcp->socket;
grpc_winsocket_callback_info* info = &socket->write_info;
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index bdb2d0e7644..3dd7cab855c 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
- emfd_ = grpc_fd_create(fd, name, false);
+ emfd_ = grpc_fd_create(fd, name, true);
memcpy(&addr_, addr, sizeof(grpc_resolved_address));
GPR_ASSERT(emfd_);
gpr_free(name);
diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc
index 840b2e73bcf..f40f969bb7f 100644
--- a/src/core/lib/security/transport/secure_endpoint.cc
+++ b/src/core/lib/security/transport/secure_endpoint.cc
@@ -254,7 +254,7 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
}
static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0);
unsigned i;
@@ -342,7 +342,7 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
return;
}
- grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
+ grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg);
}
static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) {
diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc
index aff723ed044..d76d5826388 100644
--- a/src/core/lib/security/transport/security_handshaker.cc
+++ b/src/core/lib/security/transport/security_handshaker.cc
@@ -259,7 +259,7 @@ static grpc_error* on_handshake_next_done_locked(
grpc_slice_buffer_reset_and_unref_internal(&h->outgoing);
grpc_slice_buffer_add(&h->outgoing, to_send);
grpc_endpoint_write(h->args->endpoint, &h->outgoing,
- &h->on_handshake_data_sent_to_peer);
+ &h->on_handshake_data_sent_to_peer, nullptr);
} else if (handshaker_result == nullptr) {
// There is nothing to send, but need to read from peer.
grpc_endpoint_read(h->args->endpoint, h->args->read_buffer,
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
index 0d2516977b8..433ae1f374f 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
@@ -37,8 +37,6 @@ _GRPC_ENABLE_FORK_SUPPORT = (
os.environ.get('GRPC_ENABLE_FORK_SUPPORT', '0')
.lower() in _TRUE_VALUES)
-_GRPC_POLL_STRATEGY = os.environ.get('GRPC_POLL_STRATEGY')
-
cdef void __prefork() nogil:
with gil:
with _fork_state.fork_in_progress_condition:
@@ -82,13 +80,6 @@ cdef void __postfork_child() nogil:
def fork_handlers_and_grpc_init():
grpc_init()
if _GRPC_ENABLE_FORK_SUPPORT:
- # TODO(ericgribkoff) epoll1 is default for grpcio distribution. Decide whether to expose
- # grpc_get_poll_strategy_name() from ev_posix.cc to get actual polling choice.
- if _GRPC_POLL_STRATEGY is not None and _GRPC_POLL_STRATEGY != "epoll1":
- _LOGGER.error(
- 'gRPC Python fork support is only compatible with the epoll1 '
- 'polling engine')
- return
with _fork_state.fork_handler_registered_lock:
if not _fork_state.fork_handler_registered:
pthread_atfork(&__prefork, &__postfork_parent, &__postfork_child)
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 6e6d756eec7..0f68e823d79 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -82,6 +82,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/http/format_request.cc',
'src/core/lib/http/httpcli.cc',
'src/core/lib/http/parser.cc',
+ 'src/core/lib/iomgr/buffer_list.cc',
'src/core/lib/iomgr/call_combiner.cc',
'src/core/lib/iomgr/combiner.cc',
'src/core/lib/iomgr/endpoint.cc',
@@ -102,6 +103,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/gethostname_fallback.cc',
'src/core/lib/iomgr/gethostname_host_name_max.cc',
'src/core/lib/iomgr/gethostname_sysconf.cc',
+ 'src/core/lib/iomgr/internal_errqueue.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
'src/core/lib/iomgr/iomgr_custom.cc',
diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc
index c03ebcf4096..ade23133c55 100644
--- a/test/core/bad_client/bad_client.cc
+++ b/test/core/bad_client/bad_client.cc
@@ -115,7 +115,7 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags,
grpc_schedule_on_exec_ctx);
/* Write data */
- grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure);
+ grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure, nullptr);
grpc_core::ExecCtx::Get()->Flush();
/* Await completion, unless the request is large and write may not finish
diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD
index dd16694204a..37999a98d1e 100644
--- a/test/core/end2end/BUILD
+++ b/test/core/end2end/BUILD
@@ -123,6 +123,19 @@ grpc_cc_test(
],
)
+grpc_cc_test(
+ name = "inproc_callback_test",
+ srcs = ["inproc_callback_test.cc"],
+ language = "C++",
+ deps = [
+ ':end2end_tests',
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+)
+
grpc_cc_test(
name = "invalid_call_argument_test",
srcs = ["invalid_call_argument_test.cc"],
diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc
index 3d133cfc186..f7396a16845 100644
--- a/test/core/end2end/bad_server_response_test.cc
+++ b/test/core/end2end/bad_server_response_test.cc
@@ -104,7 +104,7 @@ static void handle_write() {
grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer);
grpc_slice_buffer_add(&state.outgoing_buffer, slice);
- grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write);
+ grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr);
}
static void handle_read(void* arg, grpc_error* error) {
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc
index f02fa9d9983..ea9c000efb9 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.cc
+++ b/test/core/end2end/fixtures/http_proxy_fixture.cc
@@ -201,7 +201,7 @@ static void on_client_write_done(void* arg, grpc_error* error) {
&conn->client_write_buffer);
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
- &conn->on_client_write_done);
+ &conn->on_client_write_done, nullptr);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "write_done");
@@ -226,7 +226,7 @@ static void on_server_write_done(void* arg, grpc_error* error) {
&conn->server_write_buffer);
conn->server_is_writing = true;
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
- &conn->on_server_write_done);
+ &conn->on_server_write_done, nullptr);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "server_write");
@@ -257,7 +257,7 @@ static void on_client_read_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "client_read");
conn->server_is_writing = true;
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
- &conn->on_server_write_done);
+ &conn->on_server_write_done, nullptr);
}
// Read more data.
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
@@ -288,7 +288,7 @@ static void on_server_read_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "server_read");
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
- &conn->on_client_write_done);
+ &conn->on_client_write_done, nullptr);
}
// Read more data.
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
@@ -340,7 +340,7 @@ static void on_server_connect_done(void* arg, grpc_error* error) {
grpc_slice_buffer_add(&conn->client_write_buffer, slice);
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
- &conn->on_write_response_done);
+ &conn->on_write_response_done, nullptr);
}
/**
diff --git a/test/core/end2end/inproc_callback_test.cc b/test/core/end2end/inproc_callback_test.cc
new file mode 100644
index 00000000000..0d6c7c75a8b
--- /dev/null
+++ b/test/core/end2end/inproc_callback_test.cc
@@ -0,0 +1,498 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "test/core/end2end/end2end_tests.h"
+
+#include
+
+#include
+#include
+#include
+
+#include "src/core/ext/transport/inproc/inproc_transport.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/server.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+typedef struct inproc_fixture_data {
+ bool dummy; // reserved for future expansion. Struct can't be empty
+} inproc_fixture_data;
+
+namespace {
+template
+class CQDeletingCallback : public grpc_core::CQCallbackInterface {
+ public:
+ explicit CQDeletingCallback(F f) : func_(f) {}
+ ~CQDeletingCallback() override {}
+ void Run(bool ok) override {
+ func_(ok);
+ grpc_core::Delete(this);
+ }
+
+ private:
+ F func_;
+};
+
+template
+grpc_core::CQCallbackInterface* NewDeletingCallback(F f) {
+ return grpc_core::New>(f);
+}
+
+class ShutdownCallback : public grpc_core::CQCallbackInterface {
+ public:
+ ShutdownCallback() : done_(false) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&cv_);
+ }
+ ~ShutdownCallback() override {}
+ void Run(bool ok) override {
+ gpr_log(GPR_DEBUG, "CQ shutdown notification invoked");
+ gpr_mu_lock(&mu_);
+ done_ = true;
+ gpr_cv_broadcast(&cv_);
+ gpr_mu_unlock(&mu_);
+ }
+ // The Wait function waits for a specified amount of
+ // time for the completion of the shutdown and returns
+ // whether it was successfully shut down
+ bool Wait(gpr_timespec deadline) {
+ gpr_mu_lock(&mu_);
+ while (!done_ && !gpr_cv_wait(&cv_, &mu_, deadline)) {
+ }
+ bool ret = done_;
+ gpr_mu_unlock(&mu_);
+ return ret;
+ }
+
+ private:
+ bool done_;
+ gpr_mu mu_;
+ gpr_cv cv_;
+};
+
+ShutdownCallback* g_shutdown_callback;
+} // namespace
+
+// The following global structure is the tag collection. It holds
+// all information related to tags expected and tags received
+// during the execution, with each callback setting a tag.
+// The tag sets are implemented and checked using arrays and
+// linear lookups (rather than maps) so that this test doesn't
+// need the C++ standard library.
+static gpr_mu tags_mu;
+static gpr_cv tags_cv;
+const size_t kAvailableTags = 4;
+bool tags[kAvailableTags];
+bool tags_valid[kAvailableTags];
+bool tags_expected[kAvailableTags];
+bool tags_needed[kAvailableTags];
+
+// Mark that a tag is expected; this function must be executed in the
+// main thread only while there are no other threads altering the
+// expectation set (e.g., by calling expect_tag or verify_tags)
+static void expect_tag(intptr_t tag, bool ok) {
+ size_t idx = static_cast(tag);
+ GPR_ASSERT(idx < kAvailableTags);
+ tags_needed[idx] = true;
+ tags_expected[idx] = ok;
+}
+
+// Check that the expected tags have reached, within a certain
+// deadline. This must also be executed only on the main thread while
+// there are no other threads altering the expectation set (e.g., by
+// calling expect_tag or verify_tags). The tag verifier doesn't have
+// to drive the CQ at all (unlike the next-based end2end tests)
+// because the tags will get set when the callbacks are executed,
+// which happens when a particular batch related to a callback is
+// complete.
+static void verify_tags(gpr_timespec deadline) {
+ bool done = false;
+
+ gpr_mu_lock(&tags_mu);
+ while (!done) {
+ done = gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) > 0;
+ for (size_t i = 0; i < kAvailableTags; i++) {
+ if (tags_needed[i]) {
+ if (tags_valid[i]) {
+ gpr_log(GPR_DEBUG, "Verifying tag %d", static_cast(i));
+ if (tags[i] != tags_expected[i]) {
+ gpr_log(GPR_ERROR, "Got wrong result (%d instead of %d) for tag %d",
+ tags[i], tags_expected[i], static_cast(i));
+ GPR_ASSERT(false);
+ }
+ tags_valid[i] = false;
+ tags_needed[i] = false;
+ } else if (done) {
+ gpr_log(GPR_ERROR, "Didn't get tag %d", static_cast(i));
+ GPR_ASSERT(false);
+ }
+ }
+ }
+ bool empty = true;
+ for (size_t i = 0; i < kAvailableTags; i++) {
+ if (tags_needed[i]) {
+ empty = false;
+ }
+ }
+ done = done || empty;
+ if (done) {
+ for (size_t i = 0; i < kAvailableTags; i++) {
+ if (tags_valid[i]) {
+ gpr_log(GPR_ERROR, "Got unexpected tag %d and result %d",
+ static_cast(i), tags[i]);
+ GPR_ASSERT(false);
+ }
+ tags_valid[i] = false;
+ }
+ } else {
+ gpr_cv_wait(&tags_cv, &tags_mu, deadline);
+ }
+ }
+ gpr_mu_unlock(&tags_mu);
+}
+
+// This function creates a callback functor that emits the
+// desired tag into the global tag set
+static grpc_core::CQCallbackInterface* tag(intptr_t t) {
+ auto func = [t](bool ok) {
+ gpr_mu_lock(&tags_mu);
+ gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t);
+ bool was_empty = true;
+ for (size_t i = 0; i < kAvailableTags; i++) {
+ if (tags_valid[i]) {
+ was_empty = false;
+ }
+ }
+ size_t idx = static_cast(t);
+ tags[idx] = ok;
+ tags_valid[idx] = true;
+ if (was_empty) {
+ gpr_cv_signal(&tags_cv);
+ }
+ gpr_mu_unlock(&tags_mu);
+ };
+ auto cb = NewDeletingCallback(func);
+ return cb;
+}
+
+static grpc_end2end_test_fixture inproc_create_fixture(
+ grpc_channel_args* client_args, grpc_channel_args* server_args) {
+ grpc_end2end_test_fixture f;
+ inproc_fixture_data* ffd = static_cast(
+ gpr_malloc(sizeof(inproc_fixture_data)));
+ memset(&f, 0, sizeof(f));
+
+ f.fixture_data = ffd;
+ g_shutdown_callback = grpc_core::New();
+ f.cq =
+ grpc_completion_queue_create_for_callback(g_shutdown_callback, nullptr);
+ f.shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr);
+
+ return f;
+}
+
+void inproc_init_client(grpc_end2end_test_fixture* f,
+ grpc_channel_args* client_args) {
+ f->client = grpc_inproc_channel_create(f->server, client_args, nullptr);
+ GPR_ASSERT(f->client);
+}
+
+void inproc_init_server(grpc_end2end_test_fixture* f,
+ grpc_channel_args* server_args) {
+ if (f->server) {
+ grpc_server_destroy(f->server);
+ }
+ f->server = grpc_server_create(server_args, nullptr);
+ grpc_server_register_completion_queue(f->server, f->cq, nullptr);
+ grpc_server_start(f->server);
+}
+
+void inproc_tear_down(grpc_end2end_test_fixture* f) {
+ inproc_fixture_data* ffd = static_cast(f->fixture_data);
+ gpr_free(ffd);
+}
+
+static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
+ const char* test_name,
+ grpc_channel_args* client_args,
+ grpc_channel_args* server_args) {
+ grpc_end2end_test_fixture f;
+ gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
+ f = config.create_fixture(client_args, server_args);
+ config.init_server(&f, server_args);
+ config.init_client(&f, client_args);
+ return f;
+}
+
+static gpr_timespec n_seconds_from_now(int n) {
+ return grpc_timeout_seconds_to_deadline(n);
+}
+
+static gpr_timespec five_seconds_from_now() { return n_seconds_from_now(5); }
+
+static void drain_cq(grpc_completion_queue* cq) {
+ // Wait for the shutdown callback to arrive, or fail the test
+ GPR_ASSERT(g_shutdown_callback->Wait(five_seconds_from_now()));
+ gpr_log(GPR_DEBUG, "CQ shutdown wait complete");
+ grpc_core::Delete(g_shutdown_callback);
+}
+
+static void shutdown_server(grpc_end2end_test_fixture* f) {
+ if (!f->server) return;
+ grpc_server_shutdown_and_notify(
+ f->server, f->shutdown_cq,
+ reinterpret_cast(static_cast(1000)));
+ GPR_ASSERT(
+ grpc_completion_queue_pluck(f->shutdown_cq, (void*)((intptr_t)1000),
+ grpc_timeout_seconds_to_deadline(5), nullptr)
+ .type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(f->server);
+ f->server = nullptr;
+}
+
+static void shutdown_client(grpc_end2end_test_fixture* f) {
+ if (!f->client) return;
+ grpc_channel_destroy(f->client);
+ f->client = nullptr;
+}
+
+static void end_test(grpc_end2end_test_fixture* f) {
+ shutdown_server(f);
+ shutdown_client(f);
+
+ grpc_completion_queue_shutdown(f->cq);
+ drain_cq(f->cq);
+ grpc_completion_queue_destroy(f->cq);
+ grpc_completion_queue_destroy(f->shutdown_cq);
+}
+
+static void simple_request_body(grpc_end2end_test_config config,
+ grpc_end2end_test_fixture f) {
+ grpc_call* c;
+ grpc_call* s;
+ grpc_op ops[6];
+ grpc_op* op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_metadata_array request_metadata_recv;
+ grpc_call_details call_details;
+ grpc_status_code status;
+ const char* error_string;
+ grpc_call_error error;
+ grpc_slice details;
+ int was_cancelled = 2;
+ char* peer;
+ gpr_timespec deadline = five_seconds_from_now();
+
+ c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ grpc_slice_from_static_string("/foo"), nullptr,
+ deadline, nullptr);
+ GPR_ASSERT(c);
+
+ peer = grpc_call_get_peer(c);
+ GPR_ASSERT(peer != nullptr);
+ gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
+ gpr_free(peer);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ // Create a basic client unary request batch (no payload)
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->data.recv_status_on_client.error_string = &error_string;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1),
+ nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ // Register a call at the server-side to match the incoming client call
+ error = grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.cq, f.cq, tag(2));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ // We expect that the server call creation callback (and no others) will
+ // execute now since no other batch should be complete.
+ expect_tag(2, true);
+ verify_tags(deadline);
+
+ peer = grpc_call_get_peer(s);
+ GPR_ASSERT(peer != nullptr);
+ gpr_log(GPR_DEBUG, "server_peer=%s", peer);
+ gpr_free(peer);
+ peer = grpc_call_get_peer(c);
+ GPR_ASSERT(peer != nullptr);
+ gpr_log(GPR_DEBUG, "client_peer=%s", peer);
+ gpr_free(peer);
+
+ // Create the server response batch (no payload)
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
+ grpc_slice status_details = grpc_slice_from_static_string("xyz");
+ op->data.send_status_from_server.status_details = &status_details;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(3),
+ nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ // Both the client request and server response batches should get complete
+ // now and we should see that their callbacks have been executed
+ expect_tag(3, true);
+ expect_tag(1, true);
+ verify_tags(deadline);
+
+ GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
+ GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
+ // the following sanity check makes sure that the requested error string is
+ // correctly populated by the core. It looks for certain substrings that are
+ // not likely to change much. Some parts of the error, like time created,
+ // obviously are not checked.
+ GPR_ASSERT(nullptr != strstr(error_string, "xyz"));
+ GPR_ASSERT(nullptr != strstr(error_string, "description"));
+ GPR_ASSERT(nullptr != strstr(error_string, "Error received from peer"));
+ GPR_ASSERT(nullptr != strstr(error_string, "grpc_message"));
+ GPR_ASSERT(nullptr != strstr(error_string, "grpc_status"));
+ GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
+ GPR_ASSERT(0 == call_details.flags);
+ GPR_ASSERT(was_cancelled == 1);
+
+ grpc_slice_unref(details);
+ gpr_free(static_cast(const_cast(error_string)));
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+
+ grpc_call_unref(c);
+ grpc_call_unref(s);
+
+ int expected_calls = 1;
+ if (config.feature_mask & FEATURE_MASK_SUPPORTS_REQUEST_PROXYING) {
+ expected_calls *= 2;
+ }
+}
+
+static void test_invoke_simple_request(grpc_end2end_test_config config) {
+ grpc_end2end_test_fixture f;
+
+ f = begin_test(config, "test_invoke_simple_request", nullptr, nullptr);
+ simple_request_body(config, f);
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+static void test_invoke_10_simple_requests(grpc_end2end_test_config config) {
+ int i;
+ grpc_end2end_test_fixture f =
+ begin_test(config, "test_invoke_10_simple_requests", nullptr, nullptr);
+ for (i = 0; i < 10; i++) {
+ simple_request_body(config, f);
+ gpr_log(GPR_INFO, "Running test: Passed simple request %d", i);
+ }
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+static void test_invoke_many_simple_requests(grpc_end2end_test_config config) {
+ int i;
+ const int many = 1000;
+ grpc_end2end_test_fixture f =
+ begin_test(config, "test_invoke_many_simple_requests", nullptr, nullptr);
+ gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
+ for (i = 0; i < many; i++) {
+ simple_request_body(config, f);
+ }
+ double us =
+ gpr_timespec_to_micros(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), t1)) /
+ many;
+ gpr_log(GPR_INFO, "Time per ping %f us", us);
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+static void simple_request(grpc_end2end_test_config config) {
+ int i;
+ for (i = 0; i < 10; i++) {
+ test_invoke_simple_request(config);
+ }
+ test_invoke_10_simple_requests(config);
+ test_invoke_many_simple_requests(config);
+}
+
+static void simple_request_pre_init() {
+ gpr_mu_init(&tags_mu);
+ gpr_cv_init(&tags_cv);
+}
+
+/* All test configurations */
+static grpc_end2end_test_config configs[] = {
+ {"inproc-callback", FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, nullptr,
+ inproc_create_fixture, inproc_init_client, inproc_init_server,
+ inproc_tear_down},
+};
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ grpc_init();
+
+ simple_request_pre_init();
+ simple_request(configs[0]);
+
+ grpc_shutdown();
+
+ return 0;
+}
diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD
index 002671a5fae..675d9e6278c 100644
--- a/test/core/iomgr/BUILD
+++ b/test/core/iomgr/BUILD
@@ -246,6 +246,19 @@ grpc_cc_test(
],
)
+grpc_cc_test(
+ name = "buffer_list_test",
+ srcs = ["buffer_list_test.cc"],
+ language = "C++",
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+)
+
+
grpc_cc_test(
name = "tcp_server_posix_test",
srcs = ["tcp_server_posix_test.cc"],
diff --git a/test/core/iomgr/buffer_list_test.cc b/test/core/iomgr/buffer_list_test.cc
new file mode 100644
index 00000000000..f1773580bd2
--- /dev/null
+++ b/test/core/iomgr/buffer_list_test.cc
@@ -0,0 +1,111 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#include "src/core/lib/iomgr/buffer_list.h"
+
+#include
+
+#include "test/core/util/test_config.h"
+
+#ifdef GRPC_LINUX_ERRQUEUE
+
+static void TestShutdownFlushesListVerifier(void* arg,
+ grpc_core::Timestamps* ts,
+ grpc_error* error) {
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(arg != nullptr);
+ gpr_atm* done = reinterpret_cast(arg);
+ gpr_atm_rel_store(done, static_cast(1));
+}
+
+/** Tests that all TracedBuffer elements in the list are flushed out on
+ * shutdown.
+ * Also tests that arg is passed correctly.
+ */
+static void TestShutdownFlushesList() {
+ grpc_core::grpc_tcp_set_write_timestamps_callback(
+ TestShutdownFlushesListVerifier);
+ grpc_core::TracedBuffer* list = nullptr;
+#define NUM_ELEM 5
+ gpr_atm verifier_called[NUM_ELEM];
+ for (auto i = 0; i < NUM_ELEM; i++) {
+ gpr_atm_rel_store(&verifier_called[i], static_cast(0));
+ grpc_core::TracedBuffer::AddNewEntry(
+ &list, i, static_cast(&verifier_called[i]));
+ }
+ grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
+ GPR_ASSERT(list == nullptr);
+ for (auto i = 0; i < NUM_ELEM; i++) {
+ GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) ==
+ static_cast(1));
+ }
+}
+
+static void TestVerifierCalledOnAckVerifier(void* arg,
+ grpc_core::Timestamps* ts,
+ grpc_error* error) {
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(arg != nullptr);
+ GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
+ GPR_ASSERT(ts->acked_time.tv_sec == 123);
+ GPR_ASSERT(ts->acked_time.tv_nsec == 456);
+ gpr_atm* done = reinterpret_cast(arg);
+ gpr_atm_rel_store(done, static_cast(1));
+}
+
+/** Tests that the timestamp verifier is called on an ACK timestamp.
+ */
+static void TestVerifierCalledOnAck() {
+ struct sock_extended_err serr;
+ serr.ee_data = 213;
+ serr.ee_info = SCM_TSTAMP_ACK;
+ struct scm_timestamping tss;
+ tss.ts[0].tv_sec = 123;
+ tss.ts[0].tv_nsec = 456;
+ grpc_core::grpc_tcp_set_write_timestamps_callback(
+ TestVerifierCalledOnAckVerifier);
+ grpc_core::TracedBuffer* list = nullptr;
+ gpr_atm verifier_called;
+ gpr_atm_rel_store(&verifier_called, static_cast(0));
+ grpc_core::TracedBuffer::AddNewEntry(&list, 213, &verifier_called);
+ grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss);
+ GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast(1));
+ GPR_ASSERT(list == nullptr);
+ grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
+}
+
+static void TestTcpBufferList() {
+ TestVerifierCalledOnAck();
+ TestShutdownFlushesList();
+}
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ grpc_init();
+ TestTcpBufferList();
+ grpc_shutdown();
+ return 0;
+}
+
+#else /* GRPC_LINUX_ERRQUEUE */
+
+int main(int argc, char** argv) { return 0; }
+
+#endif /* GRPC_LINUX_ERRQUEUE */
diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc
index 8db8ac5ed6f..a9e8ba86c5d 100644
--- a/test/core/iomgr/endpoint_tests.cc
+++ b/test/core/iomgr/endpoint_tests.cc
@@ -150,8 +150,8 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) {
&state->current_write_data);
grpc_slice_buffer_reset_and_unref(&state->outgoing);
grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
- grpc_endpoint_write(state->write_ep, &state->outgoing,
- &state->done_write);
+ grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
+ nullptr);
gpr_free(slices);
return;
}
@@ -294,7 +294,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
grpc_endpoint_write(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
- grpc_schedule_on_exec_ctx));
+ grpc_schedule_on_exec_ctx),
+ nullptr);
wait_for_fail_count(&fail_count, 3);
grpc_endpoint_shutdown(f.client_ep,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc
index 3e87831e440..6447cc234db 100644
--- a/test/core/iomgr/tcp_posix_test.cc
+++ b/test/core/iomgr/tcp_posix_test.cc
@@ -36,6 +36,9 @@
#include
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/iomgr/buffer_list.h"
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/iomgr/endpoint_tests.h"
#include "test/core/util/test_config.h"
@@ -68,6 +71,43 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}
+static void create_inet_sockets(int sv[2]) {
+ /* Prepare listening socket */
+ struct sockaddr_in addr;
+ memset(&addr, 0, sizeof(struct sockaddr_in));
+ addr.sin_family = AF_INET;
+ int sock = socket(AF_INET, SOCK_STREAM, 0);
+ GPR_ASSERT(sock);
+ GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0);
+ listen(sock, 1);
+
+ /* Prepare client socket and connect to server */
+ socklen_t len = sizeof(sockaddr_in);
+ GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0);
+
+ int client = socket(AF_INET, SOCK_STREAM, 0);
+ GPR_ASSERT(client);
+ int ret;
+ do {
+ ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in));
+ } while (ret == -1 && errno == EINTR);
+
+ /* Accept client connection */
+ len = sizeof(socklen_t);
+ int server;
+ do {
+ server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len);
+ } while (server == -1 && errno == EINTR);
+ GPR_ASSERT(server != -1);
+
+ sv[0] = server;
+ sv[1] = client;
+ int flags = fcntl(sv[0], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
+ flags = fcntl(sv[1], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
+}
+
static ssize_t fill_socket(int fd) {
ssize_t write_bytes;
ssize_t total_bytes = 0;
@@ -289,11 +329,10 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
static void write_done(void* user_data /* write_socket_state */,
grpc_error* error) {
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
struct write_socket_state* state =
static_cast(user_data);
- gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(g_mu);
- gpr_log(GPR_INFO, "Signalling write done");
state->write_done = 1;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
@@ -340,10 +379,24 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
gpr_free(buf);
}
+/* Verifier for timestamps callback for write_test */
+void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
+ grpc_error* error) {
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(arg != nullptr);
+ GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME);
+ GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME);
+ GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
+ gpr_atm* done_timestamps = (gpr_atm*)arg;
+ gpr_atm_rel_store(done_timestamps, static_cast(1));
+}
+
/* Write to a socket using the grpc_tcp API, then drain it directly.
Note that if the write does not complete immediately we need to drain the
- socket in parallel with the read. */
-static void write_test(size_t num_bytes, size_t slice_size) {
+ socket in parallel with the read. If collect_timestamps is true, it will
+ try to get timestamps for the write. */
+static void write_test(size_t num_bytes, size_t slice_size,
+ bool collect_timestamps) {
int sv[2];
grpc_endpoint* ep;
struct write_socket_state state;
@@ -356,19 +409,27 @@ static void write_test(size_t num_bytes, size_t slice_size) {
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
grpc_core::ExecCtx exec_ctx;
+ if (collect_timestamps && !grpc_event_engine_can_track_errors()) {
+ return;
+ }
+
gpr_log(GPR_INFO,
"Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
num_bytes, slice_size);
- create_sockets(sv);
+ if (collect_timestamps) {
+ create_inet_sockets(sv);
+ } else {
+ create_sockets(sv);
+ }
grpc_arg a[1];
a[0].key = const_cast(GRPC_ARG_TCP_READ_CHUNK_SIZE);
a[0].type = GRPC_ARG_INTEGER,
a[0].value.integer = static_cast(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", false), &args,
- "test");
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
+ &args, "test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
state.ep = ep;
@@ -381,18 +442,26 @@ static void write_test(size_t num_bytes, size_t slice_size) {
GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
grpc_schedule_on_exec_ctx);
- grpc_endpoint_write(ep, &outgoing, &write_done_closure);
+ gpr_atm done_timestamps;
+ gpr_atm_rel_store(&done_timestamps, static_cast(0));
+ grpc_endpoint_write(ep, &outgoing, &write_done_closure,
+ grpc_event_engine_can_track_errors() && collect_timestamps
+ ? (void*)&done_timestamps
+ : nullptr);
drain_socket_blocking(sv[0], num_bytes, num_bytes);
+ exec_ctx.Flush();
gpr_mu_lock(g_mu);
for (;;) {
grpc_pollset_worker* worker = nullptr;
- if (state.write_done) {
+ if (state.write_done &&
+ (!(grpc_event_engine_can_track_errors() && collect_timestamps) ||
+ gpr_atm_acq_load(&done_timestamps) == static_cast(1))) {
break;
}
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
-
+ exec_ctx.Flush();
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
@@ -497,14 +566,21 @@ void run_tests(void) {
large_read_test(8192);
large_read_test(1);
- write_test(100, 8192);
- write_test(100, 1);
- write_test(100000, 8192);
- write_test(100000, 1);
- write_test(100000, 137);
+ write_test(100, 8192, false);
+ write_test(100, 1, false);
+ write_test(100000, 8192, false);
+ write_test(100000, 1, false);
+ write_test(100000, 137, false);
+
+ write_test(100, 8192, true);
+ write_test(100, 1, true);
+ write_test(100000, 8192, true);
+ write_test(100000, 1, true);
+ write_test(100, 137, true);
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
- write_test(40320, i);
+ write_test(40320, i, false);
+ write_test(40320, i, true);
}
release_fd_test(100, 8192);
@@ -549,6 +625,7 @@ int main(int argc, char** argv) {
grpc_closure destroyed;
grpc_test_init(argc, argv);
grpc_init();
+ grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier);
{
grpc_core::ExecCtx exec_ctx;
g_pollset = static_cast(gpr_zalloc(grpc_pollset_size()));
diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc
index 1156cd5fc5c..ef6fd62b516 100644
--- a/test/core/util/mock_endpoint.cc
+++ b/test/core/util/mock_endpoint.cc
@@ -55,7 +55,7 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
mock_endpoint* m = reinterpret_cast(ep);
for (size_t i = 0; i < slices->count; i++) {
m->on_write(slices->slices[i]);
diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc
index 59582167478..3cc8ad6fe14 100644
--- a/test/core/util/passthru_endpoint.cc
+++ b/test/core/util/passthru_endpoint.cc
@@ -76,7 +76,7 @@ static half* other_half(half* h) {
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
half* m = other_half(reinterpret_cast(ep));
gpr_mu_lock(&m->parent->mu);
grpc_error* error = GRPC_ERROR_NONE;
diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc
index f2efb049b49..62ed72a6295 100644
--- a/test/core/util/trickle_endpoint.cc
+++ b/test/core/util/trickle_endpoint.cc
@@ -62,7 +62,7 @@ static void maybe_call_write_cb_locked(trickle_endpoint* te) {
}
static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
trickle_endpoint* te = reinterpret_cast(ep);
gpr_mu_lock(&te->mu);
GPR_ASSERT(te->write_cb == nullptr);
@@ -186,7 +186,8 @@ size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) {
te->last_write = now;
grpc_endpoint_write(
te->wrapped, &te->writing_buffer,
- GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx));
+ GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx),
+ nullptr);
maybe_call_write_cb_locked(te);
}
}
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 1e9bd273aaf..189923a841e 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -96,7 +96,7 @@ class DummyEndpoint : public grpc_endpoint {
}
static void write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
}
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 43ebf8cad95..e6d4b1d98fb 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1067,6 +1067,7 @@ src/core/lib/http/format_request.h \
src/core/lib/http/httpcli.h \
src/core/lib/http/parser.h \
src/core/lib/iomgr/block_annotate.h \
+src/core/lib/iomgr/buffer_list.h \
src/core/lib/iomgr/call_combiner.h \
src/core/lib/iomgr/closure.h \
src/core/lib/iomgr/combiner.h \
@@ -1082,6 +1083,7 @@ src/core/lib/iomgr/ev_posix.h \
src/core/lib/iomgr/exec_ctx.h \
src/core/lib/iomgr/executor.h \
src/core/lib/iomgr/gethostname.h \
+src/core/lib/iomgr/internal_errqueue.h \
src/core/lib/iomgr/iocp_windows.h \
src/core/lib/iomgr/iomgr.h \
src/core/lib/iomgr/iomgr_custom.h \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index c1706fd070b..7cd1dc7bf37 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1159,6 +1159,8 @@ src/core/lib/http/parser.cc \
src/core/lib/http/parser.h \
src/core/lib/iomgr/README.md \
src/core/lib/iomgr/block_annotate.h \
+src/core/lib/iomgr/buffer_list.cc \
+src/core/lib/iomgr/buffer_list.h \
src/core/lib/iomgr/call_combiner.cc \
src/core/lib/iomgr/call_combiner.h \
src/core/lib/iomgr/closure.h \
@@ -1194,6 +1196,8 @@ src/core/lib/iomgr/gethostname.h \
src/core/lib/iomgr/gethostname_fallback.cc \
src/core/lib/iomgr/gethostname_host_name_max.cc \
src/core/lib/iomgr/gethostname_sysconf.cc \
+src/core/lib/iomgr/internal_errqueue.cc \
+src/core/lib/iomgr/internal_errqueue.h \
src/core/lib/iomgr/iocp_windows.cc \
src/core/lib/iomgr/iocp_windows.h \
src/core/lib/iomgr/iomgr.cc \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 3cb3fe20daa..8ea5126fdef 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -163,6 +163,23 @@
"third_party": false,
"type": "target"
},
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc_test_util"
+ ],
+ "headers": [],
+ "is_filegroup": false,
+ "language": "c",
+ "name": "buffer_list_test",
+ "src": [
+ "test/core/iomgr/buffer_list_test.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
{
"deps": [
"gpr",
@@ -1410,6 +1427,26 @@
"third_party": false,
"type": "target"
},
+ {
+ "deps": [
+ "gpr",
+ "gpr_test_util",
+ "grpc",
+ "grpc_test_util"
+ ],
+ "headers": [
+ "test/core/end2end/end2end_tests.h"
+ ],
+ "is_filegroup": false,
+ "language": "c",
+ "name": "inproc_callback_test",
+ "src": [
+ "test/core/end2end/end2end_tests.h",
+ "test/core/end2end/inproc_callback_test.cc"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
{
"deps": [
"gpr",
@@ -9468,6 +9505,7 @@
"src/core/lib/http/format_request.cc",
"src/core/lib/http/httpcli.cc",
"src/core/lib/http/parser.cc",
+ "src/core/lib/iomgr/buffer_list.cc",
"src/core/lib/iomgr/call_combiner.cc",
"src/core/lib/iomgr/combiner.cc",
"src/core/lib/iomgr/endpoint.cc",
@@ -9488,6 +9526,7 @@
"src/core/lib/iomgr/gethostname_fallback.cc",
"src/core/lib/iomgr/gethostname_host_name_max.cc",
"src/core/lib/iomgr/gethostname_sysconf.cc",
+ "src/core/lib/iomgr/internal_errqueue.cc",
"src/core/lib/iomgr/iocp_windows.cc",
"src/core/lib/iomgr/iomgr.cc",
"src/core/lib/iomgr/iomgr_custom.cc",
@@ -9647,6 +9686,7 @@
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/block_annotate.h",
+ "src/core/lib/iomgr/buffer_list.h",
"src/core/lib/iomgr/call_combiner.h",
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/combiner.h",
@@ -9662,6 +9702,7 @@
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/gethostname.h",
+ "src/core/lib/iomgr/internal_errqueue.h",
"src/core/lib/iomgr/iocp_windows.h",
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_custom.h",
@@ -9797,6 +9838,7 @@
"src/core/lib/http/httpcli.h",
"src/core/lib/http/parser.h",
"src/core/lib/iomgr/block_annotate.h",
+ "src/core/lib/iomgr/buffer_list.h",
"src/core/lib/iomgr/call_combiner.h",
"src/core/lib/iomgr/closure.h",
"src/core/lib/iomgr/combiner.h",
@@ -9812,6 +9854,7 @@
"src/core/lib/iomgr/exec_ctx.h",
"src/core/lib/iomgr/executor.h",
"src/core/lib/iomgr/gethostname.h",
+ "src/core/lib/iomgr/internal_errqueue.h",
"src/core/lib/iomgr/iocp_windows.h",
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_custom.h",
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index a51be28ad51..fba76d69d1e 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -195,6 +195,26 @@
],
"uses_polling": false
},
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [
+ "uv"
+ ],
+ "flaky": false,
+ "gtest": false,
+ "language": "c",
+ "name": "buffer_list_test",
+ "platforms": [
+ "linux"
+ ],
+ "uses_polling": true
+ },
{
"args": [],
"benchmark": false,
@@ -1697,6 +1717,30 @@
],
"uses_polling": false
},
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": false,
+ "language": "c",
+ "name": "inproc_callback_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": false
+ },
{
"args": [],
"benchmark": false,