[promises] Enable server promise calls in C++ e2e tests (#33097)

#thistimeforsure

a863532c62 adds some debug to help track
which batches get leaked by a transport
3203e75ec5 makes connected_channel respect
the high level intent of cancellation better (and fixes the last reason
we needed to turn these tests off)
aaf5fa036b re-enables testing of c++ e2e
tests with server based promise calls

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/33139/head
Craig Tiller 2 years ago committed by GitHub
parent e7a67ee224
commit 8a8f1eba4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      bazel/experiments.bzl
  2. 15
      src/core/lib/channel/connected_channel.cc
  3. 3
      src/core/lib/experiments/experiments.yaml
  4. 4
      src/core/lib/transport/batch_builder.cc
  5. 54
      src/core/lib/transport/batch_builder.h

@ -29,6 +29,9 @@ EXPERIMENTS = {
"promise_based_client_call",
"promise_based_server_call",
],
"cpp_end2end_test": [
"promise_based_server_call",
],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@ -52,6 +55,9 @@ EXPERIMENTS = {
"memory_pressure_controller",
"unconstrained_max_quota_buffer_size",
],
"xds_end2end_test": [
"promise_based_server_call",
],
},
"on": {
"flow_control_test": [

@ -710,13 +710,16 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
// Promise factory that accepts a ServerMetadataHandle, and sends it as the
// trailing metadata for this call.
auto send_trailing_metadata =
[call_data, stream = stream->InternalRef()](
ServerMetadataHandle server_trailing_metadata) {
return GetContext<BatchBuilder>()->SendServerTrailingMetadata(
stream->batch_target(), std::move(server_trailing_metadata),
auto send_trailing_metadata = [call_data, stream = stream->InternalRef()](
ServerMetadataHandle
server_trailing_metadata) {
bool is_cancellation =
server_trailing_metadata->get(GrpcCallWasCancelled()).value_or(false);
return GetContext<BatchBuilder>()->SendServerTrailingMetadata(
stream->batch_target(), std::move(server_trailing_metadata),
is_cancellation ||
!std::exchange(call_data->sent_initial_metadata, true));
};
};
// Runs the receive message loop, either until all the messages
// are received or the server call is complete.

@ -115,8 +115,7 @@
default: false
expiry: 2023/06/01
owner: ctiller@google.com
test_tags: ["core_end2end_test"]
disabled_test_tags: ["cpp_end2end_test", "xds_end2end_test"]
test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test"]
- name: transport_supplies_client_latency
description: If set, use the transport represented value for client latency in opencensus
default: false

@ -71,6 +71,10 @@ BatchBuilder::Batch::Batch(grpc_transport_stream_op_batch_payload* payload,
BatchBuilder::Batch::~Batch() {
auto* arena = party->arena();
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Destroy",
Activity::current()->DebugTag().c_str(), this);
}
if (pending_receive_message != nullptr) {
arena->DeletePooled(pending_receive_message);
}

@ -282,12 +282,12 @@ class BatchBuilder {
};
inline auto BatchBuilder::SendMessage(Target target, MessageHandle message) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] Queue send message: %s",
Activity::current()->DebugTag().c_str(),
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue send message: %s",
Activity::current()->DebugTag().c_str(), batch,
message->DebugString().c_str());
}
auto* batch = GetBatch(target);
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends);
batch->batch.on_complete = &pc->on_done_closure;
batch->batch.send_message = true;
@ -299,11 +299,13 @@ inline auto BatchBuilder::SendMessage(Target target, MessageHandle message) {
inline auto BatchBuilder::SendInitialMetadata(
Target target, Arena::PoolPtr<grpc_metadata_batch> md) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] Queue send initial metadata: %s",
Activity::current()->DebugTag().c_str(), md->DebugString().c_str());
gpr_log(GPR_DEBUG,
"%s[connected] [batch %p] Queue send initial metadata: %s",
Activity::current()->DebugTag().c_str(), batch,
md->DebugString().c_str());
}
auto* batch = GetBatch(target);
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends);
batch->batch.on_complete = &pc->on_done_closure;
batch->batch.send_initial_metadata = true;
@ -318,11 +320,11 @@ inline auto BatchBuilder::SendClientInitialMetadata(
}
inline auto BatchBuilder::SendClientTrailingMetadata(Target target) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] Queue send trailing metadata",
Activity::current()->DebugTag().c_str());
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue send trailing metadata",
Activity::current()->DebugTag().c_str(), batch);
}
auto* batch = GetBatch(target);
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends);
batch->batch.on_complete = &pc->on_done_closure;
batch->batch.send_trailing_metadata = true;
@ -342,13 +344,6 @@ inline auto BatchBuilder::SendServerInitialMetadata(
inline auto BatchBuilder::SendServerTrailingMetadata(
Target target, ServerMetadataHandle metadata,
bool convert_to_cancellation) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] %s: %s",
Activity::current()->DebugTag().c_str(),
convert_to_cancellation ? "Send trailing metadata as cancellation"
: "Queue send trailing metadata",
metadata->DebugString().c_str());
}
Batch* batch;
PendingSends* pc;
if (convert_to_cancellation) {
@ -368,6 +363,13 @@ inline auto BatchBuilder::SendServerTrailingMetadata(
payload_->send_trailing_metadata.send_trailing_metadata = metadata.get();
payload_->send_trailing_metadata.sent = &pc->trailing_metadata_sent;
}
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] %s: %s",
Activity::current()->DebugTag().c_str(), batch,
convert_to_cancellation ? "Send trailing metadata as cancellation"
: "Queue send trailing metadata",
metadata->DebugString().c_str());
}
batch->batch.on_complete = &pc->on_done_closure;
pc->send_trailing_metadata = std::move(metadata);
auto promise = batch->RefUntil(
@ -383,11 +385,11 @@ inline auto BatchBuilder::SendServerTrailingMetadata(
}
inline auto BatchBuilder::ReceiveMessage(Target target) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] Queue receive message",
Activity::current()->DebugTag().c_str());
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Queue receive message",
Activity::current()->DebugTag().c_str(), batch);
}
auto* batch = GetBatch(target);
auto* pc = batch->GetInitializedCompletion(&Batch::pending_receive_message);
batch->batch.recv_message = true;
payload_->recv_message.recv_message_ready = &pc->on_done_closure;
@ -411,11 +413,12 @@ inline auto BatchBuilder::ReceiveMessage(Target target) {
}
inline auto BatchBuilder::ReceiveInitialMetadata(Target target) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] Queue receive initial metadata",
Activity::current()->DebugTag().c_str());
gpr_log(GPR_DEBUG,
"%s[connected] [batch %p] Queue receive initial metadata",
Activity::current()->DebugTag().c_str(), batch);
}
auto* batch = GetBatch(target);
auto* pc =
batch->GetInitializedCompletion(&Batch::pending_receive_initial_metadata);
batch->batch.recv_initial_metadata = true;
@ -439,11 +442,12 @@ inline auto BatchBuilder::ReceiveServerInitialMetadata(Target target) {
}
inline auto BatchBuilder::ReceiveTrailingMetadata(Target target) {
auto* batch = GetBatch(target);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] Queue receive trailing metadata",
Activity::current()->DebugTag().c_str());
gpr_log(GPR_DEBUG,
"%s[connected] [batch %p] Queue receive trailing metadata",
Activity::current()->DebugTag().c_str(), batch);
}
auto* batch = GetBatch(target);
auto* pc = batch->GetInitializedCompletion(
&Batch::pending_receive_trailing_metadata);
batch->batch.recv_trailing_metadata = true;

Loading…
Cancel
Save