Merge github.com:grpc/grpc into bm_stream

pull/9487/head
Craig Tiller 8 years ago
commit 02cae86b34
  1. 3
      BUILD
  2. 24
      CMakeLists.txt
  3. 36
      Makefile
  4. 14
      build.yaml
  5. 6
      src/core/ext/client_channel/connector.c
  6. 7
      src/core/ext/client_channel/connector.h
  7. 10
      src/core/ext/client_channel/http_connect_handshaker.c
  8. 3
      src/core/ext/client_channel/subchannel.c
  9. 14
      src/core/ext/transport/chttp2/client/chttp2_connector.c
  10. 12
      src/core/ext/transport/chttp2/server/chttp2_server.c
  11. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  12. 2
      src/core/ext/transport/cronet/transport/cronet_transport.c
  13. 14
      src/core/lib/channel/handshaker.c
  14. 8
      src/core/lib/channel/handshaker.h
  15. 5
      src/core/lib/iomgr/endpoint.c
  16. 5
      src/core/lib/iomgr/endpoint.h
  17. 17
      src/core/lib/iomgr/ev_epoll_linux.c
  18. 17
      src/core/lib/iomgr/ev_poll_posix.c
  19. 4
      src/core/lib/iomgr/ev_posix.c
  20. 4
      src/core/lib/iomgr/ev_posix.h
  21. 3
      src/core/lib/iomgr/network_status_tracker.c
  22. 3
      src/core/lib/iomgr/tcp_client_posix.c
  23. 5
      src/core/lib/iomgr/tcp_posix.c
  24. 6
      src/core/lib/iomgr/tcp_server_posix.c
  25. 4
      src/core/lib/iomgr/tcp_uv.c
  26. 27
      src/core/lib/iomgr/tcp_windows.c
  27. 3
      src/core/lib/iomgr/udp_server.c
  28. 12
      src/core/lib/iomgr/unix_sockets_posix.c
  29. 6
      src/core/lib/security/transport/secure_endpoint.c
  30. 13
      src/core/lib/security/transport/security_handshaker.c
  31. 6
      src/core/lib/transport/metadata_batch.c
  32. 6
      src/proto/grpc/testing/BUILD
  33. 6
      test/core/bad_client/bad_client.c
  34. 4
      test/core/client_channel/set_initial_connect_string_test.c
  35. 3
      test/core/end2end/bad_server_response_test.c
  36. 9
      test/core/end2end/fixtures/http_proxy.c
  37. 30
      test/core/end2end/fuzzers/api_fuzzer.c
  38. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/0242a9f4d4fafc96ee9ed762b610e3c68d6efdec
  39. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/0cd9696699bd190463ecef91968624601b64cd8b
  40. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/122b6fc72956541812dd653b726b073b77ca33be
  41. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/315d27e12f2214a56fb9901dacff14852ff2ac0f
  42. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/3f2429e3255ae36fecb57559b57d2b0cb88f5dd1
  43. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/41bda7ff09175f821992adf4314a8ec3007ffe55
  44. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/49d0085058d7fa81247f51b802c0f4206854b4dc
  45. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/614dbc86b17270ef1d5ab705ecbe88c742815ce7
  46. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/6cb17148d52be437332b6fd6f2fc8328bfb63fb0
  47. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/6ea192b1d4c4577ca7511f8ce5027b31b2e0d75d
  48. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/746477e7e8f093f87cb6924ab6476cda9689607d
  49. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/7752153d87017b85112a49ea95aa25ca78d24431
  50. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/7e75ea44aa7347c2f827beecb27e3bf5b1907b8a
  51. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/95e73caecc0ab06beaa9b84125adcb2e6eee2eff
  52. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/9e273a94bf3c60f1c7875874c81d0b9309428752
  53. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/a65bda38b60ae084a5dcc3b616660aa338feef17
  54. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/b39f27387a256019038cddb91f65651c01afb825
  55. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/b6f721156f8dc6a353555929e459e61bab8b394a
  56. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/bbb2429766a7c4ef9cb7110d567fd48cd6507dc5
  57. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/bc330aa616a792ff22a8c7428dcdb4d99accbe4b
  58. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/cd4ccfa79f65f31716296e690f3a76007edde2e3
  59. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/crash-5d73de981fb75553a7b2606e111716ee9f2af844
  60. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/e6b74f64e8bdfdf98177aee58b8729ff2aa7ffb2
  61. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/edecc59c5809796f266abd8df4d5ecf6aae304ca
  62. BIN
      test/core/end2end/fuzzers/api_fuzzer_corpus/f1b2889ae7091d6a14332343fe7a2bffd81039a7
  63. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/0c129f78eacfb0d0d3c89dd4e578724096a3cea0
  64. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/3b55d09b98e3982d6f80913a792463c3974766db
  65. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/416160124b3b64fc9355f24dd789b3d1fd097b8b
  66. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/92f1df2266f34a097e96dd22188d8633832d37b1
  67. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/a40c3ba87b4206142b134f67485859b7c9b7c75c
  68. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/crash-73923add5066617ae08f187b79d2639b4fd96138
  69. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/fcb1dea251d1ce74e30351f13a3f71e3debec3d2
  70. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/fe740f8c4ffd07f79456c8cee24ef556ee348f55
  71. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/slow-unit-03c6f209b2f144734c83d81ed452839d9e244fe9
  72. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/slow-unit-129ecb5e7b80616f36791e3580844e520f2ba7d3
  73. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/slow-unit-30408c9d13f29804168fc62a0818cc894c6375ae
  74. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/slow-unit-33d8bf197de7131be78244e10fbb0da5055cf266
  75. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/slow-unit-51cdbfa3e97a46ceefde405e6ab087a109c26907
  76. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/slow-unit-523cb1bca5ad56690c618b4ceac7fceca1113b9d
  77. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/slow-unit-a877fe99fd0e92721d162bc252bf72a4f67ba1ea
  78. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/slow-unit-bd9d24f5c7c915174b6ca9d1a3573e16e0edee12
  79. BIN
      test/core/end2end/fuzzers/server_fuzzer_corpus/slow-unit-f3c688876395bf7a529f29f7b91532726cf5cbce
  80. 2
      test/core/internal_api_canaries/iomgr.c
  81. 12
      test/core/iomgr/endpoint_tests.c
  82. 3
      test/core/iomgr/ev_epoll_linux_test.c
  83. 3
      test/core/iomgr/fd_posix_test.c
  84. 188
      test/core/iomgr/resolve_address_posix_test.c
  85. 3
      test/core/iomgr/tcp_client_posix_test.c
  86. 2
      test/core/iomgr/tcp_server_posix_test.c
  87. 6
      test/core/security/secure_endpoint_test.c
  88. 3
      test/core/security/ssl_server_fuzzer.c
  89. 2
      test/core/surface/concurrent_connectivity_test.c
  90. 8
      test/core/util/mock_endpoint.c
  91. 10
      test/core/util/passthru_endpoint.c
  92. 2
      test/core/util/reconnect_server.c
  93. 17
      tools/run_tests/generated/sources_and_headers.json
  94. 1114
      tools/run_tests/generated/tests.json
  95. 56
      tools/run_tests/performance/scenario_config.py

