[promises] Improve logging, fix a rare bug (#33139)

Rare bug: server initial metadata gets stranded in the outbound pipe.
(fix is a little unpleasant, but we'll do better at the five pipes
stage)

<!--

If you know who should review your pull request, please assign it to that
person; otherwise the pull request will be assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->
revert-32956-client-channel-resolver-fuzzer
Craig Tiller 2 years ago committed by GitHub
parent 030ecf60ec
commit 74ec5d1684
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      src/core/lib/channel/connected_channel.cc
  2. 6
      src/core/lib/promise/party.h
  3. 11
      src/core/lib/promise/pipe.h
  4. 23
      src/core/lib/transport/batch_builder.cc
  5. 56
      src/core/lib/transport/batch_builder.h
  6. 30
      test/core/promise/pipe_test.cc

@ -824,11 +824,19 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
auto cleanup_polling_entity_latch =
std::unique_ptr<Latch<grpc_polling_entity>, CleanupPollingEntityLatch>(
&call_data->polling_entity_latch);
struct CleanupSendInitialMetadata {
void operator()(CallData* call_data) {
call_data->server_initial_metadata.receiver.CloseWithError();
}
};
auto cleanup_send_initial_metadata =
std::unique_ptr<CallData, CleanupSendInitialMetadata>(call_data);
return Map(
Seq(std::move(recv_initial_metadata_then_run_promise),
std::move(send_trailing_metadata)),
[cleanup_polling_entity_latch = std::move(cleanup_polling_entity_latch),
cleanup_send_initial_metadata = std::move(cleanup_send_initial_metadata),
stream = std::move(stream)](ServerMetadataHandle md) {
stream->set_finished();
return md;

@ -29,6 +29,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -37,6 +38,7 @@
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/trace.h"
#include "src/core/lib/resource_quota/arena.h"
// Two implementations of party synchronization are provided: one using a single
@ -482,6 +484,10 @@ class Party : public Activity, private Wakeable {
template <typename Factory, typename OnComplete>
void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory,
OnComplete on_complete) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s[bulk_spawn] On %p queue %s",
party_->DebugTag().c_str(), this, std::string(name).c_str());
}
participants_[num_participants_++] =
party_->arena_->NewPooled<ParticipantImpl<Factory, OnComplete>>(
name, std::move(promise_factory), std::move(on_complete));

@ -377,8 +377,8 @@ class Center : public InterceptorList<T> {
std::string DebugTag() {
if (auto* activity = Activity::current()) {
return absl::StrCat(activity->DebugTag(), " PIPE[0x",
reinterpret_cast<uintptr_t>(this), "]: ");
return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this),
"]: ");
} else {
return absl::StrCat("PIPE[0x", reinterpret_cast<uintptr_t>(this), "]: ");
}
@ -610,6 +610,13 @@ class PipeReceiver {
return [center = center_]() { return center->PollEmpty(); };
}
// Close the receiving end abnormally: mark the shared center as cancelled and
// then drop this receiver's handle on it. Does nothing when the receiver no
// longer holds a center, so repeated calls are harmless.
void CloseWithError() {
  if (center_ == nullptr) return;
  center_->MarkCancelled();
  center_.reset();
}
// Interject PromiseFactory f into the pipeline.
// f will be called with the current value traversing the pipe, and should
// return a value to replace it with.

@ -36,8 +36,8 @@ void BatchBuilder::PendingCompletion::CompletionCallback(
auto* party = pc->batch->party.get();
if (grpc_call_trace.enabled()) {
gpr_log(
GPR_DEBUG, "%s[connected] Finish batch-component %s for %s: status=%s",
party->DebugTag().c_str(), std::string(pc->name()).c_str(),
GPR_DEBUG, "%sFinish batch-component %s for %s: status=%s",
pc->batch->DebugPrefix(party).c_str(), std::string(pc->name()).c_str(),
grpc_transport_stream_op_batch_string(&pc->batch->batch, false).c_str(),
error.ToString().c_str());
}
@ -118,8 +118,8 @@ void BatchBuilder::FlushBatch() {
GPR_ASSERT(target_.has_value());
if (grpc_call_trace.enabled()) {
gpr_log(
GPR_DEBUG, "%s[connected] Perform transport stream op batch: %p %s",
batch_->party->DebugTag().c_str(), &batch_->batch,
GPR_DEBUG, "%sPerform transport stream op batch: %p %s",
batch_->DebugPrefix().c_str(), &batch_->batch,
grpc_transport_stream_op_batch_string(&batch_->batch, false).c_str());
}
std::exchange(batch_, nullptr)->PerformWith(*target_);
@ -131,15 +131,14 @@ void BatchBuilder::Batch::PerformWith(Target target) {
}
ServerMetadataHandle BatchBuilder::CompleteSendServerTrailingMetadata(
ServerMetadataHandle sent_metadata, absl::Status send_result,
Batch* batch, ServerMetadataHandle sent_metadata, absl::Status send_result,
bool actually_sent) {
if (!send_result.ok()) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG,
"%s[connected] Send metadata failed with error: %s, "
"fabricating trailing metadata",
Activity::current()->DebugTag().c_str(),
send_result.ToString().c_str());
"%sSend metadata failed with error: %s, fabricating trailing "
"metadata",
batch->DebugPrefix().c_str(), send_result.ToString().c_str());
}
sent_metadata->Clear();
sent_metadata->Set(GrpcStatusMetadata(),
@ -152,9 +151,9 @@ ServerMetadataHandle BatchBuilder::CompleteSendServerTrailingMetadata(
if (grpc_call_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"%s[connected] Tagging trailing metadata with "
"cancellation status from transport: %s",
Activity::current()->DebugTag().c_str(),
"%sTagging trailing metadata with cancellation status from "
"transport: %s",
batch->DebugPrefix().c_str(),
actually_sent ? "sent => not-cancelled" : "not-sent => cancelled");
}
sent_metadata->Set(GrpcCallWasCancelled(), !actually_sent);

@ -19,12 +19,14 @@
#include <stdint.h>
#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
@ -202,6 +204,11 @@ class BatchBuilder {
~Batch();
Batch(const Batch&) = delete;
Batch& operator=(const Batch&) = delete;
// Builds the prefix placed in front of this batch's trace log lines: the
// activity's debug tag, the "[connected]" marker and this batch's address.
//
// `activity` defaults to the activity that is current at the call site;
// callers may pass one explicitly instead. A null activity is tolerated (the
// tag is simply omitted) so that emitting a trace line can never dereference
// a null pointer.
std::string DebugPrefix(Activity* activity = Activity::current()) const {
  if (activity == nullptr) {
    return absl::StrFormat("[connected] [batch %p] ", this);
  }
  return absl::StrFormat("%s[connected] [batch %p] ", activity->DebugTag(),
                         this);
}
void IncrementRefCount() { ++refs; }
void Unref() {
if (--refs == 0) party->arena()->DeletePooled(this);
@ -222,8 +229,8 @@ class BatchBuilder {
if (this->*field != nullptr) return this->*field;
this->*field = party->arena()->NewPooled<T>(Ref());
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] Add batch closure for %s @ %s",
Activity::current()->DebugTag().c_str(),
gpr_log(GPR_DEBUG, "%sAdd batch closure for %s @ %s",
DebugPrefix().c_str(),
std::string((this->*field)->name()).c_str(),
(this->*field)->on_done_closure.DebugString().c_str());
}
@ -273,8 +280,8 @@ class BatchBuilder {
// Combine send status and server metadata into a final status to report back
// to the containing call.
static ServerMetadataHandle CompleteSendServerTrailingMetadata(
ServerMetadataHandle sent_metadata, absl::Status send_result,
bool actually_sent);
Batch* batch, ServerMetadataHandle sent_metadata,
absl::Status send_result, bool actually_sent);
grpc_transport_stream_op_batch_payload* const payload_;
absl::optional<Target> target_;
@ -284,8 +291,7 @@ class BatchBuilder {
inline auto BatchBuilder::SendMessage(Target target, MessageHandle message) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue send message: %s",
Activity::current()->DebugTag().c_str(), batch,
gpr_log(GPR_DEBUG, "%sQueue send message: %s", batch->DebugPrefix().c_str(),
message->DebugString().c_str());
}
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends);
@ -301,10 +307,8 @@ inline auto BatchBuilder::SendInitialMetadata(
Target target, Arena::PoolPtr<grpc_metadata_batch> md) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG,
"%s[connected] [batch %p] Queue send initial metadata: %s",
Activity::current()->DebugTag().c_str(), batch,
md->DebugString().c_str());
gpr_log(GPR_DEBUG, "%sQueue send initial metadata: %s",
batch->DebugPrefix().c_str(), md->DebugString().c_str());
}
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends);
batch->batch.on_complete = &pc->on_done_closure;
@ -322,8 +326,8 @@ inline auto BatchBuilder::SendClientInitialMetadata(
inline auto BatchBuilder::SendClientTrailingMetadata(Target target) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue send trailing metadata",
Activity::current()->DebugTag().c_str(), batch);
gpr_log(GPR_DEBUG, "%sQueue send trailing metadata",
batch->DebugPrefix().c_str());
}
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends);
batch->batch.on_complete = &pc->on_done_closure;
@ -364,20 +368,19 @@ inline auto BatchBuilder::SendServerTrailingMetadata(
payload_->send_trailing_metadata.sent = &pc->trailing_metadata_sent;
}
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] %s: %s",
Activity::current()->DebugTag().c_str(), batch,
gpr_log(GPR_DEBUG, "%s%s: %s", batch->DebugPrefix().c_str(),
convert_to_cancellation ? "Send trailing metadata as cancellation"
: "Queue send trailing metadata",
metadata->DebugString().c_str());
}
batch->batch.on_complete = &pc->on_done_closure;
pc->send_trailing_metadata = std::move(metadata);
auto promise = batch->RefUntil(
Map(pc->done_latch.WaitAndCopy(), [pc](absl::Status status) {
return CompleteSendServerTrailingMetadata(
std::move(pc->send_trailing_metadata), std::move(status),
pc->trailing_metadata_sent);
}));
auto promise = Map(pc->done_latch.WaitAndCopy(),
[pc, batch = batch->Ref()](absl::Status status) {
return CompleteSendServerTrailingMetadata(
batch.get(), std::move(pc->send_trailing_metadata),
std::move(status), pc->trailing_metadata_sent);
});
if (convert_to_cancellation) {
batch->PerformWith(target);
}
@ -387,8 +390,7 @@ inline auto BatchBuilder::SendServerTrailingMetadata(
inline auto BatchBuilder::ReceiveMessage(Target target) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue receive message",
Activity::current()->DebugTag().c_str(), batch);
gpr_log(GPR_DEBUG, "%sQueue receive message", batch->DebugPrefix().c_str());
}
auto* pc = batch->GetInitializedCompletion(&Batch::pending_receive_message);
batch->batch.recv_message = true;
@ -415,9 +417,8 @@ inline auto BatchBuilder::ReceiveMessage(Target target) {
inline auto BatchBuilder::ReceiveInitialMetadata(Target target) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG,
"%s[connected] [batch %p] Queue receive initial metadata",
Activity::current()->DebugTag().c_str(), batch);
gpr_log(GPR_DEBUG, "%sQueue receive initial metadata",
batch->DebugPrefix().c_str());
}
auto* pc =
batch->GetInitializedCompletion(&Batch::pending_receive_initial_metadata);
@ -444,9 +445,8 @@ inline auto BatchBuilder::ReceiveServerInitialMetadata(Target target) {
inline auto BatchBuilder::ReceiveTrailingMetadata(Target target) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG,
"%s[connected] [batch %p] Queue receive trailing metadata",
Activity::current()->DebugTag().c_str(), batch);
gpr_log(GPR_DEBUG, "%sQueue receive trailing metadata",
batch->DebugPrefix().c_str());
}
auto* pc = batch->GetInitializedCompletion(
&Batch::pending_receive_trailing_metadata);

