diff --git a/CMakeLists.txt b/CMakeLists.txt index 6117ce13220..68df8560878 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1191,6 +1191,8 @@ add_library(end2end_nosec_tests test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc test/core/end2end/tests/retry_throttled.cc test/core/end2end/tests/retry_too_many_attempts.cc + test/core/end2end/tests/retry_unref_before_finish.cc + test/core/end2end/tests/retry_unref_before_recv.cc test/core/end2end/tests/server_finishes_request.cc test/core/end2end/tests/server_streaming.cc test/core/end2end/tests/shutdown_finishes_calls.cc @@ -1334,6 +1336,8 @@ add_library(end2end_tests test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc test/core/end2end/tests/retry_throttled.cc test/core/end2end/tests/retry_too_many_attempts.cc + test/core/end2end/tests/retry_unref_before_finish.cc + test/core/end2end/tests/retry_unref_before_recv.cc test/core/end2end/tests/server_finishes_request.cc test/core/end2end/tests/server_streaming.cc test/core/end2end/tests/shutdown_finishes_calls.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 09e7896c491..e51a8270dfd 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -112,6 +112,8 @@ libs: - test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc - test/core/end2end/tests/retry_throttled.cc - test/core/end2end/tests/retry_too_many_attempts.cc + - test/core/end2end/tests/retry_unref_before_finish.cc + - test/core/end2end/tests/retry_unref_before_recv.cc - test/core/end2end/tests/server_finishes_request.cc - test/core/end2end/tests/server_streaming.cc - test/core/end2end/tests/shutdown_finishes_calls.cc @@ -246,6 +248,8 @@ libs: - test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc - test/core/end2end/tests/retry_throttled.cc - test/core/end2end/tests/retry_too_many_attempts.cc + - test/core/end2end/tests/retry_unref_before_finish.cc + - test/core/end2end/tests/retry_unref_before_recv.cc - test/core/end2end/tests/server_finishes_request.cc - test/core/end2end/tests/server_streaming.cc - test/core/end2end/tests/shutdown_finishes_calls.cc diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 86597ffab7b..4923a328afa 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -2487,6 +2487,8 @@ Pod::Spec.new do |s| 'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc', 'test/core/end2end/tests/retry_throttled.cc', 'test/core/end2end/tests/retry_too_many_attempts.cc', + 'test/core/end2end/tests/retry_unref_before_finish.cc', + 'test/core/end2end/tests/retry_unref_before_recv.cc', 'test/core/end2end/tests/server_finishes_request.cc', 'test/core/end2end/tests/server_streaming.cc', 'test/core/end2end/tests/shutdown_finishes_calls.cc', diff --git a/grpc.gyp b/grpc.gyp index 737fe8ecc9a..dc53358e716 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -265,6 +265,8 @@ 'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc', 'test/core/end2end/tests/retry_throttled.cc', 'test/core/end2end/tests/retry_too_many_attempts.cc', + 'test/core/end2end/tests/retry_unref_before_finish.cc', + 'test/core/end2end/tests/retry_unref_before_recv.cc', 'test/core/end2end/tests/server_finishes_request.cc', 'test/core/end2end/tests/server_streaming.cc', 'test/core/end2end/tests/shutdown_finishes_calls.cc', @@ -376,6 +378,8 @@ 'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc', 'test/core/end2end/tests/retry_throttled.cc', 'test/core/end2end/tests/retry_too_many_attempts.cc', + 'test/core/end2end/tests/retry_unref_before_finish.cc', + 'test/core/end2end/tests/retry_unref_before_recv.cc', 'test/core/end2end/tests/server_finishes_request.cc', 'test/core/end2end/tests/server_streaming.cc', 'test/core/end2end/tests/shutdown_finishes_calls.cc', diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index d34390ed1e1..4fc34f62c08 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -1065,6 +1065,7 @@ void RetryFilter::CallData::CallAttempt::StartRetriableBatches() { void RetryFilter::CallData::CallAttempt::CancelFromSurface( grpc_transport_stream_op_batch* cancel_batch) { MaybeCancelPerAttemptRecvTimer(); + Abandon(); // Propagate cancellation to LB call. lb_call_->StartTransportStreamOpBatch(cancel_batch); } @@ -1181,23 +1182,23 @@ void RetryFilter::CallData::CallAttempt::Abandon() { !seen_recv_trailing_metadata_from_surface_) { recv_trailing_metadata_internal_batch_.reset( DEBUG_LOCATION, - "internal recv_trailing_metadata completed before that op was " - "started from the surface"); + "unref internal recv_trailing_metadata_ready batch; attempt abandoned"); } GRPC_ERROR_UNREF(recv_trailing_metadata_error_); recv_trailing_metadata_error_ = GRPC_ERROR_NONE; recv_initial_metadata_ready_deferred_batch_.reset( DEBUG_LOCATION, - "unref deferred recv_initial_metadata_ready batch due to retry"); + "unref deferred recv_initial_metadata_ready batch; attempt abandoned"); GRPC_ERROR_UNREF(recv_initial_metadata_error_); recv_initial_metadata_error_ = GRPC_ERROR_NONE; recv_message_ready_deferred_batch_.reset( - DEBUG_LOCATION, "unref deferred recv_message_ready batch due to retry"); + DEBUG_LOCATION, + "unref deferred recv_message_ready batch; attempt abandoned"); GRPC_ERROR_UNREF(recv_message_error_); recv_message_error_ = GRPC_ERROR_NONE; for (auto& on_complete_deferred_batch : on_complete_deferred_batches_) { on_complete_deferred_batch.batch.reset( - DEBUG_LOCATION, "unref deferred on_complete batch due to retry"); + DEBUG_LOCATION, "unref deferred on_complete batch; attempt abandoned"); GRPC_ERROR_UNREF(on_complete_deferred_batch.error); } on_complete_deferred_batches_.clear(); @@ -2106,13 +2107,26 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( committed_call_->StartTransportStreamOpBatch(batch); return; } + // If we were previously cancelled from the surface, fail this + // batch immediately. + if (cancelled_from_surface_ != GRPC_ERROR_NONE) { + // Note: This will release the call combiner. + grpc_transport_stream_op_batch_finish_with_failure( + batch, GRPC_ERROR_REF(cancelled_from_surface_), call_combiner_); + return; + } // Handle cancellation. if (GPR_UNLIKELY(batch->cancel_stream)) { - grpc_error_handle cancel_error = batch->payload->cancel_stream.cancel_error; + // Save cancel_error in case subsequent batches are started. + GRPC_ERROR_UNREF(cancelled_from_surface_); + cancelled_from_surface_ = + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: cancelled from surface: %s", chand_, - this, grpc_error_std_string(cancel_error).c_str()); + this, grpc_error_std_string(cancelled_from_surface_).c_str()); } + // Fail any pending batches. + PendingBatchesFail(GRPC_ERROR_REF(cancelled_from_surface_)); // If we have a current call attempt, commit the call, then send // the cancellation down to that attempt. When the call fails, it // will not be retried, because we have committed it here. @@ -2128,10 +2142,7 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( call_attempt_->CancelFromSurface(batch); return; } - // Save cancel_error in case subsequent batches are started. - GRPC_ERROR_UNREF(cancelled_from_surface_); - cancelled_from_surface_ = GRPC_ERROR_REF(cancel_error); - // Cancel retry timer. + // Cancel retry timer if needed. if (retry_timer_pending_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling retry timer", chand_, @@ -2141,11 +2152,11 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( grpc_timer_cancel(&retry_timer_); FreeAllCachedSendOpData(); } - // Fail pending batches. - PendingBatchesFail(GRPC_ERROR_REF(cancel_error)); + // We have no call attempt, so there's nowhere to send the cancellation + // batch. Return it back to the surface immediately. // Note: This will release the call combiner. grpc_transport_stream_op_batch_finish_with_failure( - batch, GRPC_ERROR_REF(cancel_error), call_combiner_); + batch, GRPC_ERROR_REF(cancelled_from_surface_), call_combiner_); return; } // Add the batch to the pending list. @@ -2159,15 +2170,6 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( } // If we do not yet have a call attempt, create one. if (call_attempt_ == nullptr) { - // If we were previously cancelled from the surface, cancel this - // batch instead of creating a call attempt. - if (cancelled_from_surface_ != GRPC_ERROR_NONE) { - PendingBatchClear(pending); - // Note: This will release the call combiner. - grpc_transport_stream_op_batch_finish_with_failure( - batch, GRPC_ERROR_REF(cancelled_from_surface_), call_combiner_); - return; - } // If there is no retry policy, then commit retries immediately. // This ensures that the code below will always jump to the fast path. // TODO(roth): Remove this special case when we implement diff --git a/test/core/end2end/end2end_nosec_tests.cc b/test/core/end2end/end2end_nosec_tests.cc index 1f0ab7d8366..a953188c8dd 100644 --- a/test/core/end2end/end2end_nosec_tests.cc +++ b/test/core/end2end/end2end_nosec_tests.cc @@ -177,6 +177,10 @@ extern void retry_throttled(grpc_end2end_test_config config); extern void retry_throttled_pre_init(void); extern void retry_too_many_attempts(grpc_end2end_test_config config); extern void retry_too_many_attempts_pre_init(void); +extern void retry_unref_before_finish(grpc_end2end_test_config config); +extern void retry_unref_before_finish_pre_init(void); +extern void retry_unref_before_recv(grpc_end2end_test_config config); +extern void retry_unref_before_recv_pre_init(void); extern void server_finishes_request(grpc_end2end_test_config config); extern void server_finishes_request_pre_init(void); extern void server_streaming(grpc_end2end_test_config config); @@ -279,6 +283,8 @@ void grpc_end2end_tests_pre_init(void) { retry_streaming_succeeds_before_replay_finished_pre_init(); retry_throttled_pre_init(); retry_too_many_attempts_pre_init(); + retry_unref_before_finish_pre_init(); + retry_unref_before_recv_pre_init(); server_finishes_request_pre_init(); server_streaming_pre_init(); shutdown_finishes_calls_pre_init(); @@ -375,6 +381,8 @@ void grpc_end2end_tests(int argc, char **argv, retry_streaming_succeeds_before_replay_finished(config); retry_throttled(config); retry_too_many_attempts(config); + retry_unref_before_finish(config); + retry_unref_before_recv(config); server_finishes_request(config); server_streaming(config); shutdown_finishes_calls(config); @@ -687,6 +695,14 @@ void grpc_end2end_tests(int argc, char **argv, retry_too_many_attempts(config); continue; } + if (0 == strcmp("retry_unref_before_finish", argv[i])) { + retry_unref_before_finish(config); + continue; + } + if (0 == strcmp("retry_unref_before_recv", argv[i])) { + retry_unref_before_recv(config); + continue; + } if (0 == strcmp("server_finishes_request", argv[i])) { server_finishes_request(config); continue; diff --git a/test/core/end2end/end2end_tests.cc b/test/core/end2end/end2end_tests.cc index adb637255c4..a9b0e10dbc6 100644 --- a/test/core/end2end/end2end_tests.cc +++ b/test/core/end2end/end2end_tests.cc @@ -181,6 +181,10 @@ extern void retry_throttled(grpc_end2end_test_config config); extern void retry_throttled_pre_init(void); extern void retry_too_many_attempts(grpc_end2end_test_config config); extern void retry_too_many_attempts_pre_init(void); +extern void retry_unref_before_finish(grpc_end2end_test_config config); +extern void retry_unref_before_finish_pre_init(void); +extern void retry_unref_before_recv(grpc_end2end_test_config config); +extern void retry_unref_before_recv_pre_init(void); extern void server_finishes_request(grpc_end2end_test_config config); extern void server_finishes_request_pre_init(void); extern void server_streaming(grpc_end2end_test_config config); @@ -285,6 +289,8 @@ void grpc_end2end_tests_pre_init(void) { retry_streaming_succeeds_before_replay_finished_pre_init(); retry_throttled_pre_init(); retry_too_many_attempts_pre_init(); + retry_unref_before_finish_pre_init(); + retry_unref_before_recv_pre_init(); server_finishes_request_pre_init(); server_streaming_pre_init(); shutdown_finishes_calls_pre_init(); @@ -383,6 +389,8 @@ void grpc_end2end_tests(int argc, char **argv, retry_streaming_succeeds_before_replay_finished(config); retry_throttled(config); retry_too_many_attempts(config); + retry_unref_before_finish(config); + retry_unref_before_recv(config); server_finishes_request(config); server_streaming(config); shutdown_finishes_calls(config); @@ -703,6 +711,14 @@ void grpc_end2end_tests(int argc, char **argv, retry_too_many_attempts(config); continue; } + if (0 == strcmp("retry_unref_before_finish", argv[i])) { + retry_unref_before_finish(config); + continue; + } + if (0 == strcmp("retry_unref_before_recv", argv[i])) { + retry_unref_before_recv(config); + continue; + } if (0 == strcmp("server_finishes_request", argv[i])) { server_finishes_request(config); continue; diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl index abc3faac57d..022c3ad2635 100755 --- a/test/core/end2end/generate_tests.bzl +++ b/test/core/end2end/generate_tests.bzl @@ -356,6 +356,8 @@ END2END_TESTS = { ), "retry_throttled": _test_options(needs_client_channel = True), "retry_too_many_attempts": _test_options(needs_client_channel = True), + "retry_unref_before_finish": _test_options(needs_client_channel = True), + "retry_unref_before_recv": _test_options(needs_client_channel = True), "server_finishes_request": _test_options(), "server_streaming": _test_options(needs_http2 = True), "shutdown_finishes_calls": _test_options(), diff --git a/test/core/end2end/tests/retry_unref_before_finish.cc b/test/core/end2end/tests/retry_unref_before_finish.cc new file mode 100644 index 00000000000..38e3f23932b --- /dev/null +++ b/test/core/end2end/tests/retry_unref_before_finish.cc @@ -0,0 +1,244 @@ +// +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "test/core/end2end/cq_verifier.h" +#include "test/core/end2end/end2end_tests.h" +#include "test/core/end2end/tests/cancel_test_helpers.h" + +static void* tag(intptr_t t) { return reinterpret_cast(t); } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char* test_name, + grpc_channel_args* client_args, + grpc_channel_args* server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void drain_cq(grpc_completion_queue* cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture* f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + nullptr) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = nullptr; +} + +static void shutdown_client(grpc_end2end_test_fixture* f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = nullptr; +} + +static void end_test(grpc_end2end_test_fixture* f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +// Tests that we can unref a call whose status is cached but not yet +// requested by the application. This should not cause a memory leak. +static void test_retry_unref_before_finish(grpc_end2end_test_config config) { + grpc_call* c; + grpc_call* s; + grpc_op ops[6]; + grpc_op* op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_slice request_payload_slice = grpc_slice_from_static_string("foo"); + grpc_slice response_payload_slice = grpc_slice_from_static_string("bar"); + grpc_byte_buffer* request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_byte_buffer* response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + grpc_byte_buffer* request_payload_recv = nullptr; + grpc_byte_buffer* response_payload_recv = nullptr; + grpc_call_error error; + int was_cancelled = 2; + + grpc_arg args[] = { + grpc_channel_arg_string_create( + const_cast(GRPC_ARG_SERVICE_CONFIG), + const_cast( + "{\n" + " \"methodConfig\": [ {\n" + " \"name\": [\n" + " { \"service\": \"service\", \"method\": \"method\" }\n" + " ],\n" + " \"retryPolicy\": {\n" + " \"maxAttempts\": 3,\n" + " \"initialBackoff\": \"1s\",\n" + " \"maxBackoff\": \"120s\",\n" + " \"backoffMultiplier\": 1.6,\n" + " \"retryableStatusCodes\": [ \"ABORTED\" ]\n" + " }\n" + " } ]\n" + "}")), + }; + grpc_channel_args client_args = {GPR_ARRAY_SIZE(args), args}; + grpc_end2end_test_fixture f = + begin_test(config, "retry_unref_before_finish", &client_args, nullptr); + + cq_verifier* cqv = cq_verifier_create(f.cq); + + gpr_timespec deadline = five_seconds_from_now(); + c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/service/method"), + nullptr, deadline, nullptr); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + + // Client starts send ops. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Client starts recv_initial_metadata and recv_message, but not + // recv_trailing_metadata. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(2), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Server gets a call and client send ops complete. + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(1), true); + CQ_EXPECT_COMPLETION(cqv, tag(101), true); + cq_verify(cqv); + + // Server immediately sends FAILED_PRECONDITION status (not retriable). + // This forces the retry filter to start a recv_trailing_metadata op + // internally, since the application hasn't started it yet. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_FAILED_PRECONDITION; + op->data.send_status_from_server.status_details = &status_details; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(102), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Server ops complete and client recv ops complete. + CQ_EXPECT_COMPLETION(cqv, tag(2), true); + CQ_EXPECT_COMPLETION(cqv, tag(102), true); + cq_verify(cqv); + + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); + GPR_ASSERT(0 == call_details.flags); + GPR_ASSERT(was_cancelled == 0); + + grpc_call_unref(s); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_call_details_init(&call_details); + + // Client unrefs the call without starting recv_trailing_metadata. + // This should trigger a cancellation. + grpc_call_unref(c); + + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(request_payload_recv); + grpc_byte_buffer_destroy(response_payload_recv); + + cq_verifier_destroy(cqv); + + end_test(&f); + config.tear_down_data(&f); +} + +void retry_unref_before_finish(grpc_end2end_test_config config) { + GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL); + test_retry_unref_before_finish(config); +} + +void retry_unref_before_finish_pre_init(void) {} diff --git a/test/core/end2end/tests/retry_unref_before_recv.cc b/test/core/end2end/tests/retry_unref_before_recv.cc new file mode 100644 index 00000000000..006a8fe9c53 --- /dev/null +++ b/test/core/end2end/tests/retry_unref_before_recv.cc @@ -0,0 +1,247 @@ +// +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "test/core/end2end/cq_verifier.h" +#include "test/core/end2end/end2end_tests.h" +#include "test/core/end2end/tests/cancel_test_helpers.h" + +static void* tag(intptr_t t) { return reinterpret_cast(t); } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char* test_name, + grpc_channel_args* client_args, + grpc_channel_args* server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void drain_cq(grpc_completion_queue* cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture* f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + nullptr) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = nullptr; +} + +static void shutdown_client(grpc_end2end_test_fixture* f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = nullptr; +} + +static void end_test(grpc_end2end_test_fixture* f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +// Tests that we can unref a call while recv ops are started but before +// they complete. This ensures that we don't drop callbacks or cause a +// memory leak. +static void test_retry_unref_before_recv(grpc_end2end_test_config config) { + grpc_call* c; + grpc_call* s; + grpc_op ops[6]; + grpc_op* op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_slice request_payload_slice = grpc_slice_from_static_string("foo"); + grpc_slice response_payload_slice = grpc_slice_from_static_string("bar"); + grpc_byte_buffer* request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_byte_buffer* response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + grpc_byte_buffer* request_payload_recv = nullptr; + grpc_byte_buffer* response_payload_recv = nullptr; + grpc_call_error error; + int was_cancelled = 2; + + grpc_arg args[] = { + grpc_channel_arg_string_create( + const_cast(GRPC_ARG_SERVICE_CONFIG), + const_cast( + "{\n" + " \"methodConfig\": [ {\n" + " \"name\": [\n" + " { \"service\": \"service\", \"method\": \"method\" }\n" + " ],\n" + " \"retryPolicy\": {\n" + " \"maxAttempts\": 3,\n" + " \"initialBackoff\": \"1s\",\n" + " \"maxBackoff\": \"120s\",\n" + " \"backoffMultiplier\": 1.6,\n" + " \"retryableStatusCodes\": [ \"ABORTED\" ]\n" + " }\n" + " } ]\n" + "}")), + }; + grpc_channel_args client_args = {GPR_ARRAY_SIZE(args), args}; + grpc_end2end_test_fixture f = + begin_test(config, "retry_unref_before_recv", &client_args, nullptr); + + cq_verifier* cqv = cq_verifier_create(f.cq); + + gpr_timespec deadline = five_seconds_from_now(); + c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/service/method"), + nullptr, deadline, nullptr); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + + // Client starts send ops. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Client starts recv_initial_metadata and recv_message, but not + // recv_trailing_metadata. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(2), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Server gets a call and client send ops complete. + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(1), true); + CQ_EXPECT_COMPLETION(cqv, tag(101), true); + cq_verify(cqv); + + // Client unrefs the call without starting recv_trailing_metadata. + // This should trigger a cancellation. + grpc_call_unref(c); + + // Server immediately sends FAILED_PRECONDITION status (not retriable). + // This forces the retry filter to start a recv_trailing_metadata op + // internally, since the application hasn't started it yet. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_FAILED_PRECONDITION; + op->data.send_status_from_server.status_details = &status_details; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(102), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Server ops complete and client recv ops complete. + CQ_EXPECT_COMPLETION(cqv, tag(2), false); // Failure! + CQ_EXPECT_COMPLETION(cqv, tag(102), true); + cq_verify(cqv); + + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); + GPR_ASSERT(0 == call_details.flags); + // Note: Not checking the value of was_cancelled here, because it will + // be flaky, depending on whether the server sent its response before + // the client sent its cancellation. + + grpc_call_unref(s); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_call_details_init(&call_details); + + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(request_payload_recv); + grpc_byte_buffer_destroy(response_payload_recv); + + cq_verifier_destroy(cqv); + + end_test(&f); + config.tear_down_data(&f); +} + +void retry_unref_before_recv(grpc_end2end_test_config config) { + GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL); + test_retry_unref_before_recv(config); +} + +void retry_unref_before_recv_pre_init(void) {}