@ -1233,6 +1233,9 @@ grpc_cc_library(
public_hdrs = [
"include/grpc++/impl/codegen/config_protobuf.h",
],
external_deps = [
"protobuf",
],
)
grpc_cc_library(

@ -6922,6 +6922,30 @@ target_link_libraries(percent_encoding_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(resolve_address_posix_test
test/core/iomgr/resolve_address_posix_test.c
)
target_include_directories(resolve_address_posix_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${BORINGSSL_ROOT_DIR}/include
PRIVATE ${PROTOBUF_ROOT_DIR}/src
PRIVATE ${ZLIB_ROOT_DIR}
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
)
target_link_libraries(resolve_address_posix_test
grpc_test_util
grpc
gpr_test_util
gpr
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(resolve_address_test
test/core/iomgr/resolve_address_test.c
)

@ -1005,6 +1005,7 @@ no_server_test: $(BINDIR)/$(CONFIG)/no_server_test
percent_decode_fuzzer: $(BINDIR)/$(CONFIG)/percent_decode_fuzzer
percent_encode_fuzzer: $(BINDIR)/$(CONFIG)/percent_encode_fuzzer
percent_encoding_test: $(BINDIR)/$(CONFIG)/percent_encoding_test
resolve_address_posix_test: $(BINDIR)/$(CONFIG)/resolve_address_posix_test
resolve_address_test: $(BINDIR)/$(CONFIG)/resolve_address_test
resource_quota_test: $(BINDIR)/$(CONFIG)/resource_quota_test
secure_channel_create_test: $(BINDIR)/$(CONFIG)/secure_channel_create_test
@ -1348,6 +1349,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/murmur_hash_test \
$(BINDIR)/$(CONFIG)/no_server_test \
$(BINDIR)/$(CONFIG)/percent_encoding_test \
$(BINDIR)/$(CONFIG)/resolve_address_posix_test \
$(BINDIR)/$(CONFIG)/resolve_address_test \
$(BINDIR)/$(CONFIG)/resource_quota_test \
$(BINDIR)/$(CONFIG)/secure_channel_create_test \
@ -1754,6 +1756,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/no_server_test || ( echo test no_server_test failed ; exit 1 )
$(E) "[RUN] Testing percent_encoding_test"
$(Q) $(BINDIR)/$(CONFIG)/percent_encoding_test || ( echo test percent_encoding_test failed ; exit 1 )
$(E) "[RUN] Testing resolve_address_posix_test"
$(Q) $(BINDIR)/$(CONFIG)/resolve_address_posix_test || ( echo test resolve_address_posix_test failed ; exit 1 )
$(E) "[RUN] Testing resolve_address_test"
$(Q) $(BINDIR)/$(CONFIG)/resolve_address_test || ( echo test resolve_address_test failed ; exit 1 )
$(E) "[RUN] Testing resource_quota_test"
@ -11101,6 +11105,38 @@ endif
endif
RESOLVE_ADDRESS_POSIX_TEST_SRC = \
test/core/iomgr/resolve_address_posix_test.c \
RESOLVE_ADDRESS_POSIX_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(RESOLVE_ADDRESS_POSIX_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/resolve_address_posix_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/resolve_address_posix_test: $(RESOLVE_ADDRESS_POSIX_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) $(RESOLVE_ADDRESS_POSIX_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/resolve_address_posix_test
endif
$(OBJDIR)/$(CONFIG)/test/core/iomgr/resolve_address_posix_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_resolve_address_posix_test: $(RESOLVE_ADDRESS_POSIX_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(RESOLVE_ADDRESS_POSIX_TEST_OBJS:.o=.dep)
endif
endif
RESOLVE_ADDRESS_TEST_SRC = \
test/core/iomgr/resolve_address_test.c \

@ -2489,6 +2489,20 @@ targets:
- grpc
- gpr_test_util
- gpr
- name: resolve_address_posix_test
build: test
language: c
src:
- test/core/iomgr/resolve_address_posix_test.c
deps:
- grpc_test_util
- grpc
- gpr_test_util
- gpr
platforms:
- mac
- linux
- posix
- name: resolve_address_test
build: test
language: c

@ -49,7 +49,7 @@ void grpc_connector_connect(grpc_exec_ctx* exec_ctx, grpc_connector* connector,
connector->vtable->connect(exec_ctx, connector, in_args, out_args, notify);
}
void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx,
grpc_connector* connector) {
connector->vtable->shutdown(exec_ctx, connector);
void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx, grpc_connector* connector,
grpc_error* why) {
connector->vtable->shutdown(exec_ctx, connector, why);
}

@ -68,7 +68,8 @@ struct grpc_connector_vtable {
void (*ref)(grpc_connector *connector);
void (*unref)(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
/** Implementation of grpc_connector_shutdown */
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
grpc_error *why);
/** Implementation of grpc_connector_connect */
void (*connect)(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
const grpc_connect_in_args *in_args,
@ -83,7 +84,7 @@ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
grpc_connect_out_args *out_args,
grpc_closure *notify);
/** Cancel any pending connection */
void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_connector *connector);
void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
grpc_error *why);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H */

@ -123,7 +123,8 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint,
GRPC_ERROR_REF(error));
// Not shutting down, so the handshake failed. Clean up before
// invoking the callback.
cleanup_args_for_failure_locked(exec_ctx, handshaker);
@ -251,15 +252,18 @@ static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
}
static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker_in) {
grpc_handshaker* handshaker_in,
grpc_error* why) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
gpr_mu_lock(&handshaker->mu);
if (!handshaker->shutdown) {
handshaker->shutdown = true;
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint,
GRPC_ERROR_REF(why));
cleanup_args_for_failure_locked(exec_ctx, handshaker);
}
gpr_mu_unlock(&handshaker->mu);
GRPC_ERROR_UNREF(why);
}
static void http_connect_handshaker_do_handshake(

@ -273,7 +273,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
c->disconnected = true;
grpc_connector_shutdown(exec_ctx, c->connector);
grpc_connector_shutdown(exec_ctx, c->connector,
GRPC_ERROR_CREATE("Subchannel disconnected"));
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");

@ -92,19 +92,21 @@ static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx,
}
static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_connector *con) {
grpc_connector *con, grpc_error *why) {
chttp2_connector *c = (chttp2_connector *)con;
gpr_mu_lock(&c->mu);
c->shutdown = true;
if (c->handshake_mgr != NULL) {
grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr);
grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr,
GRPC_ERROR_REF(why));
}
// If handshaking is not yet in progress, shutdown the endpoint.
// Otherwise, the handshaker will do this for us.
if (!c->connecting && c->endpoint != NULL) {
grpc_endpoint_shutdown(exec_ctx, c->endpoint);
grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(why));
}
gpr_mu_unlock(&c->mu);
GRPC_ERROR_UNREF(why);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
@ -121,7 +123,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, args->endpoint);
grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_REF(error));
grpc_endpoint_destroy(exec_ctx, args->endpoint);
grpc_channel_args_destroy(exec_ctx, args->args);
grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
@ -195,7 +197,9 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_closure_sched(exec_ctx, notify, error);
if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
if (c->endpoint != NULL) {
grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error));
}
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(exec_ctx, arg);
} else {

@ -101,16 +101,19 @@ static void pending_handshake_manager_remove_locked(
}
static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx,
server_state *state) {
server_state *state,
grpc_error *why) {
pending_handshake_manager_node *prev_node = NULL;
for (pending_handshake_manager_node *node = state->pending_handshake_mgrs;
node != NULL; node = node->next) {
grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr);
grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr,
GRPC_ERROR_REF(why));
gpr_free(prev_node);
prev_node = node;
}
gpr_free(prev_node);
state->pending_handshake_mgrs = NULL;
GRPC_ERROR_UNREF(why);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
@ -129,7 +132,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, args->endpoint);
grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_NONE);
grpc_endpoint_destroy(exec_ctx, args->endpoint);
grpc_channel_args_destroy(exec_ctx, args->args);
grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
@ -210,7 +213,8 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&state->mu);
grpc_closure *destroy_done = state->server_destroy_listener_done;
GPR_ASSERT(state->shutdown);
pending_handshake_manager_shutdown_locked(exec_ctx, state);
pending_handshake_manager_shutdown_locked(exec_ctx, state,
GRPC_ERROR_REF(error));
gpr_mu_unlock(&state->mu);
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.