@ -305,6 +305,36 @@ TEST_F(PipeTest, CanCloseWithErrorSend) {
MakeScopedArena(1024, &memory_allocator_));
}
// Checks that closing the *receiving* end of a pipe with an error resolves a
// pending Next() as cancelled (no value), and that the activity as a whole
// still completes with an OK status.
TEST_F(PipeTest, CanCloseWithErrorRecv) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[] {
auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
return Seq(
// Concurrently:
// - wait for a received value (nothing is ever pushed into the pipe in
// this test, so this cannot complete with a value)
// - close the receiver with an error, which this test expects to make
// the pending Next() resolve as cancelled.
Join(pipe->receiver.Next(),
[pipe]() mutable {
pipe->receiver.CloseWithError();
return absl::OkStatus();
}),
// Verify Next() yielded no value and reported cancellation, and that
// the closing step itself returned OK.
[](std::tuple<NextResult<int>, absl::Status> result) {
EXPECT_FALSE(std::get<0>(result).has_value());
EXPECT_TRUE(std::get<0>(result).cancelled());
EXPECT_EQ(std::get<1>(result), absl::OkStatus());
return absl::OkStatus();
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, &memory_allocator_));
}
TEST_F(PipeTest, CanCloseSendWithInterceptor) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));

Loading…
Cancel
Save