From 0f2a0f5fc9b9e9b9c98d227d16575d106f1e8d43 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 14 Sep 2022 14:44:04 -0700 Subject: [PATCH] Revert "[chttp2] fix stream leak with queued flow control update and absence of writes (#30907)" (#30991) This reverts commit d8f98fb1a76d88fa65e3648a0e38b5872e1df07c. --- CMakeLists.txt | 36 -- build_autogenerated.yaml | 9 - .../chttp2/transport/chttp2_transport.cc | 2 +- test/core/transport/chttp2/BUILD | 14 - ...ak_with_queued_flow_control_update_test.cc | 307 ------------------ tools/run_tests/generated/tests.json | 24 -- 6 files changed, 1 insertion(+), 391 deletions(-) delete mode 100644 test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index b3692f91988..f25c7c99daa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1166,7 +1166,6 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx stranded_event_test) endif() - add_dependencies(buildtests_cxx stream_leak_with_queued_flow_control_update_test) add_dependencies(buildtests_cxx stream_map_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx streaming_throughput_test) @@ -17253,41 +17252,6 @@ endif() endif() if(gRPC_BUILD_TESTS) -add_executable(stream_leak_with_queued_flow_control_update_test - test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc - third_party/googletest/googletest/src/gtest-all.cc - third_party/googletest/googlemock/src/gmock-all.cc -) - -target_include_directories(stream_leak_with_queued_flow_control_update_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} - third_party/googletest/googletest/include - third_party/googletest/googletest - third_party/googletest/googlemock/include - third_party/googletest/googlemock - ${_gRPC_PROTO_GENS_DIR} -) - -target_link_libraries(stream_leak_with_queued_flow_control_update_test - ${_gRPC_PROTOBUF_LIBRARIES} - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util -) - - -endif() -if(gRPC_BUILD_TESTS) - add_executable(stream_map_test test/core/transport/chttp2/stream_map_test.cc test/core/util/cmdline.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 22ac578aed0..4905b65bb7f 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -9493,15 +9493,6 @@ targets: - linux - posix - mac -- name: stream_leak_with_queued_flow_control_update_test - gtest: true - build: test - language: c++ - headers: [] - src: - - test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc - deps: - - grpc_test_util - name: stream_map_test gtest: true build: test diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 980bbea2c4b..c4783d9b4d2 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -2314,7 +2314,7 @@ void grpc_chttp2_act_on_flowctl_action( grpc_chttp2_transport* t, grpc_chttp2_stream* s) { WithUrgency(t, action.send_stream_update(), GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, [t, s]() { - if (s->id != 0 && !s->read_closed) { + if (s->id != 0) { grpc_chttp2_mark_stream_writable(t, s); } }); diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index f7245144d9c..0ddcee81dd6 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -313,17 +313,3 @@ grpc_cc_test( "//test/core/util:grpc_test_util", ], ) - -grpc_cc_test( - name = "stream_leak_with_queued_flow_control_update_test", - srcs = ["stream_leak_with_queued_flow_control_update_test.cc"], - external_deps = [ - "gtest", - ], - language = "C++", - deps = [ - "//:gpr", - "//:grpc", - "//test/core/util:grpc_test_util", - ], -) diff --git a/test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc b/test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc deleted file mode 100644 index db7fed0c9eb..00000000000 --- a/test/core/transport/chttp2/stream_leak_with_queued_flow_control_update_test.cc +++ /dev/null @@ -1,307 +0,0 @@ -// Copyright 2022 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include - -#include - -#include "absl/strings/str_cat.h" -#include "gtest/gtest.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gprpp/host_port.h" -#include "src/core/lib/iomgr/iomgr.h" -#include "test/core/util/port.h" -#include "test/core/util/test_config.h" - -namespace { - -class TestServer { - public: - explicit TestServer(grpc_completion_queue* cq, - grpc_channel_args* channel_args) - : cq_(cq) { - server_ = grpc_server_create(channel_args, nullptr); - address_ = grpc_core::JoinHostPort("[::1]", grpc_pick_unused_port_or_die()); - grpc_server_register_completion_queue(server_, cq_, nullptr); - grpc_server_credentials* server_creds = - grpc_insecure_server_credentials_create(); - GPR_ASSERT( - grpc_server_add_http2_port(server_, address_.c_str(), server_creds)); - grpc_server_credentials_release(server_creds); - grpc_server_start(server_); - } - - ~TestServer() { - grpc_server_shutdown_and_notify(server_, cq_, this /* tag */); - grpc_event event = grpc_completion_queue_next( - cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - GPR_ASSERT(event.type == GRPC_OP_COMPLETE); - GPR_ASSERT(event.success); - GPR_ASSERT(event.tag == this); - grpc_server_destroy(server_); - } - - void HandleRpc() { - grpc_call_details call_details; - grpc_call_details_init(&call_details); - grpc_metadata_array request_metadata_recv; - grpc_metadata_array_init(&request_metadata_recv); - grpc_slice status_details = grpc_slice_from_static_string("xyz"); - int was_cancelled; - // request a call - void* tag = this; - grpc_call* call; - grpc_call_error error = grpc_server_request_call( - server_, &call, &call_details, &request_metadata_recv, cq_, cq_, tag); - GPR_ASSERT(error == GRPC_CALL_OK); - grpc_event event = grpc_completion_queue_next( - cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - GPR_ASSERT(event.type == GRPC_OP_COMPLETE); - grpc_call_details_destroy(&call_details); - grpc_metadata_array_destroy(&request_metadata_recv); - GPR_ASSERT(event.success); - GPR_ASSERT(event.tag == tag); - // Send a response with a 1-byte payload. The 1-byte length is important - // because it's enough to get the client to *queue* a flow control update, - // but not long enough to get the client to initiate a write on that update. - grpc_slice response_payload_slice = grpc_slice_from_static_string("a"); - grpc_byte_buffer* response_payload = - grpc_raw_byte_buffer_create(&response_payload_slice, 1); - grpc_op ops[4]; - grpc_op* op; - memset(ops, 0, sizeof(ops)); - op = ops; - op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; - op->data.recv_close_on_server.cancelled = &was_cancelled; - op++; - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op++; - op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message.send_message = response_payload; - op++; - op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; - op->data.send_status_from_server.status = GRPC_STATUS_OK; - op->data.send_status_from_server.status_details = &status_details; - op++; - error = grpc_call_start_batch(call, ops, static_cast(op - ops), tag, - nullptr); - GPR_ASSERT(error == GRPC_CALL_OK); - event = grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME), - nullptr); - GPR_ASSERT(event.type == GRPC_OP_COMPLETE); - GPR_ASSERT(event.success); - GPR_ASSERT(event.tag == tag); - grpc_byte_buffer_destroy(response_payload); - grpc_call_unref(call); - } - - std::string address() const { return address_; } - - private: - grpc_server* server_; - grpc_completion_queue* cq_; - std::string address_; -}; - -void StartCallAndCloseWrites(grpc_call* call, grpc_completion_queue* cq) { - grpc_op ops[2]; - grpc_op* op; - memset(ops, 0, sizeof(ops)); - op = ops; - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op++; - op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - op++; - void* tag = call; - grpc_call_error error = grpc_call_start_batch( - call, ops, static_cast(op - ops), tag, nullptr); - GPR_ASSERT(GRPC_CALL_OK == error); - grpc_event event = grpc_completion_queue_next( - cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - GPR_ASSERT(event.type == GRPC_OP_COMPLETE); - GPR_ASSERT(event.success); - GPR_ASSERT(event.tag == tag); -} - -void FinishCall(grpc_call* call, grpc_completion_queue* cq) { - grpc_metadata_array initial_metadata_recv; - grpc_metadata_array_init(&initial_metadata_recv); - grpc_metadata_array trailing_metadata_recv; - grpc_metadata_array_init(&trailing_metadata_recv); - grpc_status_code status = GRPC_STATUS_UNKNOWN; - grpc_slice details; - grpc_byte_buffer* recv_payload = nullptr; - void* tag = call; - // Note: we're only doing read ops here. The goal here is to finish the call - // with a queued stream flow control update, due to receipt of a small - // message. We won't do anything to explicitly initiate writes on the - // transport, which could accidentally flush out that queued update. - grpc_op ops[3]; - grpc_op* op; - memset(ops, 0, sizeof(ops)); - op = ops; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; - op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &recv_payload; - op++; - op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; - op->data.recv_status_on_client.status = &status; - op->data.recv_status_on_client.status_details = &details; - op++; - grpc_call_error error = grpc_call_start_batch( - call, ops, static_cast(op - ops), tag, nullptr); - GPR_ASSERT(GRPC_CALL_OK == error); - grpc_event event = grpc_completion_queue_next( - cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - GPR_ASSERT(event.type == GRPC_OP_COMPLETE); - GPR_ASSERT(event.success); - GPR_ASSERT(event.tag == tag); - EXPECT_EQ(status, GRPC_STATUS_OK); - grpc_byte_buffer_destroy(recv_payload); - grpc_metadata_array_destroy(&initial_metadata_recv); - grpc_metadata_array_destroy(&trailing_metadata_recv); - grpc_slice_unref(details); -} - -void EnsureConnectionsArentLeaked(grpc_completion_queue* cq) { - gpr_log( - GPR_INFO, - "The channel has been destroyed, wait for it to shut down and close..."); - // Do a quick initial poll to try to exit the test early if things have - // already cleaned up. - GPR_ASSERT(grpc_completion_queue_next( - cq, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(1, GPR_TIMESPAN)), - nullptr) - .type == GRPC_QUEUE_TIMEOUT); - gpr_timespec overall_deadline = grpc_timeout_seconds_to_deadline(120); - for (;;) { - // TODO(apolcyn): grpc_iomgr_count_objects_for_testing() is an internal - // and unstable API. Consider a different method of detecting leaks if - // it becomes no longer useable. For example, perhaps use - // TestOnlySetGlobalHttp2TransportDestructCallback to check whether - // transports are still around. Note: the main goal of this test is to - // try to repro a chttp2 stream leak, which also holds on to transports - // and iomgr objects, so anything that can detect leaks of transports or - // sockets should suffice. - size_t active_fds = grpc_iomgr_count_objects_for_testing(); - // We should arrive at exactly one iomgr object for the server's listener - // socket, because we haven't destroyed the server yet. This somewhat - // relies on iomgr implementation details; see above TODO if that becomes - // problematic. - if (active_fds == 1) return; - if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) > 0) { - gpr_log(GPR_INFO, - "grpc_iomgr_count_objects_for_testing() never returned 1 (only " - "the server listen socket should remain). " - "It's likely this test has triggered a connection leak."); - GPR_ASSERT(0); - } - gpr_log(GPR_INFO, - "grpc_iomgr_count_objects_for_testing() returned %ld, keep waiting " - "until it reaches 1 (only the server listen socket should remain)", - active_fds); - GPR_ASSERT(grpc_completion_queue_next( - cq, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(1, GPR_TIMESPAN)), - nullptr) - .type == GRPC_QUEUE_TIMEOUT); - } -} - -TEST( - Chttp2, - TestStreamDoesntLeakWhenItsWriteClosedAndThenReadClosedBeforeStartOfReadingMessageAndStatus) { - grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); - { - // Prevent pings from client to server and server to client, since they can - // cause chttp2 to initiate writes and thus dodge the bug we're trying to - // repro. - auto channel_args = - grpc_core::ChannelArgs().Set(GRPC_ARG_HTTP2_BDP_PROBE, 0); - TestServer server(cq, - const_cast(channel_args.ToC().get())); - grpc_channel_credentials* creds = grpc_insecure_credentials_create(); - grpc_channel* channel = - grpc_channel_create(absl::StrCat("ipv6:", server.address()).c_str(), - creds, channel_args.ToC().get()); - grpc_channel_credentials_release(creds); - grpc_call* call = - grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, - grpc_slice_from_static_string("/foo"), nullptr, - gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - // Start the call. It's important for our repro to close writes before - // reading the response, so that the client transport marks the stream - // both read and write closed as soon as it reads a status off the wire. - StartCallAndCloseWrites(call, cq); - // Send a small message from server to client. The message needs to be small - // enough such that the client will queue a stream flow control update, - // without flushing it out to the wire. - server.HandleRpc(); - // Do some polling to let the client to pick up the message and status off - // the wire, *before* it begins the RECV_MESSAGE and RECV_STATUS ops. - // The timeout here just needs to be long enough that the client has - // most likely reads everything the server sent it by the time it's done. - GPR_ASSERT(grpc_completion_queue_next( - cq, grpc_timeout_milliseconds_to_deadline(20), nullptr) - .type == GRPC_QUEUE_TIMEOUT); - // Perform the receive message and status. Note that the incoming bytes - // should already be in the client's buffers by the time we start these ops. - // Thus, the client should *not* need to urgently send a flow control update - // to the server, to ensure progress, and it can simply queue the flow - // control update instead. - FinishCall(call, cq); - grpc_call_unref(call); - grpc_channel_destroy(channel); - // There should be nothing to prevent stream and transport objects from - // shutdown and destruction at this point. So check that this happens. - // The timeout is somewhat arbitrary, and is set long enough so that it's - // extremely unlikely to be hit due to CPU starvation. - EnsureConnectionsArentLeaked(cq); - } - grpc_completion_queue_shutdown(cq); - while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), - nullptr) - .type != GRPC_QUEUE_SHUTDOWN) { - } - grpc_completion_queue_destroy(cq); -} - -} // namespace - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - grpc::testing::TestEnvironment env(&argc, argv); - grpc_init(); - auto result = RUN_ALL_TESTS(); - grpc_shutdown(); - return result; -} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index f2f15bbd80e..0a0ac9971b8 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -6589,30 +6589,6 @@ ], "uses_polling": true }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": true, - "language": "c++", - "name": "stream_leak_with_queued_flow_control_update_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": true - }, { "args": [], "benchmark": false,