@ -417,7 +417,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
t->closed = 1;
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
grpc_endpoint_shutdown(exec_ctx, t->ep);
grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream *s;

@ -655,8 +655,6 @@ static void convert_metadata_to_cronet_headers(
headers[num_headers].key = key;
headers[num_headers].value = value;
num_headers++;
gpr_free(key);
gpr_free(value);
if (curr == NULL) {
break;
}

@ -55,8 +55,8 @@ void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
}
void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker) {
handshaker->vtable->shutdown(exec_ctx, handshaker);
grpc_handshaker* handshaker, grpc_error* why) {
handshaker->vtable->shutdown(exec_ctx, handshaker, why);
}
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
@ -141,14 +141,17 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
}
void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr) {
grpc_handshake_manager* mgr,
grpc_error* why) {
gpr_mu_lock(&mgr->mu);
// Shutdown the handshaker that's currently in progress, if any.
if (!mgr->shutdown && mgr->index > 0) {
mgr->shutdown = true;
grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]);
grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1],
GRPC_ERROR_REF(why));
}
gpr_mu_unlock(&mgr->mu);
GRPC_ERROR_UNREF(why);
}
// Helper function to call either the next handshaker or the
@ -197,7 +200,8 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg,
static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_handshake_manager* mgr = arg;
if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled.
grpc_handshake_manager_shutdown(exec_ctx, mgr);
grpc_handshake_manager_shutdown(exec_ctx, mgr,
GRPC_ERROR_CREATE("Handshake timed out"));
}
grpc_handshake_manager_unref(exec_ctx, mgr);
}

@ -86,7 +86,8 @@ typedef struct {
/// Shuts down the handshaker (e.g., to clean up when the operation is
/// aborted in the middle).
void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
grpc_error* why);
/// Performs handshaking, modifying \a args as needed (e.g., to
/// replace \a endpoint with a wrapped endpoint).
@ -111,7 +112,7 @@ void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
grpc_handshaker* handshaker, grpc_error* why);
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_tcp_server_acceptor* acceptor,
@ -141,7 +142,8 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
/// The caller must still call grpc_handshake_manager_destroy() after
/// calling this function.
void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr);
grpc_handshake_manager* mgr,
grpc_error* why);
/// Invokes handshakers in the order they were added.
/// Takes ownership of \a endpoint, and then passes that ownership to

@ -54,8 +54,9 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx,
ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set);
}
void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
ep->vtable->shutdown(exec_ctx, ep);
void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
grpc_error* why) {
ep->vtable->shutdown(exec_ctx, ep, why);
}
void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {

@ -57,7 +57,7 @@ struct grpc_endpoint_vtable {
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep);
char *(*get_peer)(grpc_endpoint *ep);
@ -96,7 +96,8 @@ void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Causes any pending and future read/write callbacks to run immediately with
success==0 */
void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why);
void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
/* Add an endpoint to a pollset, so that when the pollset is polled, events from

@ -143,6 +143,7 @@ struct grpc_fd {
/* Indicates that the fd is shutdown and that any pending read/write closures
should fail */
bool shutdown;
grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */
/* The fd is either closed or we relinquished control of it. In either cases,
this indicates that the 'fd' on this structure is no longer valid */
@ -907,6 +908,7 @@ static void unref_by(grpc_fd *fd, int n) {
fd->freelist_next = fd_freelist;
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_mu_unlock(&fd_freelist_mu);
} else {
@ -1058,11 +1060,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
GRPC_ERROR_UNREF(error);
}
static grpc_error *fd_shutdown_error(bool shutdown) {
if (!shutdown) {
static grpc_error *fd_shutdown_error(grpc_fd *fd) {
if (!fd->shutdown) {
return GRPC_ERROR_NONE;
} else {
return GRPC_ERROR_CREATE("FD shutdown");
return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
}
}
@ -1076,7 +1078,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
@ -1098,7 +1100,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
*st = CLOSURE_NOT_READY;
return 1;
}
@ -1123,17 +1125,20 @@ static bool fd_is_shutdown(grpc_fd *fd) {
}
/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
gpr_mu_lock(&fd->po.mu);
/* Do the actual shutdown only once */
if (!fd->shutdown) {
fd->shutdown = true;
fd->shutdown_error = why;
shutdown(fd->fd, SHUT_RDWR);
/* Flush any pending read and write closures. Since fd->shutdown is 'true'
at this point, the closures would be called with 'success = false' */
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
} else {
GRPC_ERROR_UNREF(why);
}
gpr_mu_unlock(&fd->po.mu);
}

