diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6512dd184bc..2f9b81c4a95 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1166,6 +1166,7 @@ if(gRPC_BUILD_TESTS)
   if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
     add_dependencies(buildtests_cxx stranded_event_test)
   endif()
+  add_dependencies(buildtests_cxx stream_leak_with_queued_flow_control_update_test)
   add_dependencies(buildtests_cxx stream_map_test)
   if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
     add_dependencies(buildtests_cxx streaming_throughput_test)
@@ -17251,6 +17252,41 @@ endif()
 endif()
 if(gRPC_BUILD_TESTS)
 
+add_executable(stream_leak_with_queued_flow_control_update_test
+  test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc
+  third_party/googletest/googletest/src/gtest-all.cc
+  third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+target_include_directories(stream_leak_with_queued_flow_control_update_test
+  PRIVATE
+    ${CMAKE_CURRENT_SOURCE_DIR}
+    ${CMAKE_CURRENT_SOURCE_DIR}/include
+    ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+    ${_gRPC_RE2_INCLUDE_DIR}
+    ${_gRPC_SSL_INCLUDE_DIR}
+    ${_gRPC_UPB_GENERATED_DIR}
+    ${_gRPC_UPB_GRPC_GENERATED_DIR}
+    ${_gRPC_UPB_INCLUDE_DIR}
+    ${_gRPC_XXHASH_INCLUDE_DIR}
+    ${_gRPC_ZLIB_INCLUDE_DIR}
+    third_party/googletest/googletest/include
+    third_party/googletest/googletest
+    third_party/googletest/googlemock/include
+    third_party/googletest/googlemock
+    ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(stream_leak_with_queued_flow_control_update_test
+  ${_gRPC_PROTOBUF_LIBRARIES}
+  ${_gRPC_ALLTARGETS_LIBRARIES}
+  grpc_test_util
+)
+
+
+endif()
+if(gRPC_BUILD_TESTS)
+
 add_executable(stream_map_test
   test/core/transport/chttp2/stream_map_test.cc
   test/core/util/cmdline.cc
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 85cd806f133..267f93992e0 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -9493,6 +9493,15 @@ targets:
   - linux
   - posix
   - mac
+- name: stream_leak_with_queued_flow_control_update_test
+  gtest: true
+  build: test
+  language: c++
+  headers: []
+  src:
+  - test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc
+  deps:
+  - grpc_test_util
 - name: stream_map_test
   gtest: true
   build: test
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index c4783d9b4d2..980bbea2c4b 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -2314,7 +2314,7 @@ void grpc_chttp2_act_on_flowctl_action(
     grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
   WithUrgency(t, action.send_stream_update(),
               GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, [t, s]() {
-                if (s->id != 0) {
+                if (s->id != 0 && !s->read_closed) {
                   grpc_chttp2_mark_stream_writable(t, s);
                 }
               });
diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc
index 8a9eadd18af..dfbc26faf82 100644
--- a/src/core/lib/iomgr/iomgr.cc
+++ b/src/core/lib/iomgr/iomgr.cc
@@ -78,7 +78,12 @@ static size_t count_objects(void) {
   return n;
 }
 
-size_t grpc_iomgr_count_objects_for_testing(void) { return count_objects(); }
+size_t grpc_iomgr_count_objects_for_testing(void) {
+  gpr_mu_lock(&g_mu);
+  const size_t object_count = count_objects();
+  gpr_mu_unlock(&g_mu);
+  return object_count;
+}
 
 static void dump_objects(const char* kind) {
   grpc_iomgr_object* obj;
diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD
index 0ddcee81dd6..f7245144d9c 100644
--- a/test/core/transport/chttp2/BUILD
+++ b/test/core/transport/chttp2/BUILD
@@ -313,3 +313,17 @@ grpc_cc_test(
         "//test/core/util:grpc_test_util",
     ],
 )
+
+grpc_cc_test(
+    name = "stream_leak_with_queued_flow_control_update_test",
+    srcs = ["stream_leak_with_queued_flow_control_update_test.cc"],
+    external_deps = [
+        "gtest",
+    ],
+    language = "C++",
+    deps = [
+        "//:gpr",
+        "//:grpc",
+        "//test/core/util:grpc_test_util",
+    ],
+)
diff
--git a/test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc b/test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc
new file mode 100644
index 00000000000..db7fed0c9eb
--- /dev/null
+++ b/test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc
@@ -0,0 +1,319 @@
+// Copyright 2022 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+
+#include
+
+#include
+
+#include "absl/strings/str_cat.h"
+#include "gtest/gtest.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gprpp/host_port.h"
+#include "src/core/lib/iomgr/iomgr.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+namespace {
+
+// Minimal in-process server that listens on an unused [::1] port and answers
+// RPCs one at a time on the completion queue it is constructed with.
+class TestServer {
+ public:
+  explicit TestServer(grpc_completion_queue* cq,
+                      grpc_channel_args* channel_args)
+      : cq_(cq) {
+    server_ = grpc_server_create(channel_args, nullptr);
+    address_ = grpc_core::JoinHostPort("[::1]", grpc_pick_unused_port_or_die());
+    grpc_server_register_completion_queue(server_, cq_, nullptr);
+    grpc_server_credentials* server_creds =
+        grpc_insecure_server_credentials_create();
+    GPR_ASSERT(
+        grpc_server_add_http2_port(server_, address_.c_str(), server_creds));
+    grpc_server_credentials_release(server_creds);
+    grpc_server_start(server_);
+  }
+
+  // Shuts the server down, waits on the completion queue for the shutdown
+  // notification, and then destroys the server.
+  ~TestServer() {
+    grpc_server_shutdown_and_notify(server_, cq_, this /* tag */);
+    grpc_event event = grpc_completion_queue_next(
+        cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
+    GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
+    GPR_ASSERT(event.success);
+    GPR_ASSERT(event.tag == this);
+    grpc_server_destroy(server_);
+  }
+
+  // Accepts one incoming call and completes it with a 1-byte message and an
+  // OK status, blocking on the completion queue until the batch finishes.
+  void HandleRpc() {
+    grpc_call_details call_details;
+    grpc_call_details_init(&call_details);
+    grpc_metadata_array request_metadata_recv;
+    grpc_metadata_array_init(&request_metadata_recv);
+    grpc_slice status_details = grpc_slice_from_static_string("xyz");
+    int was_cancelled;
+    // request a call
+    void* tag = this;
+    grpc_call* call;
+    grpc_call_error error = grpc_server_request_call(
+        server_, &call, &call_details, &request_metadata_recv, cq_, cq_, tag);
+    GPR_ASSERT(error == GRPC_CALL_OK);
+    grpc_event event = grpc_completion_queue_next(
+        cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
+    GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
+    grpc_call_details_destroy(&call_details);
+    grpc_metadata_array_destroy(&request_metadata_recv);
+    GPR_ASSERT(event.success);
+    GPR_ASSERT(event.tag == tag);
+    // Send a response with a 1-byte payload. The 1-byte length is important
+    // because it's enough to get the client to *queue* a flow control update,
+    // but not long enough to get the client to initiate a write on that update.
+    grpc_slice response_payload_slice = grpc_slice_from_static_string("a");
+    grpc_byte_buffer* response_payload =
+        grpc_raw_byte_buffer_create(&response_payload_slice, 1);
+    grpc_op ops[4];
+    grpc_op* op;
+    memset(ops, 0, sizeof(ops));
+    op = ops;
+    op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+    op->data.recv_close_on_server.cancelled = &was_cancelled;
+    op++;
+    op->op = GRPC_OP_SEND_INITIAL_METADATA;
+    op++;
+    op->op = GRPC_OP_SEND_MESSAGE;
+    op->data.send_message.send_message = response_payload;
+    op++;
+    op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+    op->data.send_status_from_server.status = GRPC_STATUS_OK;
+    op->data.send_status_from_server.status_details = &status_details;
+    op++;
+    error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops), tag,
+                                  nullptr);
+    GPR_ASSERT(error == GRPC_CALL_OK);
+    event = grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME),
+                                       nullptr);
+    GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
+    GPR_ASSERT(event.success);
+    GPR_ASSERT(event.tag == tag);
+    grpc_byte_buffer_destroy(response_payload);
+    grpc_call_unref(call);
+  }
+
+  std::string address() const { return address_; }
+
+ private:
+  grpc_server* server_;
+  grpc_completion_queue* cq_;
+  std::string address_;
+};
+
+// Sends initial metadata and half-closes the call from the client side, then
+// waits on the completion queue for that batch to complete.
+void StartCallAndCloseWrites(grpc_call* call, grpc_completion_queue* cq) {
+  grpc_op ops[2];
+  grpc_op* op;
+  memset(ops, 0, sizeof(ops));
+  op = ops;
+  op->op = GRPC_OP_SEND_INITIAL_METADATA;
+  op++;
+  op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+  op++;
+  void* tag = call;
+  grpc_call_error error = grpc_call_start_batch(
+      call, ops, static_cast<size_t>(op - ops), tag, nullptr);
+  GPR_ASSERT(GRPC_CALL_OK == error);
+  grpc_event event = grpc_completion_queue_next(
+      cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
+  GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
+  GPR_ASSERT(event.success);
+  GPR_ASSERT(event.tag == tag);
+}
+
+// Receives initial metadata, the response message and the final status on the
+// client call, and expects the status to be GRPC_STATUS_OK.
+void FinishCall(grpc_call* call, grpc_completion_queue* cq) {
+  grpc_metadata_array initial_metadata_recv;
+  grpc_metadata_array_init(&initial_metadata_recv);
+  grpc_metadata_array trailing_metadata_recv;
+  grpc_metadata_array_init(&trailing_metadata_recv);
+  grpc_status_code status = GRPC_STATUS_UNKNOWN;
+  grpc_slice details;
+  grpc_byte_buffer* recv_payload = nullptr;
+  void* tag = call;
+  // Note: we're only doing read ops here. The goal here is to finish the call
+  // with a queued stream flow control update, due to receipt of a small
+  // message. We won't do anything to explicitly initiate writes on the
+  // transport, which could accidentally flush out that queued update.
+  grpc_op ops[3];
+  grpc_op* op;
+  memset(ops, 0, sizeof(ops));
+  op = ops;
+  op->op = GRPC_OP_RECV_INITIAL_METADATA;
+  op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
+  op++;
+  op->op = GRPC_OP_RECV_MESSAGE;
+  op->data.recv_message.recv_message = &recv_payload;
+  op++;
+  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+  op->data.recv_status_on_client.status = &status;
+  op->data.recv_status_on_client.status_details = &details;
+  op++;
+  grpc_call_error error = grpc_call_start_batch(
+      call, ops, static_cast<size_t>(op - ops), tag, nullptr);
+  GPR_ASSERT(GRPC_CALL_OK == error);
+  grpc_event event = grpc_completion_queue_next(
+      cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
+  GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
+  GPR_ASSERT(event.success);
+  GPR_ASSERT(event.tag == tag);
+  EXPECT_EQ(status, GRPC_STATUS_OK);
+  grpc_byte_buffer_destroy(recv_payload);
+  grpc_metadata_array_destroy(&initial_metadata_recv);
+  grpc_metadata_array_destroy(&trailing_metadata_recv);
+  grpc_slice_unref(details);
+}
+
+// Polls until exactly one iomgr object remains, and aborts the test if that
+// does not happen before the overall deadline.
+void EnsureConnectionsArentLeaked(grpc_completion_queue* cq) {
+  gpr_log(
+      GPR_INFO,
+      "The channel has been destroyed, wait for it to shut down and close...");
+  // Do a quick initial poll to try to exit the test early if things have
+  // already cleaned up.
+  GPR_ASSERT(grpc_completion_queue_next(
+                 cq,
+                 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+                              gpr_time_from_millis(1, GPR_TIMESPAN)),
+                 nullptr)
+                 .type == GRPC_QUEUE_TIMEOUT);
+  gpr_timespec overall_deadline = grpc_timeout_seconds_to_deadline(120);
+  for (;;) {
+    // TODO(apolcyn): grpc_iomgr_count_objects_for_testing() is an internal
+    // and unstable API. Consider a different method of detecting leaks if
+    // it becomes no longer useable. For example, perhaps use
+    // TestOnlySetGlobalHttp2TransportDestructCallback to check whether
+    // transports are still around. Note: the main goal of this test is to
+    // try to repro a chttp2 stream leak, which also holds on to transports
+    // and iomgr objects, so anything that can detect leaks of transports or
+    // sockets should suffice.
+    size_t active_fds = grpc_iomgr_count_objects_for_testing();
+    // We should arrive at exactly one iomgr object for the server's listener
+    // socket, because we haven't destroyed the server yet. This somewhat
+    // relies on iomgr implementation details; see above TODO if that becomes
+    // problematic.
+    if (active_fds == 1) return;
+    if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) > 0) {
+      gpr_log(GPR_ERROR,
+              "grpc_iomgr_count_objects_for_testing() never returned 1 (only "
+              "the server listen socket should remain). "
+              "It's likely this test has triggered a connection leak.");
+      GPR_ASSERT(0);
+    }
+    gpr_log(GPR_INFO,
+            "grpc_iomgr_count_objects_for_testing() returned %zu, keep waiting "
+            "until it reaches 1 (only the server listen socket should remain)",
+            active_fds);
+    GPR_ASSERT(grpc_completion_queue_next(
+                   cq,
+                   gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+                                gpr_time_from_seconds(1, GPR_TIMESPAN)),
+                   nullptr)
+                   .type == GRPC_QUEUE_TIMEOUT);
+  }
+}
+
+TEST(
+    Chttp2,
+    TestStreamDoesntLeakWhenItsWriteClosedAndThenReadClosedBeforeStartOfReadingMessageAndStatus) {
+  grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
+  {
+    // Prevent pings from client to server and server to client, since they can
+    // cause chttp2 to initiate writes and thus dodge the bug we're trying to
+    // repro.
+    auto channel_args =
+        grpc_core::ChannelArgs().Set(GRPC_ARG_HTTP2_BDP_PROBE, 0);
+    TestServer server(cq,
+                      const_cast<grpc_channel_args*>(channel_args.ToC().get()));
+    grpc_channel_credentials* creds = grpc_insecure_credentials_create();
+    grpc_channel* channel =
+        grpc_channel_create(absl::StrCat("ipv6:", server.address()).c_str(),
+                            creds, channel_args.ToC().get());
+    grpc_channel_credentials_release(creds);
+    grpc_call* call =
+        grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
+                                 grpc_slice_from_static_string("/foo"), nullptr,
+                                 gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
+    // Start the call. It's important for our repro to close writes before
+    // reading the response, so that the client transport marks the stream
+    // both read and write closed as soon as it reads a status off the wire.
+    StartCallAndCloseWrites(call, cq);
+    // Send a small message from server to client. The message needs to be small
+    // enough such that the client will queue a stream flow control update,
+    // without flushing it out to the wire.
+    server.HandleRpc();
+    // Do some polling to let the client pick up the message and status off
+    // the wire, *before* it begins the RECV_MESSAGE and RECV_STATUS ops.
+    // The timeout here just needs to be long enough that the client has
+    // most likely read everything the server sent it by the time it's done.
+    GPR_ASSERT(grpc_completion_queue_next(
+                   cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
+                   .type == GRPC_QUEUE_TIMEOUT);
+    // Perform the receive message and status. Note that the incoming bytes
+    // should already be in the client's buffers by the time we start these ops.
+    // Thus, the client should *not* need to urgently send a flow control update
+    // to the server, to ensure progress, and it can simply queue the flow
+    // control update instead.
+    FinishCall(call, cq);
+    grpc_call_unref(call);
+    grpc_channel_destroy(channel);
+    // There should be nothing to prevent stream and transport objects from
+    // shutdown and destruction at this point. So check that this happens.
+    // The timeout is somewhat arbitrary, and is set long enough so that it's
+    // extremely unlikely to be hit due to CPU starvation.
+    EnsureConnectionsArentLeaked(cq);
+  }
+  grpc_completion_queue_shutdown(cq);
+  while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
+                                    nullptr)
+             .type != GRPC_QUEUE_SHUTDOWN) {
+  }
+  grpc_completion_queue_destroy(cq);
+}
+
+}  // namespace
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  grpc::testing::TestEnvironment env(&argc, argv);
+  grpc_init();
+  auto result = RUN_ALL_TESTS();
+  grpc_shutdown();
+  return result;
+}
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 57f30f9ed27..a6fe0a3f0b0 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -6589,6 +6589,30 @@
     ],
     "uses_polling": true
   },
+  {
+    "args": [],
+    "benchmark": false,
+    "ci_platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "cpu_cost": 1.0,
+    "exclude_configs": [],
+    "exclude_iomgrs": [],
+    "flaky": false,
+    "gtest": true,
+    "language": "c++",
+    "name": "stream_leak_with_queued_flow_control_update_test",
+    "platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "uses_polling": true
+  },
   {
     "args": [],
     "benchmark": false,