From 3d4433beb289e840f7623b9aba51efa1d12f6dcc Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 24 Feb 2022 07:01:58 -0800 Subject: [PATCH] retry: always free cached send op data (#28956) * Revert "Revert "retry: fix memory leak due to not freeing cached send ops upon cancellation (#28945)" (#28954)" This reverts commit 9498843a18e702622e3f9924b1d172061d86585e. * retry: always free cached send op data --- CMakeLists.txt | 2 + build_autogenerated.yaml | 2 + gRPC-Core.podspec | 1 + grpc.gyp | 2 + .../filters/client_channel/retry_filter.cc | 14 +- test/core/end2end/end2end_nosec_tests.cc | 8 + test/core/end2end/end2end_tests.cc | 8 + test/core/end2end/generate_tests.bzl | 6 + ...retry_cancel_after_first_attempt_starts.cc | 218 ++++++++++++++++++ 9 files changed, 256 insertions(+), 5 deletions(-) create mode 100644 test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 32c592c8805..9c00c0bb14c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1164,6 +1164,7 @@ add_library(end2end_nosec_tests test/core/end2end/tests/request_with_payload.cc test/core/end2end/tests/resource_quota_server.cc test/core/end2end/tests/retry.cc + test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc test/core/end2end/tests/retry_cancel_during_delay.cc test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc test/core/end2end/tests/retry_cancellation.cc @@ -1309,6 +1310,7 @@ add_library(end2end_tests test/core/end2end/tests/request_with_payload.cc test/core/end2end/tests/resource_quota_server.cc test/core/end2end/tests/retry.cc + test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc test/core/end2end/tests/retry_cancel_during_delay.cc test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc test/core/end2end/tests/retry_cancellation.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index cda30eeb1a1..f45fbc36e57 100644 --- a/build_autogenerated.yaml +++ 
b/build_autogenerated.yaml @@ -86,6 +86,7 @@ libs: - test/core/end2end/tests/request_with_payload.cc - test/core/end2end/tests/resource_quota_server.cc - test/core/end2end/tests/retry.cc + - test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc - test/core/end2end/tests/retry_cancel_during_delay.cc - test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc - test/core/end2end/tests/retry_cancellation.cc @@ -222,6 +223,7 @@ libs: - test/core/end2end/tests/request_with_payload.cc - test/core/end2end/tests/resource_quota_server.cc - test/core/end2end/tests/retry.cc + - test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc - test/core/end2end/tests/retry_cancel_during_delay.cc - test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc - test/core/end2end/tests/retry_cancellation.cc diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 9175c3e15f4..46d0b4a924b 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -2468,6 +2468,7 @@ Pod::Spec.new do |s| 'test/core/end2end/tests/request_with_payload.cc', 'test/core/end2end/tests/resource_quota_server.cc', 'test/core/end2end/tests/retry.cc', + 'test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc', 'test/core/end2end/tests/retry_cancel_during_delay.cc', 'test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc', 'test/core/end2end/tests/retry_cancellation.cc', diff --git a/grpc.gyp b/grpc.gyp index 5dcae0a9a93..3b471b1cbad 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -239,6 +239,7 @@ 'test/core/end2end/tests/request_with_payload.cc', 'test/core/end2end/tests/resource_quota_server.cc', 'test/core/end2end/tests/retry.cc', + 'test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc', 'test/core/end2end/tests/retry_cancel_during_delay.cc', 'test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc', 'test/core/end2end/tests/retry_cancellation.cc', @@ -352,6 +353,7 @@ 'test/core/end2end/tests/request_with_payload.cc', 
'test/core/end2end/tests/resource_quota_server.cc', 'test/core/end2end/tests/retry.cc', + 'test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc', 'test/core/end2end/tests/retry_cancel_during_delay.cc', 'test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc', 'test/core/end2end/tests/retry_cancellation.cc', diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index d320a3a51af..309ded8ab6c 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -2096,6 +2096,7 @@ RetryFilter::CallData::CallData(RetryFilter* chand, retry_timer_pending_(false) {} RetryFilter::CallData::~CallData() { + FreeAllCachedSendOpData(); grpc_slice_unref_internal(path_); // Make sure there are no remaining pending batches. for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { @@ -2294,12 +2295,15 @@ void RetryFilter::CallData::FreeCachedSendInitialMetadata() { } void RetryFilter::CallData::FreeCachedSendMessage(size_t idx) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: destroying send_messages[%" PRIuPTR "]", chand_, - this, idx); + if (send_messages_[idx] != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: destroying send_messages[%" PRIuPTR "]", + chand_, this, idx); + } + send_messages_[idx]->Destroy(); + send_messages_[idx] = nullptr; } - send_messages_[idx]->Destroy(); } void RetryFilter::CallData::FreeCachedSendTrailingMetadata() { diff --git a/test/core/end2end/end2end_nosec_tests.cc b/test/core/end2end/end2end_nosec_tests.cc index a953188c8dd..950edbe36d5 100644 --- a/test/core/end2end/end2end_nosec_tests.cc +++ b/test/core/end2end/end2end_nosec_tests.cc @@ -125,6 +125,8 @@ extern void resource_quota_server(grpc_end2end_test_config config); extern void resource_quota_server_pre_init(void); extern void 
retry(grpc_end2end_test_config config); extern void retry_pre_init(void); +extern void retry_cancel_after_first_attempt_starts(grpc_end2end_test_config config); +extern void retry_cancel_after_first_attempt_starts_pre_init(void); extern void retry_cancel_during_delay(grpc_end2end_test_config config); extern void retry_cancel_during_delay_pre_init(void); extern void retry_cancel_with_multiple_send_batches(grpc_end2end_test_config config); @@ -257,6 +259,7 @@ void grpc_end2end_tests_pre_init(void) { request_with_payload_pre_init(); resource_quota_server_pre_init(); retry_pre_init(); + retry_cancel_after_first_attempt_starts_pre_init(); retry_cancel_during_delay_pre_init(); retry_cancel_with_multiple_send_batches_pre_init(); retry_cancellation_pre_init(); @@ -355,6 +358,7 @@ void grpc_end2end_tests(int argc, char **argv, request_with_payload(config); resource_quota_server(config); retry(config); + retry_cancel_after_first_attempt_starts(config); retry_cancel_during_delay(config); retry_cancel_with_multiple_send_batches(config); retry_cancellation(config); @@ -591,6 +595,10 @@ void grpc_end2end_tests(int argc, char **argv, retry(config); continue; } + if (0 == strcmp("retry_cancel_after_first_attempt_starts", argv[i])) { + retry_cancel_after_first_attempt_starts(config); + continue; + } if (0 == strcmp("retry_cancel_during_delay", argv[i])) { retry_cancel_during_delay(config); continue; diff --git a/test/core/end2end/end2end_tests.cc b/test/core/end2end/end2end_tests.cc index a9b0e10dbc6..8c6267327b0 100644 --- a/test/core/end2end/end2end_tests.cc +++ b/test/core/end2end/end2end_tests.cc @@ -129,6 +129,8 @@ extern void resource_quota_server(grpc_end2end_test_config config); extern void resource_quota_server_pre_init(void); extern void retry(grpc_end2end_test_config config); extern void retry_pre_init(void); +extern void retry_cancel_after_first_attempt_starts(grpc_end2end_test_config config); +extern void retry_cancel_after_first_attempt_starts_pre_init(void); extern 
void retry_cancel_during_delay(grpc_end2end_test_config config); extern void retry_cancel_during_delay_pre_init(void); extern void retry_cancel_with_multiple_send_batches(grpc_end2end_test_config config); @@ -263,6 +265,7 @@ void grpc_end2end_tests_pre_init(void) { request_with_payload_pre_init(); resource_quota_server_pre_init(); retry_pre_init(); + retry_cancel_after_first_attempt_starts_pre_init(); retry_cancel_during_delay_pre_init(); retry_cancel_with_multiple_send_batches_pre_init(); retry_cancellation_pre_init(); @@ -363,6 +366,7 @@ void grpc_end2end_tests(int argc, char **argv, request_with_payload(config); resource_quota_server(config); retry(config); + retry_cancel_after_first_attempt_starts(config); retry_cancel_during_delay(config); retry_cancel_with_multiple_send_batches(config); retry_cancellation(config); @@ -607,6 +611,10 @@ void grpc_end2end_tests(int argc, char **argv, retry(config); continue; } + if (0 == strcmp("retry_cancel_after_first_attempt_starts", argv[i])) { + retry_cancel_after_first_attempt_starts(config); + continue; + } if (0 == strcmp("retry_cancel_during_delay", argv[i])) { retry_cancel_during_delay(config); continue; diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl index 022c3ad2635..6b86a36ca5a 100755 --- a/test/core/end2end/generate_tests.bzl +++ b/test/core/end2end/generate_tests.bzl @@ -308,6 +308,12 @@ END2END_TESTS = { short_name = "retry_cancel3", needs_client_channel = True, ), + "retry_cancel_after_first_attempt_starts": _test_options( + # TODO(jtattermusch): too long bazel test name makes the test flaky on Windows RBE + # See b/151617965 + short_name = "retry_cancel4", + needs_client_channel = True, + ), "retry_disabled": _test_options(needs_client_channel = True), "retry_exceeds_buffer_size_in_delay": _test_options(needs_client_channel = True), "retry_exceeds_buffer_size_in_initial_batch": _test_options( diff --git 
a/test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc b/test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc new file mode 100644 index 00000000000..88d9adfcfe5 --- /dev/null +++ b/test/core/end2end/tests/retry_cancel_after_first_attempt_starts.cc @@ -0,0 +1,218 @@ +// +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include <stdio.h> +#include <string.h> + +#include <grpc/byte_buffer.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "test/core/end2end/cq_verifier.h" +#include "test/core/end2end/end2end_tests.h" +#include "test/core/end2end/tests/cancel_test_helpers.h" + +static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char* test_name, + grpc_channel_args* client_args, + grpc_channel_args* server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void 
drain_cq(grpc_completion_queue* cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture* f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + nullptr) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = nullptr; +} + +static void shutdown_client(grpc_end2end_test_fixture* f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = nullptr; +} + +static void end_test(grpc_end2end_test_fixture* f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +// Tests that we can unref a call after the first attempt starts but +// before any ops complete. This should not cause a memory leak. +static void test_retry_cancel_after_first_attempt_starts( + grpc_end2end_test_config config) { + grpc_call* c; + grpc_op ops[6]; + grpc_op* op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + // Make sure to dynamically allocate the request payload slice, so + // that the test will fail under ASAN if we don't clean up properly. 
+ const char request_string[] = "foo"; + grpc_slice request_payload_slice = + grpc_slice_malloc_large(sizeof(request_string)); + memcpy(GRPC_SLICE_START_PTR(request_payload_slice), request_string, + sizeof(request_string)); + grpc_byte_buffer* request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref(request_payload_slice); + grpc_byte_buffer* response_payload_recv = nullptr; + grpc_status_code status; + grpc_slice details; + grpc_call_error error; + + grpc_arg args[] = { + grpc_channel_arg_string_create( + const_cast<char*>(GRPC_ARG_SERVICE_CONFIG), + const_cast<char*>( + "{\n" + " \"methodConfig\": [ {\n" + " \"name\": [\n" + " { \"service\": \"service\", \"method\": \"method\" }\n" + " ],\n" + " \"retryPolicy\": {\n" + " \"maxAttempts\": 3,\n" + " \"initialBackoff\": \"1s\",\n" + " \"maxBackoff\": \"120s\",\n" + " \"backoffMultiplier\": 1.6,\n" + " \"retryableStatusCodes\": [ \"ABORTED\" ]\n" + " }\n" + " } ]\n" + "}")), + }; + grpc_channel_args client_args = {GPR_ARRAY_SIZE(args), args}; + grpc_end2end_test_fixture f = begin_test( + config, "retry_cancel_after_first_attempt_starts", &client_args, nullptr); + + cq_verifier* cqv = cq_verifier_create(f.cq); + + gpr_timespec deadline = five_seconds_from_now(); + c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/service/method"), + nullptr, deadline, nullptr); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + + // Client starts send ops. 
+ memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Client starts recv_initial_metadata and recv_message, but not + // recv_trailing_metadata. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op++; + error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(2), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Client starts recv_trailing_metadata. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op++; + error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(3), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Client unrefs the call before any of the started ops complete. + // This should trigger a cancellation. + grpc_call_unref(c); + + // The send ops batch and the first recv ops batch will fail in most + // fixtures but will pass in the proxy fixtures on some platforms. 
+ CQ_EXPECT_COMPLETION_ANY_STATUS(cqv, tag(1)); + CQ_EXPECT_COMPLETION_ANY_STATUS(cqv, tag(2)); + CQ_EXPECT_COMPLETION(cqv, tag(3), true); + cq_verify(cqv); + + grpc_slice_unref(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload_recv); + + cq_verifier_destroy(cqv); + + end_test(&f); + config.tear_down_data(&f); +} + +void retry_cancel_after_first_attempt_starts(grpc_end2end_test_config config) { + GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL); + test_retry_cancel_after_first_attempt_starts(config); +} + +void retry_cancel_after_first_attempt_starts_pre_init(void) {}