@ -82,6 +82,7 @@ struct grpc_fd {
int shutdown;
int closed;
int released;
grpc_error *shutdown_error;
/* The watcher list.
@ -306,6 +307,7 @@ static void unref_by(grpc_fd *fd, int n) {
if (old == n) {
gpr_mu_destroy(&fd->mu);
grpc_iomgr_unregister_object(&fd->iomgr_object);
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_free(fd);
} else {
GPR_ASSERT(old > n);
@ -444,11 +446,11 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static grpc_error *fd_shutdown_error(bool shutdown) {
if (!shutdown) {
static grpc_error *fd_shutdown_error(grpc_fd *fd) {
if (!fd->shutdown) {
return GRPC_ERROR_NONE;
} else {
return GRPC_ERROR_CREATE("FD shutdown");
return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
}
}
@ -462,7 +464,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@ -485,7 +487,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
*st = CLOSURE_NOT_READY;
return 1;
}
@ -496,15 +498,18 @@ static void set_read_notifier_pollset_locked(
fd->read_notifier_pollset = read_notifier_pollset;
}
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
gpr_mu_lock(&fd->mu);
/* only shutdown once */
if (!fd->shutdown) {
fd->shutdown = 1;
fd->shutdown_error = why;
/* signal read/write closed to OS so that future operations fail */
shutdown(fd->fd, SHUT_RDWR);
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
} else {
GRPC_ERROR_UNREF(why);
}
gpr_mu_unlock(&fd->mu);
}

