From 8a8f1eba4bc8471b199334a6aa41916515372626 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 15 May 2023 15:01:21 -0700 Subject: [PATCH] [promises] Enable server promise calls in C++ e2e tests (#33097) #thistimeforsure a863532c6293eceacdddfd5ce685507091cf30a0 adds some debug to help track which batches get leaked by a transport 3203e75ec5d1e9a703a655552afbc16df78884c5 makes connected_channel respect the high level intent of cancellation better (and fixes the last reason we needed to turn these tests off) aaf5fa036b9a483c4900a959abb96efade884024 re-enables testing of c++ e2e tests with server based promise calls --- bazel/experiments.bzl | 6 +++ src/core/lib/channel/connected_channel.cc | 15 ++++--- src/core/lib/experiments/experiments.yaml | 3 +- src/core/lib/transport/batch_builder.cc | 4 ++ src/core/lib/transport/batch_builder.h | 54 ++++++++++++----------- 5 files changed, 49 insertions(+), 33 deletions(-) diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 893680b7c8a..3bb7d1bb62c 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -29,6 +29,9 @@ EXPERIMENTS = { "promise_based_client_call", "promise_based_server_call", ], + "cpp_end2end_test": [ + "promise_based_server_call", + ], "endpoint_test": [ "tcp_frame_size_tuning", "tcp_rcv_lowat", @@ -52,6 +55,9 @@ EXPERIMENTS = { "memory_pressure_controller", "unconstrained_max_quota_buffer_size", ], + "xds_end2end_test": [ + "promise_based_server_call", + ], }, "on": { "flow_control_test": [ diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index dc6d9e2b017..bd6188dcf5a 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -710,13 +710,16 @@ ArenaPromise MakeServerCallPromise( // Promise factory that accepts a ServerMetadataHandle, and sends it as the // trailing metadata for this call. - auto send_trailing_metadata = - [call_data, stream = stream->InternalRef()]( - ServerMetadataHandle server_trailing_metadata) { - return GetContext()->SendServerTrailingMetadata( - stream->batch_target(), std::move(server_trailing_metadata), + auto send_trailing_metadata = [call_data, stream = stream->InternalRef()]( + ServerMetadataHandle + server_trailing_metadata) { + bool is_cancellation = + server_trailing_metadata->get(GrpcCallWasCancelled()).value_or(false); + return GetContext()->SendServerTrailingMetadata( + stream->batch_target(), std::move(server_trailing_metadata), + is_cancellation || !std::exchange(call_data->sent_initial_metadata, true)); - }; + }; // Runs the receive message loop, either until all the messages // are received or the server call is complete. diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 3db307e1c60..13c59d4d860 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -115,8 +115,7 @@ default: false expiry: 2023/06/01 owner: ctiller@google.com - test_tags: ["core_end2end_test"] - disabled_test_tags: ["cpp_end2end_test", "xds_end2end_test"] + test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test"] - name: transport_supplies_client_latency description: If set, use the transport represented value for client latency in opencensus default: false diff --git a/src/core/lib/transport/batch_builder.cc b/src/core/lib/transport/batch_builder.cc index 06d8c0a72f9..82c3bba863e 100644 --- a/src/core/lib/transport/batch_builder.cc +++ b/src/core/lib/transport/batch_builder.cc @@ -71,6 +71,10 @@ BatchBuilder::Batch::Batch(grpc_transport_stream_op_batch_payload* payload, BatchBuilder::Batch::~Batch() { auto* arena = party->arena(); + if (grpc_call_trace.enabled()) { + gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Destroy", + Activity::current()->DebugTag().c_str(), this); + } if (pending_receive_message != nullptr) { arena->DeletePooled(pending_receive_message); } diff --git a/src/core/lib/transport/batch_builder.h b/src/core/lib/transport/batch_builder.h index 6b494a00fb1..f714af6e7f8 100644 --- a/src/core/lib/transport/batch_builder.h +++ b/src/core/lib/transport/batch_builder.h @@ -282,12 +282,12 @@ class BatchBuilder { }; inline auto BatchBuilder::SendMessage(Target target, MessageHandle message) { + auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] Queue send message: %s", - Activity::current()->DebugTag().c_str(), + gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue send message: %s", + Activity::current()->DebugTag().c_str(), batch, message->DebugString().c_str()); } - auto* batch = GetBatch(target); auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); batch->batch.on_complete = &pc->on_done_closure; batch->batch.send_message = true; @@ -299,11 +299,13 @@ inline auto BatchBuilder::SendMessage(Target target, MessageHandle message) { inline auto BatchBuilder::SendInitialMetadata( Target target, Arena::PoolPtr md) { + auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] Queue send initial metadata: %s", - Activity::current()->DebugTag().c_str(), md->DebugString().c_str()); + gpr_log(GPR_DEBUG, + "%s[connected] [batch %p] Queue send initial metadata: %s", + Activity::current()->DebugTag().c_str(), batch, + md->DebugString().c_str()); } - auto* batch = GetBatch(target); auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); batch->batch.on_complete = &pc->on_done_closure; batch->batch.send_initial_metadata = true; @@ -318,11 +320,11 @@ inline auto BatchBuilder::SendClientInitialMetadata( } inline auto BatchBuilder::SendClientTrailingMetadata(Target target) { + auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] Queue send trailing metadata", - Activity::current()->DebugTag().c_str()); + gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue send trailing metadata", + Activity::current()->DebugTag().c_str(), batch); } - auto* batch = GetBatch(target); auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); batch->batch.on_complete = &pc->on_done_closure; batch->batch.send_trailing_metadata = true; @@ -342,13 +344,6 @@ inline auto BatchBuilder::SendServerInitialMetadata( inline auto BatchBuilder::SendServerTrailingMetadata( Target target, ServerMetadataHandle metadata, bool convert_to_cancellation) { - if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] %s: %s", - Activity::current()->DebugTag().c_str(), - convert_to_cancellation ? "Send trailing metadata as cancellation" - : "Queue send trailing metadata", - metadata->DebugString().c_str()); - } Batch* batch; PendingSends* pc; if (convert_to_cancellation) { @@ -368,6 +363,13 @@ inline auto BatchBuilder::SendServerTrailingMetadata( payload_->send_trailing_metadata.send_trailing_metadata = metadata.get(); payload_->send_trailing_metadata.sent = &pc->trailing_metadata_sent; } + if (grpc_call_trace.enabled()) { + gpr_log(GPR_DEBUG, "%s[connected] [batch %p] %s: %s", + Activity::current()->DebugTag().c_str(), batch, + convert_to_cancellation ? "Send trailing metadata as cancellation" + : "Queue send trailing metadata", + metadata->DebugString().c_str()); + } batch->batch.on_complete = &pc->on_done_closure; pc->send_trailing_metadata = std::move(metadata); auto promise = batch->RefUntil( @@ -383,11 +385,11 @@ inline auto BatchBuilder::SendServerTrailingMetadata( } inline auto BatchBuilder::ReceiveMessage(Target target) { + auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] Queue receive message", - Activity::current()->DebugTag().c_str()); + gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue receive message", + Activity::current()->DebugTag().c_str(), batch); } - auto* batch = GetBatch(target); auto* pc = batch->GetInitializedCompletion(&Batch::pending_receive_message); batch->batch.recv_message = true; payload_->recv_message.recv_message_ready = &pc->on_done_closure; @@ -411,11 +413,12 @@ inline auto BatchBuilder::ReceiveMessage(Target target) { } inline auto BatchBuilder::ReceiveInitialMetadata(Target target) { + auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] Queue receive initial metadata", - Activity::current()->DebugTag().c_str()); + gpr_log(GPR_DEBUG, + "%s[connected] [batch %p] Queue receive initial metadata", + Activity::current()->DebugTag().c_str(), batch); } - auto* batch = GetBatch(target); auto* pc = batch->GetInitializedCompletion(&Batch::pending_receive_initial_metadata); batch->batch.recv_initial_metadata = true; @@ -439,11 +442,12 @@ inline auto BatchBuilder::ReceiveServerInitialMetadata(Target target) { } inline auto BatchBuilder::ReceiveTrailingMetadata(Target target) { + auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] Queue receive trailing metadata", - Activity::current()->DebugTag().c_str()); + gpr_log(GPR_DEBUG, + "%s[connected] [batch %p] Queue receive trailing metadata", + Activity::current()->DebugTag().c_str(), batch); } - auto* batch = GetBatch(target); auto* pc = batch->GetInitializedCompletion( &Batch::pending_receive_trailing_metadata); batch->batch.recv_trailing_metadata = true;