From 74ec5d1684dcdb7d7a9b62c3d2a80c31f0c87221 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 16 May 2023 11:53:18 -0700 Subject: [PATCH] [promises] Improve logging, fix a rare bug (#33139) Rare bug: server initial metadata gets stranded in the outbound pipe. (fix is a little unpleasant, but we'll do better at the five pipes stage) --- src/core/lib/channel/connected_channel.cc | 8 ++++ src/core/lib/promise/party.h | 6 +++ src/core/lib/promise/pipe.h | 11 ++++- src/core/lib/transport/batch_builder.cc | 23 +++++----- src/core/lib/transport/batch_builder.h | 56 +++++++++++------------ test/core/promise/pipe_test.cc | 30 ++++++++++++ 6 files changed, 92 insertions(+), 42 deletions(-) diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index bd6188dcf5a..44b1fdb97ba 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -824,11 +824,19 @@ ArenaPromise MakeServerCallPromise( auto cleanup_polling_entity_latch = std::unique_ptr, CleanupPollingEntityLatch>( &call_data->polling_entity_latch); + struct CleanupSendInitialMetadata { + void operator()(CallData* call_data) { + call_data->server_initial_metadata.receiver.CloseWithError(); + } + }; + auto cleanup_send_initial_metadata = + std::unique_ptr(call_data); return Map( Seq(std::move(recv_initial_metadata_then_run_promise), std::move(send_trailing_metadata)), [cleanup_polling_entity_latch = std::move(cleanup_polling_entity_latch), + cleanup_send_initial_metadata = std::move(cleanup_send_initial_metadata), stream = std::move(stream)](ServerMetadataHandle md) { stream->set_finished(); return md; diff --git a/src/core/lib/promise/party.h b/src/core/lib/promise/party.h index 90e4f4f7481..12c105edd3b 100644 --- a/src/core/lib/promise/party.h +++ b/src/core/lib/promise/party.h @@ -29,6 +29,7 @@ #include +#include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/construct_destruct.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -37,6 +38,7 @@ #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/detail/promise_factory.h" +#include "src/core/lib/promise/trace.h" #include "src/core/lib/resource_quota/arena.h" // Two implementations of party synchronization are provided: one using a single @@ -482,6 +484,10 @@ class Party : public Activity, private Wakeable { template void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory, OnComplete on_complete) { + if (grpc_trace_promise_primitives.enabled()) { + gpr_log(GPR_DEBUG, "%s[bulk_spawn] On %p queue %s", + party_->DebugTag().c_str(), this, std::string(name).c_str()); + } participants_[num_participants_++] = party_->arena_->NewPooled>( name, std::move(promise_factory), std::move(on_complete)); diff --git a/src/core/lib/promise/pipe.h b/src/core/lib/promise/pipe.h index 599bf0d817d..f485d558f5b 100644 --- a/src/core/lib/promise/pipe.h +++ b/src/core/lib/promise/pipe.h @@ -377,8 +377,8 @@ class Center : public InterceptorList { std::string DebugTag() { if (auto* activity = Activity::current()) { - return absl::StrCat(activity->DebugTag(), " PIPE[0x", - reinterpret_cast(this), "]: "); + return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this), + "]: "); } else { return absl::StrCat("PIPE[0x", reinterpret_cast(this), "]: "); } @@ -610,6 +610,13 @@ class PipeReceiver { return [center = center_]() { return center->PollEmpty(); }; } + void CloseWithError() { + if (center_ != nullptr) { + center_->MarkCancelled(); + center_.reset(); + } + } + // Interject PromiseFactory f into the pipeline. // f will be called with the current value traversing the pipe, and should // return a value to replace it with. diff --git a/src/core/lib/transport/batch_builder.cc b/src/core/lib/transport/batch_builder.cc index 82c3bba863e..09c57d6688b 100644 --- a/src/core/lib/transport/batch_builder.cc +++ b/src/core/lib/transport/batch_builder.cc @@ -36,8 +36,8 @@ void BatchBuilder::PendingCompletion::CompletionCallback( auto* party = pc->batch->party.get(); if (grpc_call_trace.enabled()) { gpr_log( - GPR_DEBUG, "%s[connected] Finish batch-component %s for %s: status=%s", - party->DebugTag().c_str(), std::string(pc->name()).c_str(), + GPR_DEBUG, "%sFinish batch-component %s for %s: status=%s", + pc->batch->DebugPrefix(party).c_str(), std::string(pc->name()).c_str(), grpc_transport_stream_op_batch_string(&pc->batch->batch, false).c_str(), error.ToString().c_str()); } @@ -118,8 +118,8 @@ void BatchBuilder::FlushBatch() { GPR_ASSERT(target_.has_value()); if (grpc_call_trace.enabled()) { gpr_log( - GPR_DEBUG, "%s[connected] Perform transport stream op batch: %p %s", - batch_->party->DebugTag().c_str(), &batch_->batch, + GPR_DEBUG, "%sPerform transport stream op batch: %p %s", + batch_->DebugPrefix().c_str(), &batch_->batch, grpc_transport_stream_op_batch_string(&batch_->batch, false).c_str()); } std::exchange(batch_, nullptr)->PerformWith(*target_); @@ -131,15 +131,14 @@ void BatchBuilder::Batch::PerformWith(Target target) { } ServerMetadataHandle BatchBuilder::CompleteSendServerTrailingMetadata( - ServerMetadataHandle sent_metadata, absl::Status send_result, + Batch* batch, ServerMetadataHandle sent_metadata, absl::Status send_result, bool actually_sent) { if (!send_result.ok()) { if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, - "%s[connected] Send metadata failed with error: %s, " - "fabricating trailing metadata", - Activity::current()->DebugTag().c_str(), - send_result.ToString().c_str()); + "%sSend metadata failed with error: %s, fabricating trailing " + "metadata", + batch->DebugPrefix().c_str(), send_result.ToString().c_str()); } sent_metadata->Clear(); sent_metadata->Set(GrpcStatusMetadata(), @@ -152,9 +151,9 @@ ServerMetadataHandle BatchBuilder::CompleteSendServerTrailingMetadata( if (grpc_call_trace.enabled()) { gpr_log( GPR_DEBUG, - "%s[connected] Tagging trailing metadata with " - "cancellation status from transport: %s", - Activity::current()->DebugTag().c_str(), + "%sTagging trailing metadata with cancellation status from " + "transport: %s", + batch->DebugPrefix().c_str(), actually_sent ? "sent => not-cancelled" : "not-sent => cancelled"); } sent_metadata->Set(GrpcCallWasCancelled(), !actually_sent); diff --git a/src/core/lib/transport/batch_builder.h b/src/core/lib/transport/batch_builder.h index f714af6e7f8..7ef352b7e77 100644 --- a/src/core/lib/transport/batch_builder.h +++ b/src/core/lib/transport/batch_builder.h @@ -19,12 +19,14 @@ #include +#include #include #include #include #include "absl/status/status.h" #include "absl/status/statusor.h" +#include "absl/strings/str_format.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" @@ -202,6 +204,11 @@ class BatchBuilder { ~Batch(); Batch(const Batch&) = delete; Batch& operator=(const Batch&) = delete; + std::string DebugPrefix(Activity* activity = Activity::current()) const { + return absl::StrFormat("%s[connected] [batch %p] ", activity->DebugTag(), + this); + } + void IncrementRefCount() { ++refs; } void Unref() { if (--refs == 0) party->arena()->DeletePooled(this); @@ -222,8 +229,8 @@ class BatchBuilder { if (this->*field != nullptr) return this->*field; this->*field = party->arena()->NewPooled(Ref()); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] Add batch closure for %s @ %s", - Activity::current()->DebugTag().c_str(), + gpr_log(GPR_DEBUG, "%sAdd batch closure for %s @ %s", + DebugPrefix().c_str(), std::string((this->*field)->name()).c_str(), (this->*field)->on_done_closure.DebugString().c_str()); } @@ -273,8 +280,8 @@ class BatchBuilder { // Combine send status and server metadata into a final status to report back // to the containing call. static ServerMetadataHandle CompleteSendServerTrailingMetadata( - ServerMetadataHandle sent_metadata, absl::Status send_result, - bool actually_sent); + Batch* batch, ServerMetadataHandle sent_metadata, + absl::Status send_result, bool actually_sent); grpc_transport_stream_op_batch_payload* const payload_; absl::optional target_; @@ -284,8 +291,7 @@ class BatchBuilder { inline auto BatchBuilder::SendMessage(Target target, MessageHandle message) { auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue send message: %s", - Activity::current()->DebugTag().c_str(), batch, + gpr_log(GPR_DEBUG, "%sQueue send message: %s", batch->DebugPrefix().c_str(), message->DebugString().c_str()); } auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); @@ -301,10 +307,8 @@ inline auto BatchBuilder::SendInitialMetadata( Target target, Arena::PoolPtr md) { auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, - "%s[connected] [batch %p] Queue send initial metadata: %s", - Activity::current()->DebugTag().c_str(), batch, - md->DebugString().c_str()); + gpr_log(GPR_DEBUG, "%sQueue send initial metadata: %s", + batch->DebugPrefix().c_str(), md->DebugString().c_str()); } auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); batch->batch.on_complete = &pc->on_done_closure; @@ -322,8 +326,8 @@ inline auto BatchBuilder::SendClientInitialMetadata( inline auto BatchBuilder::SendClientTrailingMetadata(Target target) { auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue send trailing metadata", - Activity::current()->DebugTag().c_str(), batch); + gpr_log(GPR_DEBUG, "%sQueue send trailing metadata", + batch->DebugPrefix().c_str()); } auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); batch->batch.on_complete = &pc->on_done_closure; @@ -364,20 +368,19 @@ inline auto BatchBuilder::SendServerTrailingMetadata( payload_->send_trailing_metadata.sent = &pc->trailing_metadata_sent; } if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] [batch %p] %s: %s", - Activity::current()->DebugTag().c_str(), batch, + gpr_log(GPR_DEBUG, "%s%s: %s", batch->DebugPrefix().c_str(), convert_to_cancellation ? "Send trailing metadata as cancellation" : "Queue send trailing metadata", metadata->DebugString().c_str()); } batch->batch.on_complete = &pc->on_done_closure; pc->send_trailing_metadata = std::move(metadata); - auto promise = batch->RefUntil( - Map(pc->done_latch.WaitAndCopy(), [pc](absl::Status status) { - return CompleteSendServerTrailingMetadata( - std::move(pc->send_trailing_metadata), std::move(status), - pc->trailing_metadata_sent); - })); + auto promise = Map(pc->done_latch.WaitAndCopy(), + [pc, batch = batch->Ref()](absl::Status status) { + return CompleteSendServerTrailingMetadata( + batch.get(), std::move(pc->send_trailing_metadata), + std::move(status), pc->trailing_metadata_sent); + }); if (convert_to_cancellation) { batch->PerformWith(target); } @@ -387,8 +390,7 @@ inline auto BatchBuilder::SendServerTrailingMetadata( inline auto BatchBuilder::ReceiveMessage(Target target) { auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue receive message", - Activity::current()->DebugTag().c_str(), batch); + gpr_log(GPR_DEBUG, "%sQueue receive message", batch->DebugPrefix().c_str()); } auto* pc = batch->GetInitializedCompletion(&Batch::pending_receive_message); batch->batch.recv_message = true; @@ -415,9 +417,8 @@ inline auto BatchBuilder::ReceiveMessage(Target target) { inline auto BatchBuilder::ReceiveInitialMetadata(Target target) { auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, - "%s[connected] [batch %p] Queue receive initial metadata", - Activity::current()->DebugTag().c_str(), batch); + gpr_log(GPR_DEBUG, "%sQueue receive initial metadata", + batch->DebugPrefix().c_str()); } auto* pc = batch->GetInitializedCompletion(&Batch::pending_receive_initial_metadata); @@ -444,9 +445,8 @@ inline auto BatchBuilder::ReceiveServerInitialMetadata(Target target) { inline auto BatchBuilder::ReceiveTrailingMetadata(Target target) { auto* batch = GetBatch(target); if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, - "%s[connected] [batch %p] Queue receive trailing metadata", - Activity::current()->DebugTag().c_str(), batch); + gpr_log(GPR_DEBUG, "%sQueue receive trailing metadata", + batch->DebugPrefix().c_str()); } auto* pc = batch->GetInitializedCompletion( &Batch::pending_receive_trailing_metadata); diff --git a/test/core/promise/pipe_test.cc b/test/core/promise/pipe_test.cc index be4444f8446..19733df3d84 100644 --- a/test/core/promise/pipe_test.cc +++ b/test/core/promise/pipe_test.cc @@ -305,6 +305,36 @@ TEST_F(PipeTest, CanCloseWithErrorSend) { MakeScopedArena(1024, &memory_allocator_)); } +TEST_F(PipeTest, CanCloseWithErrorRecv) { + StrictMock> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + MakeActivity( + [] { + auto* pipe = GetContext()->ManagedNew>(); + return Seq( + // Concurrently: + // - wait for a received value (will stall forever since we push + // nothing into the queue) + // - close the sender, which will signal the receiver to return an + // end-of-stream. + Join(pipe->receiver.Next(), + [pipe]() mutable { + pipe->receiver.CloseWithError(); + return absl::OkStatus(); + }), + // Verify we received end-of-stream and closed the sender. + [](std::tuple, absl::Status> result) { + EXPECT_FALSE(std::get<0>(result).has_value()); + EXPECT_TRUE(std::get<0>(result).cancelled()); + EXPECT_EQ(std::get<1>(result), absl::OkStatus()); + return absl::OkStatus(); + }); + }, + NoWakeupScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }, + MakeScopedArena(1024, &memory_allocator_)); +} + TEST_F(PipeTest, CanCloseSendWithInterceptor) { StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus()));