@ -162,8 +162,8 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason);
}
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
g_event_engine->fd_shutdown(exec_ctx, fd);
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
g_event_engine->fd_shutdown(exec_ctx, fd, why);
}
bool grpc_fd_is_shutdown(grpc_fd *fd) {

@ -51,7 +51,7 @@ typedef struct grpc_event_engine_vtable {
int (*fd_wrapped_fd)(grpc_fd *fd);
void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason);
void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
@ -140,7 +140,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
bool grpc_fd_is_shutdown(grpc_fd *fd);
/* Cause any current and future callbacks to fail. */
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
/* Register read interest, causing read_cb to be called once when fd becomes
readable, on deadline specified by deadline, or on shutdown triggered by

@ -117,7 +117,8 @@ void grpc_network_status_shutdown_all_endpoints() {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
for (endpoint_ll_node *curr = head; curr != NULL; curr = curr->next) {
curr->ep->vtable->shutdown(&exec_ctx, curr->ep);
curr->ep->vtable->shutdown(&exec_ctx, curr->ep,
GRPC_ERROR_CREATE("Network unavailable"));
}
gpr_mu_unlock(&g_endpoint_mutex);
grpc_exec_ctx_finish(&exec_ctx);

@ -121,7 +121,8 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
}
gpr_mu_lock(&ac->mu);
if (ac->fd != NULL) {
grpc_fd_shutdown(exec_ctx, ac->fd);
grpc_fd_shutdown(exec_ctx, ac->fd,
GRPC_ERROR_CREATE("connect() timed out"));
}
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);

@ -119,9 +119,10 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_fd_shutdown(exec_ctx, tcp->em_fd);
grpc_fd_shutdown(exec_ctx, tcp->em_fd, why);
grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
}

@ -276,7 +276,8 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->active_ports) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_fd_shutdown(exec_ctx, sp->emfd);
grpc_fd_shutdown(exec_ctx, sp->emfd,
GRPC_ERROR_CREATE("Server destroyed"));
}
gpr_mu_unlock(&s->mu);
} else {
@ -773,7 +774,8 @@ void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
if (s->active_ports) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_fd_shutdown(exec_ctx, sp->emfd);
grpc_fd_shutdown(exec_ctx, sp->emfd,
GRPC_ERROR_CREATE("Server shutdown"));
}
}
gpr_mu_unlock(&s->mu);

@ -298,13 +298,15 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void shutdown_callback(uv_shutdown_t *req, int status) {}
static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_tcp *tcp = (grpc_tcp *)ep;
if (!tcp->shutting_down) {
tcp->shutting_down = true;
uv_shutdown_t *req = &tcp->shutdown_req;
uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
}
GRPC_ERROR_UNREF(why);
}
static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {

@ -116,6 +116,7 @@ typedef struct grpc_tcp {
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
grpc_error *shutdown_error;
char *peer_string;
} grpc_tcp;
@ -125,6 +126,7 @@ static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
gpr_free(tcp);
}
@ -182,7 +184,10 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
grpc_slice_buffer_add(tcp->read_slices, sub);
} else {
grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
error = GRPC_ERROR_CREATE("End of TCP stream");
error = tcp->shutting_down
? GRPC_ERROR_CREATE_REFERENCING("TCP stream shutting down",
&tcp->shutdown_error, 1)
: GRPC_ERROR_CREATE("End of TCP stream");
}
}
}
@ -203,8 +208,9 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
WSABUF buffer;
if (tcp->shutting_down) {
grpc_closure_sched(exec_ctx, cb,
GRPC_ERROR_CREATE("TCP socket is shutting down"));
grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING(
"TCP socket is shutting down",
&tcp->shutdown_error, 1));
return;
}
@ -291,8 +297,9 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
size_t len;
if (tcp->shutting_down) {
grpc_closure_sched(exec_ctx, cb,
GRPC_ERROR_CREATE("TCP socket is shutting down"));
grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING(
"TCP socket is shutting down",
&tcp->shutdown_error, 1));
return;
}
@ -373,12 +380,18 @@ static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
we're not going to protect against these. However the IO Completion Port
callback will happen from another thread, so we need to protect against
concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_tcp *tcp = (grpc_tcp *)ep;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1;
if (!tcp->shutting_down) {
tcp->shutting_down = 1;
tcp->shutdown_error = why;
} else {
GRPC_ERROR_UNREF(why);
}
grpc_winsocket_shutdown(tcp->socket);
gpr_mu_unlock(&tcp->mu);
grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);

@ -203,7 +203,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(sp->emfd);
grpc_fd_shutdown(exec_ctx, sp->emfd);
grpc_fd_shutdown(exec_ctx, sp->emfd,
GRPC_ERROR_CREATE("Server destroyed"));
}
gpr_mu_unlock(&s->mu);
} else {

@ -45,6 +45,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
void grpc_create_socketpair_if_unix(int sv[2]) {
GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
@ -53,7 +54,16 @@ void grpc_create_socketpair_if_unix(int sv[2]) {
grpc_error *grpc_resolve_unix_domain_address(const char *name,
grpc_resolved_addresses **addrs) {
struct sockaddr_un *un;
if (strlen(name) > GPR_ARRAY_SIZE(((struct sockaddr_un *)0)->sun_path) - 1) {
char *err_msg;
grpc_error *err;
gpr_asprintf(&err_msg,
"Path name should not have more than %" PRIuPTR " characters.",
GPR_ARRAY_SIZE(un->sun_path) - 1);
err = GRPC_ERROR_CREATE(err_msg);
gpr_free(err_msg);
return err;
}
*addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
(*addrs)->naddrs = 1;
(*addrs)->addrs = gpr_malloc(sizeof(grpc_resolved_address));

@ -341,10 +341,10 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
GPR_TIMER_END("secure_endpoint.endpoint_write", 0);
}
static void endpoint_shutdown(grpc_exec_ctx *exec_ctx,
grpc_endpoint *secure_ep) {
static void endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
grpc_error *why) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep);
grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep, why);
}
static void endpoint_destroy(grpc_exec_ctx *exec_ctx,

@ -130,7 +130,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(error));
// Not shutting down, so the write failed. Clean up before
// invoking the callback.
cleanup_args_for_failure_locked(exec_ctx, h);
@ -347,15 +347,17 @@ static void security_handshaker_destroy(grpc_exec_ctx *exec_ctx,
}
static void security_handshaker_shutdown(grpc_exec_ctx *exec_ctx,
grpc_handshaker *handshaker) {
grpc_handshaker *handshaker,
grpc_error *why) {
security_handshaker *h = (security_handshaker *)handshaker;
gpr_mu_lock(&h->mu);
if (!h->shutdown) {
h->shutdown = true;
grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(why));
cleanup_args_for_failure_locked(exec_ctx, h);
}
gpr_mu_unlock(&h->mu);
GRPC_ERROR_UNREF(why);
}
static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
@ -417,7 +419,10 @@ static void fail_handshaker_destroy(grpc_exec_ctx *exec_ctx,
}
static void fail_handshaker_shutdown(grpc_exec_ctx *exec_ctx,
grpc_handshaker *handshaker) {}
grpc_handshaker *handshaker,
grpc_error *why) {
GRPC_ERROR_UNREF(why);
}
static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_handshaker *handshaker,

@ -258,16 +258,22 @@ grpc_error *grpc_metadata_batch_substitute(grpc_exec_ctx *exec_ctx,
grpc_metadata_batch *batch,
grpc_linked_mdelem *storage,
grpc_mdelem new) {
assert_valid_callouts(exec_ctx, batch);
grpc_error *error = GRPC_ERROR_NONE;
grpc_mdelem old = storage->md;
if (!grpc_slice_eq(GRPC_MDKEY(new), GRPC_MDKEY(old))) {
maybe_unlink_callout(batch, storage);
storage->md = new;
error = maybe_link_callout(batch, storage);
if (error != GRPC_ERROR_NONE) {
unlink_storage(&batch->list, storage);
GRPC_MDELEM_UNREF(exec_ctx, storage->md);
}
} else {
storage->md = new;
}
GRPC_MDELEM_UNREF(exec_ctx, old);
assert_valid_callouts(exec_ctx, batch);
return error;
}

@ -42,11 +42,13 @@ grpc_proto_library(
name = "control_proto",
srcs = ["control.proto"],
deps = ["payloads_proto", "stats_proto"],
has_services = False,
)
grpc_proto_library(
name = "echo_messages_proto",
srcs = ["echo_messages.proto"],
has_services = False,
)
grpc_proto_library(
@ -58,11 +60,13 @@ grpc_proto_library(
grpc_proto_library(
name = "empty_proto",
srcs = ["empty.proto"],
has_services = False,
)
grpc_proto_library(
name = "messages_proto",
srcs = ["messages.proto"],
has_services = False,
)
grpc_proto_library(
@ -73,6 +77,7 @@ grpc_proto_library(
grpc_proto_library(
name = "payloads_proto",
srcs = ["payloads.proto"],
has_services = False,
)
grpc_proto_library(
@ -84,6 +89,7 @@ grpc_proto_library(
grpc_proto_library(
name = "stats_proto",
srcs = ["stats.proto"],
has_services = False,
)
grpc_proto_library(

@ -163,7 +163,8 @@ void grpc_run_bad_client_test(
gpr_event_wait(&a.done_write, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)));
if (flags & GRPC_BAD_CLIENT_DISCONNECT) {
grpc_endpoint_shutdown(&exec_ctx, sfd.client);
grpc_endpoint_shutdown(&exec_ctx, sfd.client,
GRPC_ERROR_CREATE("Forced Disconnect"));
grpc_endpoint_destroy(&exec_ctx, sfd.client);
grpc_exec_ctx_finish(&exec_ctx);
sfd.client = NULL;
@ -189,7 +190,8 @@ void grpc_run_bad_client_test(
grpc_slice_buffer_destroy_internal(&exec_ctx, &args.incoming);
}
// Shutdown.
grpc_endpoint_shutdown(&exec_ctx, sfd.client);
grpc_endpoint_shutdown(&exec_ctx, sfd.client,
GRPC_ERROR_CREATE("Test Shutdown"));
grpc_endpoint_destroy(&exec_ctx, sfd.client);
grpc_exec_ctx_finish(&exec_ctx);
}

@ -81,7 +81,9 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
state.incoming_buffer.length, strlen(magic_connect_string));
if (state.incoming_buffer.length > strlen(magic_connect_string)) {
gpr_atm_rel_store(&state.done_atm, 1);
grpc_endpoint_shutdown(exec_ctx, state.tcp);
grpc_endpoint_shutdown(
exec_ctx, state.tcp,
GRPC_ERROR_CREATE("Incoming buffer longer than magic_connect_string"));
grpc_endpoint_destroy(exec_ctx, state.tcp);
} else {
grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer,

@ -298,7 +298,8 @@ static void run_test(const char *response_payload,
gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
/* clean up */
grpc_endpoint_shutdown(&exec_ctx, state.tcp);
grpc_endpoint_shutdown(&exec_ctx, state.tcp,
GRPC_ERROR_CREATE("Test Shutdown"));
grpc_endpoint_destroy(&exec_ctx, state.tcp);
cleanup_rpc(&exec_ctx);
grpc_exec_ctx_finish(&exec_ctx);

@ -133,9 +133,12 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
const char* msg = grpc_error_string(error);
gpr_log(GPR_INFO, "%s: %s", prefix, msg);
grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint);
if (conn->server_endpoint != NULL)
grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint);
grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint,
GRPC_ERROR_REF(error));
if (conn->server_endpoint != NULL) {
grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint,
GRPC_ERROR_REF(error));
}
proxy_connection_unref(exec_ctx, conn);
}

@ -604,12 +604,12 @@ static call_state *maybe_delete_call_state(call_state *call) {
grpc_slice_unref(call->recv_status_details);
grpc_call_details_destroy(&call->call_details);
for (size_t i = 0; i < call->num_to_free; i++) {
gpr_free(call->to_free[i]);
}
for (size_t i = 0; i < call->num_slices_to_unref; i++) {
grpc_slice_unref(call->slices_to_unref[i]);
}
for (size_t i = 0; i < call->num_to_free; i++) {
gpr_free(call->to_free[i]);
}
gpr_free(call->to_free);
gpr_free(call->slices_to_unref);
@ -627,7 +627,7 @@ static void add_to_free(call_state *call, void *p) {
call->to_free[call->num_to_free++] = p;
}
static grpc_slice *add_to_slice_unref(call_state *call, grpc_slice s) {
static grpc_slice *add_slice_to_unref(call_state *call, grpc_slice s) {
if (call->num_slices_to_unref == call->cap_slices_to_unref) {
call->cap_slices_to_unref = GPR_MAX(8, 2 * call->cap_slices_to_unref);
call->slices_to_unref =
@ -648,8 +648,8 @@ static void read_metadata(input_stream *inp, size_t *count,
(*metadata)[i].key = read_string_like_slice(inp);
(*metadata)[i].value = read_buffer_like_slice(inp);
(*metadata)[i].flags = read_uint32(inp);
add_to_slice_unref(cs, (*metadata)[i].key);
add_to_slice_unref(cs, (*metadata)[i].value);
add_slice_to_unref(cs, (*metadata)[i].key);
add_slice_to_unref(cs, (*metadata)[i].value);
}
} else {
*metadata = gpr_malloc(1);
@ -1008,7 +1008,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
g_active_call);
op->data.send_status_from_server.status = next_byte(&inp);
op->data.send_status_from_server.status_details =
add_to_slice_unref(g_active_call,
add_slice_to_unref(g_active_call,
read_buffer_like_slice(&inp));
break;
case GRPC_OP_RECV_INITIAL_METADATA:
@ -1056,22 +1056,6 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
grpc_byte_buffer_destroy(g_active_call->send_message);
g_active_call->send_message = NULL;
}
for (i = 0; i < num_ops; i++) {
op = &ops[i];
switch (op->op) {
case GRPC_OP_SEND_STATUS_FROM_SERVER:
gpr_free((void *)op->data.send_status_from_server.status_details);
break;
case GRPC_OP_SEND_MESSAGE:
case GRPC_OP_SEND_INITIAL_METADATA:
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
case GRPC_OP_RECV_INITIAL_METADATA:
case GRPC_OP_RECV_MESSAGE:
case GRPC_OP_RECV_STATUS_ON_CLIENT:
case GRPC_OP_RECV_CLOSE_ON_SERVER:
break;
}
}
gpr_free(ops);
break;

@ -92,7 +92,7 @@ static void test_code(void) {
grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL);
grpc_endpoint_get_peer(&endpoint);
grpc_endpoint_write(&exec_ctx, &endpoint, NULL, NULL);
grpc_endpoint_shutdown(&exec_ctx, &endpoint);
grpc_endpoint_shutdown(&exec_ctx, &endpoint, GRPC_ERROR_CANCELLED);
grpc_endpoint_destroy(&exec_ctx, &endpoint);
grpc_endpoint_add_to_pollset(&exec_ctx, &endpoint, NULL);
grpc_endpoint_add_to_pollset_set(&exec_ctx, &endpoint, NULL);

@ -233,9 +233,11 @@ static void read_and_write_test(grpc_endpoint_test_config config,
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
grpc_endpoint_shutdown(&exec_ctx, state.read_ep);
grpc_endpoint_shutdown(&exec_ctx, state.read_ep,
GRPC_ERROR_CREATE("Test Shutdown"));
gpr_log(GPR_DEBUG, "shutdown write");
grpc_endpoint_shutdown(&exec_ctx, state.write_ep);
grpc_endpoint_shutdown(&exec_ctx, state.write_ep,
GRPC_ERROR_CREATE("Test Shutdown"));
}
grpc_exec_ctx_flush(&exec_ctx);
@ -296,7 +298,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_closure_create(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx));
wait_for_fail_count(&exec_ctx, &fail_count, 0);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
GRPC_ERROR_CREATE("Test Shutdown"));
wait_for_fail_count(&exec_ctx, &fail_count, 1);
grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer,
grpc_closure_create(inc_on_failure, &fail_count,
@ -307,7 +310,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_closure_create(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx));
wait_for_fail_count(&exec_ctx, &fail_count, 3);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
GRPC_ERROR_CREATE("Test Shutdown"));
wait_for_fail_count(&exec_ctx, &fail_count, 3);
grpc_slice_buffer_destroy_internal(&exec_ctx, &slice_buffer);

@ -89,7 +89,8 @@ static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *tfds,
int i;
for (i = 0; i < num_fds; i++) {
grpc_fd_shutdown(exec_ctx, tfds[i].fd);
grpc_fd_shutdown(exec_ctx, tfds[i].fd,
GRPC_ERROR_CREATE("test_fd_cleanup"));
grpc_exec_ctx_flush(exec_ctx);
grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup");

@ -132,7 +132,8 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a");
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(exec_ctx, sv->em_fd);
grpc_fd_shutdown(exec_ctx, sv->em_fd,
GRPC_ERROR_CREATE("session_shutdown_cb"));
}
/* Called when data become readable in a session. */

@ -0,0 +1,188 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/lib/iomgr/resolve_address.h"
#include <string.h>
#include <sys/un.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "test/core/util/test_config.h"
static gpr_timespec test_deadline(void) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(100);
}
typedef struct args_struct {
gpr_event ev;
grpc_resolved_addresses *addrs;
gpr_atm done_atm;
gpr_mu *mu;
grpc_pollset *pollset;
grpc_pollset_set *pollset_set;
} args_struct;
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
void args_init(grpc_exec_ctx *exec_ctx, args_struct *args) {
gpr_event_init(&args->ev);
args->pollset = gpr_malloc(grpc_pollset_size());
grpc_pollset_init(args->pollset, &args->mu);
args->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(exec_ctx, args->pollset_set, args->pollset);
args->addrs = NULL;
}
void args_finish(grpc_exec_ctx *exec_ctx, args_struct *args) {
GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline()));
grpc_resolved_addresses_destroy(args->addrs);
grpc_pollset_set_del_pollset(exec_ctx, args->pollset_set, args->pollset);
grpc_pollset_set_destroy(args->pollset_set);
grpc_closure do_nothing_cb;
grpc_closure_init(&do_nothing_cb, do_nothing, NULL,
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(exec_ctx, args->pollset, &do_nothing_cb);
// exec_ctx needs to be flushed before calling grpc_pollset_destroy()
grpc_exec_ctx_flush(exec_ctx);
grpc_pollset_destroy(args->pollset);
gpr_free(args->pollset);
}
static gpr_timespec n_sec_deadline(int seconds) {
return gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(seconds, GPR_TIMESPAN));
}
static void actually_poll(void *argsp) {
args_struct *args = argsp;
gpr_timespec deadline = n_sec_deadline(10);
while (true) {
bool done = gpr_atm_acq_load(&args->done_atm) != 0;
if (done) {
break;
}
gpr_timespec time_left =
gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME));
gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64 ".%09d", done,
time_left.tv_sec, time_left.tv_nsec);
GPR_ASSERT(gpr_time_cmp(time_left, gpr_time_0(GPR_TIMESPAN)) >= 0);
grpc_pollset_worker *worker = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(&exec_ctx, args->pollset, &worker,
gpr_now(GPR_CLOCK_REALTIME), n_sec_deadline(1)));
gpr_mu_unlock(args->mu);
grpc_exec_ctx_finish(&exec_ctx);
}
gpr_event_set(&args->ev, (void *)1);
}
static void poll_pollset_until_request_done(args_struct *args) {
gpr_atm_rel_store(&args->done_atm, 0);
gpr_thd_id id;
gpr_thd_new(&id, actually_poll, args, NULL);
}
static void must_succeed(grpc_exec_ctx *exec_ctx, void *argsp,
grpc_error *err) {
args_struct *args = argsp;
GPR_ASSERT(err == GRPC_ERROR_NONE);
GPR_ASSERT(args->addrs != NULL);
GPR_ASSERT(args->addrs->naddrs > 0);
gpr_atm_rel_store(&args->done_atm, 1);
}
static void must_fail(grpc_exec_ctx *exec_ctx, void *argsp, grpc_error *err) {
args_struct *args = argsp;
GPR_ASSERT(err != GRPC_ERROR_NONE);
gpr_atm_rel_store(&args->done_atm, 1);
}
static void test_unix_socket(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
args_init(&exec_ctx, &args);
poll_pollset_until_request_done(&args);
grpc_resolve_address(
&exec_ctx, "unix:/path/name", NULL, args.pollset_set,
grpc_closure_create(must_succeed, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
args_finish(&exec_ctx, &args);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_unix_socket_path_name_too_long(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
args_struct args;
args_init(&exec_ctx, &args);
const char prefix[] = "unix:/path/name";
size_t path_name_length =
GPR_ARRAY_SIZE(((struct sockaddr_un *)0)->sun_path) + 6;
char *path_name = gpr_malloc(sizeof(char) * path_name_length);
memset(path_name, 'a', path_name_length);
memcpy(path_name, prefix, strlen(prefix) - 1);
path_name[path_name_length - 1] = '\0';
poll_pollset_until_request_done(&args);
grpc_resolve_address(
&exec_ctx, path_name, NULL, args.pollset_set,
grpc_closure_create(must_fail, &args, grpc_schedule_on_exec_ctx),
&args.addrs);
gpr_free(path_name);
args_finish(&exec_ctx, &args);
grpc_exec_ctx_finish(&exec_ctx);
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_executor_init();
grpc_iomgr_init();
test_unix_socket();
test_unix_socket_path_name_too_long();
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_executor_shutdown(&exec_ctx);
grpc_iomgr_shutdown(&exec_ctx);
grpc_exec_ctx_finish(&exec_ctx);
}
return 0;
}

@ -72,7 +72,8 @@ static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_ASSERT(g_connecting != NULL);
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_endpoint_shutdown(exec_ctx, g_connecting);
grpc_endpoint_shutdown(exec_ctx, g_connecting,
GRPC_ERROR_CREATE("must_succeed called"));
grpc_endpoint_destroy(exec_ctx, g_connecting);
g_connecting = NULL;
finish_connection();

@ -121,7 +121,7 @@ static void server_weak_ref_set(server_weak_ref *weak_ref,
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_pollset *pollset,
grpc_tcp_server_acceptor *acceptor) {
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected"));
grpc_endpoint_destroy(exec_ctx, tcp);
on_connect_result temp_result;

@ -166,8 +166,10 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
GPR_ASSERT(incoming.count == 1);
GPR_ASSERT(grpc_slice_eq(s, incoming.slices[0]));
grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
grpc_endpoint_shutdown(&exec_ctx, f.server_ep);
grpc_endpoint_shutdown(&exec_ctx, f.client_ep,
GRPC_ERROR_CREATE("test_leftover end"));
grpc_endpoint_shutdown(&exec_ctx, f.server_ep,
GRPC_ERROR_CREATE("test_leftover end"));
grpc_endpoint_destroy(&exec_ctx, f.client_ep);
grpc_endpoint_destroy(&exec_ctx, f.server_ep);
grpc_exec_ctx_finish(&exec_ctx);

@ -121,7 +121,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
// server will wait for more data. Explicitly fail the server by shutting down
// the endpoint.
if (!state.done_callback_called) {
grpc_endpoint_shutdown(&exec_ctx, mock_endpoint);
grpc_endpoint_shutdown(&exec_ctx, mock_endpoint,
GRPC_ERROR_CREATE("Explicit close"));
grpc_exec_ctx_flush(&exec_ctx);
}

@ -107,7 +107,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp,
grpc_tcp_server_acceptor *acceptor) {
gpr_free(acceptor);
struct server_thread_args *args = (struct server_thread_args *)vargs;
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected"));
grpc_endpoint_destroy(exec_ctx, tcp);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL));
}

@ -78,16 +78,18 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
gpr_mu_lock(&m->mu);
if (m->on_read) {
grpc_closure_sched(exec_ctx, m->on_read,
GRPC_ERROR_CREATE("Endpoint Shutdown"));
grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE_REFERENCING(
"Endpoint Shutdown", &why, 1));
m->on_read = NULL;
}
gpr_mu_unlock(&m->mu);
grpc_resource_user_shutdown(exec_ctx, m->resource_user);
GRPC_ERROR_UNREF(why);
}
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {

@ -114,21 +114,25 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
half *m = (half *)ep;
gpr_mu_lock(&m->parent->mu);
m->parent->shutdown = true;
if (m->on_read) {
grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"));
grpc_closure_sched(exec_ctx, m->on_read,
GRPC_ERROR_CREATE_REFERENCING("Shutdown", &why, 1));
m->on_read = NULL;
}
m = other_half(m);
if (m->on_read) {
grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"));
grpc_closure_sched(exec_ctx, m->on_read,
GRPC_ERROR_CREATE_REFERENCING("Shutdown", &why, 1));
m->on_read = NULL;
}
gpr_mu_unlock(&m->parent->mu);
grpc_resource_user_shutdown(exec_ctx, m->resource_user);
GRPC_ERROR_UNREF(why);
}
static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {

@ -80,7 +80,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
timestamp_list *new_tail;
peer = grpc_endpoint_get_peer(tcp);
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected"));
grpc_endpoint_destroy(exec_ctx, tcp);
if (peer) {
last_colon = strrchr(peer, ':');

@ -1710,6 +1710,23 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c",
"name": "resolve_address_posix_test",
"src": [
"test/core/iomgr/resolve_address_posix_test.c"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",

File diff suppressed because it is too large Load Diff

@ -48,25 +48,6 @@ HISTOGRAM_PARAMS = {
'max_possible': 60e9,
}
EMPTY_GENERIC_PAYLOAD = {
'bytebuf_params': {
'req_size': 0,
'resp_size': 0,
}
}
EMPTY_PROTO_PAYLOAD = {
'simple_params': {
'req_size': 0,
'resp_size': 0,
}
}
BIG_GENERIC_PAYLOAD = {
'bytebuf_params': {
'req_size': 65536,
'resp_size': 65536,
}
}
# target number of RPCs outstanding on across all client channels in
# non-ping-pong tests (since we can only specify per-channel numbers, the
# actual target will be slightly higher)
@ -102,10 +83,24 @@ def geometric_progression(start, stop, step):
n *= step
def _payload_type(use_generic_payload, req_size, resp_size):
r = {}
sizes = {
'req_size': req_size,
'resp_size': resp_size,
}
if use_generic_payload:
r['bytebuf_params'] = sizes
else:
r['simple_params'] = sizes
def _ping_pong_scenario(name, rpc_type,
client_type, server_type,
secure=True,
use_generic_payload=False,
req_size=0,
resp_size=0,
unconstrained_client=None,
client_language=None,
server_language=None,
@ -145,14 +140,15 @@ def _ping_pong_scenario(name, rpc_type,
if use_generic_payload:
if server_type != 'ASYNC_GENERIC_SERVER':
raise Exception('Use ASYNC_GENERIC_SERVER for generic payload.')
scenario['client_config']['payload_config'] = EMPTY_GENERIC_PAYLOAD
scenario['server_config']['payload_config'] = EMPTY_GENERIC_PAYLOAD
else:
# For proto payload, only the client should get the config.
scenario['client_config']['payload_config'] = EMPTY_PROTO_PAYLOAD
scenario['client_config']['payload_config'] = _payload_type(use_generic_payload, req_size, resp_size)
scenario['server_config']['payload_config'] = _payload_type(use_generic_payload, req_size, resp_size)
if unconstrained_client:
outstanding_calls = outstanding if outstanding is not None else OUTSTANDING_REQUESTS[unconstrained_client]
# clamp buffer usage to something reasonable (16 gig for now)
MAX_MEMORY_USE = 16 * 1024 * 1024 * 1024
if outstanding_calls * max(req_size, resp_size) > MAX_MEMORY_USE:
outstanding_calls = max(1, MAX_MEMORY_USE / max(req_size, resp_size))
wide = channels if channels is not None else WIDE
deep = int(math.ceil(1.0 * outstanding_calls / wide))
@ -249,6 +245,18 @@ class CXXLanguage:
async_server_threads=1,
secure=secure)
for size in geometric_progression(1, 1024*1024*1024+1, 8):
yield _ping_pong_scenario(
'cpp_protobuf_%s_%s_qps_unconstrained_%s_%db' % (synchronicity, rpc_type, secstr, size),
rpc_type=rpc_type.upper(),
req_size=size,
resp_size=size,
client_type='%s_CLIENT' % synchronicity.upper(),
server_type='%s_SERVER' % synchronicity.upper(),
unconstrained_client=synchronicity,
secure=secure,
categories=[SWEEP])
yield _ping_pong_scenario(
'cpp_protobuf_%s_%s_qps_unconstrained_%s' % (synchronicity, rpc_type, secstr),
rpc_type=rpc_type.upper(),

Loading…
Cancel
Save