diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 0e5f00b71eb..e366840a1e6 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -25,6 +25,7 @@ EXPERIMENT_ENABLES = { "event_engine_dns": "event_engine_dns", "event_engine_listener": "event_engine_listener", "free_large_allocator": "free_large_allocator", + "http2_stats_fix": "http2_stats_fix", "keepalive_fix": "keepalive_fix", "keepalive_server_fix": "keepalive_server_fix", "monitoring_experiment": "monitoring_experiment", diff --git a/examples/cpp/csm/BUILD b/examples/cpp/csm/BUILD index 836abbab310..7f1bf30d836 100644 --- a/examples/cpp/csm/BUILD +++ b/examples/cpp/csm/BUILD @@ -41,6 +41,7 @@ cc_binary( "//examples/protos:helloworld_cc_grpc", "@com_google_absl//absl/flags:flag", "@com_google_absl//absl/flags:parse", + "@com_google_absl//absl/log", "@io_opentelemetry_cpp//exporters/prometheus:prometheus_exporter", "@io_opentelemetry_cpp//sdk/src/metrics", ], diff --git a/examples/cpp/csm/csm_greeter_server.cc b/examples/cpp/csm/csm_greeter_server.cc index c5d7857e7fa..910e3f65749 100644 --- a/examples/cpp/csm/csm_greeter_server.cc +++ b/examples/cpp/csm/csm_greeter_server.cc @@ -22,6 +22,7 @@ #include "absl/flags/flag.h" #include "absl/flags/parse.h" +#include "absl/log/log.h" #include "absl/strings/str_cat.h" #include "opentelemetry/exporters/prometheus/exporter_factory.h" #include "opentelemetry/exporters/prometheus/exporter_options.h" @@ -91,7 +92,7 @@ void RunServer(const char* hostname) { xds_builder.AddListeningPort(absl::StrCat("0.0.0.0:", port), grpc::InsecureServerCredentials()); xds_enabled_server = xds_builder.BuildAndStart(); - gpr_log(GPR_INFO, "Server starting on 0.0.0.0:%d", port); + LOG(INFO) << "Server starting on 0.0.0.0:" << port; // Wait for the server to shutdown. Note that some other thread must be // responsible for shutting down the server for this call to ever return. diff --git a/examples/cpp/helloworld/BUILD b/examples/cpp/helloworld/BUILD index 3c01b6ce1c3..1bcee389efb 100644 --- a/examples/cpp/helloworld/BUILD +++ b/examples/cpp/helloworld/BUILD @@ -129,5 +129,6 @@ cc_binary( "//examples/protos:helloworld_cc_grpc", "@com_google_absl//absl/flags:flag", "@com_google_absl//absl/flags:parse", + "@com_google_absl//absl/log", ], ) diff --git a/examples/cpp/helloworld/CMakeLists.txt b/examples/cpp/helloworld/CMakeLists.txt index 73ad4877483..defe3064b14 100644 --- a/examples/cpp/helloworld/CMakeLists.txt +++ b/examples/cpp/helloworld/CMakeLists.txt @@ -68,6 +68,7 @@ foreach(_target absl::check absl::flags absl::flags_parse + absl::log ${_REFLECTION} ${_GRPC_GRPCPP} ${_PROTOBUF_LIBPROTOBUF}) diff --git a/examples/cpp/helloworld/xds_greeter_server.cc b/examples/cpp/helloworld/xds_greeter_server.cc index 9e4ca2a0549..c0bed68ff7a 100644 --- a/examples/cpp/helloworld/xds_greeter_server.cc +++ b/examples/cpp/helloworld/xds_greeter_server.cc @@ -22,6 +22,7 @@ #include "absl/flags/flag.h" #include "absl/flags/parse.h" +#include "absl/log/log.h" #include "absl/strings/str_cat.h" #include @@ -79,21 +80,20 @@ void RunServer() { absl::StrCat("0.0.0.0:", port), grpc::XdsServerCredentials(grpc::InsecureServerCredentials())); xds_enabled_server = xds_builder.BuildAndStart(); - gpr_log(GPR_INFO, "Server starting on 0.0.0.0:%d", port); + LOG(INFO) << "Server starting on 0.0.0.0:" << port; grpc::AddAdminServices(&builder); // For the maintenance server, do not use any authentication mechanism. builder.AddListeningPort(absl::StrCat("0.0.0.0:", maintenance_port), grpc::InsecureServerCredentials()); server = builder.BuildAndStart(); - gpr_log(GPR_INFO, "Maintenance server listening on 0.0.0.0:%d", - maintenance_port); + LOG(INFO) << "Maintenance server listening on 0.0.0.0:" << maintenance_port; } else { grpc::AddAdminServices(&xds_builder); // Listen on the given address without any authentication mechanism. builder.AddListeningPort(absl::StrCat("0.0.0.0:", port), grpc::InsecureServerCredentials()); server = xds_builder.BuildAndStart(); - gpr_log(GPR_INFO, "Server listening on 0.0.0.0:%d", port); + LOG(INFO) << "Server listening on 0.0.0.0:" << port; } // Wait for the server to shutdown. Note that some other thread must be diff --git a/examples/cpp/xds/BUILD b/examples/cpp/xds/BUILD index 6657b1cbbf6..bf5abfd1807 100644 --- a/examples/cpp/xds/BUILD +++ b/examples/cpp/xds/BUILD @@ -37,5 +37,6 @@ cc_binary( "//examples/protos:helloworld_cc_grpc", "@com_google_absl//absl/flags:flag", "@com_google_absl//absl/flags:parse", + "@com_google_absl//absl/log", ], ) diff --git a/examples/cpp/xds/xds_greeter_server.cc b/examples/cpp/xds/xds_greeter_server.cc index bc4fe66c0f5..c35166c819e 100644 --- a/examples/cpp/xds/xds_greeter_server.cc +++ b/examples/cpp/xds/xds_greeter_server.cc @@ -22,6 +22,7 @@ #include "absl/flags/flag.h" #include "absl/flags/parse.h" +#include "absl/log/log.h" #include "absl/strings/str_cat.h" #include @@ -84,21 +85,20 @@ void RunServer() { absl::StrCat("0.0.0.0:", port), grpc::XdsServerCredentials(grpc::InsecureServerCredentials())); xds_enabled_server = xds_builder.BuildAndStart(); - gpr_log(GPR_INFO, "Server starting on 0.0.0.0:%d", port); + LOG(INFO) << "Server starting on 0.0.0.0:" << port; grpc::AddAdminServices(&builder); // For the maintenance server, do not use any authentication mechanism. builder.AddListeningPort(absl::StrCat("0.0.0.0:", maintenance_port), grpc::InsecureServerCredentials()); server = builder.BuildAndStart(); - gpr_log(GPR_INFO, "Maintenance server listening on 0.0.0.0:%d", - maintenance_port); + LOG(INFO) << "Maintenance server listening on 0.0.0.0:" << maintenance_port; } else { grpc::AddAdminServices(&xds_builder); // Listen on the given address without any authentication mechanism. builder.AddListeningPort(absl::StrCat("0.0.0.0:", port), grpc::InsecureServerCredentials()); server = xds_builder.BuildAndStart(); - gpr_log(GPR_INFO, "Server listening on 0.0.0.0:%d", port); + LOG(INFO) << "Server listening on 0.0.0.0:" << port; } // Wait for the server to shutdown. Note that some other thread must be diff --git a/src/core/BUILD b/src/core/BUILD index 437735b143f..3ecb922f38b 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6451,7 +6451,6 @@ grpc_cc_library( "//:endpoint_addresses", "//:gpr", "//:grpc_base", - "//:grpc_client_channel", "//:grpc_public_hdrs", "//:grpc_resolver", "//:grpc_service_config_impl", @@ -7341,7 +7340,6 @@ grpc_cc_library( ], deps = [ "1999", - "call_final_info", "for_each", "if", "latch", @@ -7353,6 +7351,7 @@ grpc_cc_library( "status_flag", "try_seq", "//:gpr", + "//:promise", ], ) diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index 588827b9584..a2dc24594e2 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -370,8 +370,9 @@ const grpc_channel_filter grpc_server_deadline_filter = { return next_promise_factory(std::move(call_args)); }, [](grpc_channel_element*, grpc_core::CallSpineInterface* spine) { - spine->client_initial_metadata().receiver.InterceptAndMap( - [](grpc_core::ClientMetadataHandle md) { + grpc_core::DownCast(spine) + ->client_initial_metadata() + .receiver.InterceptAndMap([](grpc_core::ClientMetadataHandle md) { auto deadline = md->get(grpc_core::GrpcTimeoutMetadata()); if (deadline.has_value()) { grpc_core::GetContext()->UpdateDeadline( diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index 010a8608a5a..379d4944788 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -168,7 +168,7 @@ ServerMetadataHandle CheckPayload(const Message& msg, is_send ? "send" : "recv", msg.payload()->Length(), *max_length); } if (msg.payload()->Length() <= *max_length) return nullptr; - auto r = GetContext()->MakePooled(); + auto r = Arena::MakePooled(); r->Set(GrpcStatusMetadata(), GRPC_STATUS_RESOURCE_EXHAUSTED); r->Set(GrpcMessageMetadata(), Slice::FromCopiedString(absl::StrFormat( diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 14be491f0ee..6b3160e8182 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -102,14 +102,12 @@ auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame, }, []() -> StatusFlag { return Success{}; }); }, - [call_handler, trailers = std::move(frame.trailers)]() mutable { - return If( - trailers != nullptr, - [&call_handler, &trailers]() mutable { - return call_handler.PushServerTrailingMetadata( - std::move(trailers)); - }, - []() -> StatusFlag { return Success{}; }); + [call_handler, + trailers = std::move(frame.trailers)]() mutable -> StatusFlag { + if (trailers != nullptr) { + call_handler.PushServerTrailingMetadata(std::move(trailers)); + } + return Success{}; }); // Wrap the actual sequence with something that owns the call handler so that // its lifetime extends until the push completes. @@ -223,7 +221,7 @@ void ChaoticGoodClientTransport::AbortWithError() { for (const auto& pair : stream_map) { auto call_handler = pair.second; call_handler.SpawnInfallible("cancel", [call_handler]() mutable { - call_handler.Cancel(ServerMetadataFromStatus( + call_handler.PushServerTrailingMetadata(ServerMetadataFromStatus( absl::UnavailableError("Transport closed."))); return Empty{}; }); @@ -300,6 +298,10 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) { const uint32_t stream_id = MakeStream(call_handler); return Map(CallOutboundLoop(stream_id, call_handler), [stream_id, this](absl::Status result) { + if (grpc_chaotic_good_trace.enabled()) { + gpr_log(GPR_INFO, "CHAOTIC_GOOD: Call %d finished with %s", + stream_id, result.ToString().c_str()); + } if (!result.ok()) { CancelFrame frame; frame.stream_id = stream_id; diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc index 05edb2d0eb3..7975975203c 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.cc +++ b/src/core/ext/transport/chaotic_good/server_transport.cc @@ -72,33 +72,29 @@ auto ChaoticGoodServerTransport::TransportWriteLoop( auto ChaoticGoodServerTransport::PushFragmentIntoCall( CallInitiator call_initiator, ClientFragmentFrame frame, uint32_t stream_id) { - auto& headers = frame.headers; - return TrySeq( - If( - headers != nullptr, - [call_initiator, &headers]() mutable { - return call_initiator.PushClientInitialMetadata(std::move(headers)); - }, - []() -> StatusFlag { return Success{}; }), - [call_initiator, message = std::move(frame.message)]() mutable { - return If( - message.has_value(), - [&call_initiator, &message]() mutable { - return call_initiator.PushMessage(std::move(message->message)); - }, - []() -> StatusFlag { return Success{}; }); - }, - [this, call_initiator, end_of_stream = frame.end_of_stream, - stream_id]() mutable -> StatusFlag { - if (end_of_stream) { - call_initiator.FinishSends(); - // We have received end_of_stream. It is now safe to remove the call - // from the stream map. - MutexLock lock(&mu_); - stream_map_.erase(stream_id); - } - return Success{}; - }); + GPR_DEBUG_ASSERT(frame.headers == nullptr); + if (grpc_chaotic_good_trace.enabled()) { + gpr_log(GPR_INFO, "CHAOTIC_GOOD: PushFragmentIntoCall: frame=%s", + frame.ToString().c_str()); + } + return TrySeq(If( + frame.message.has_value(), + [&call_initiator, &frame]() mutable { + return call_initiator.PushMessage( + std::move(frame.message->message)); + }, + []() -> StatusFlag { return Success{}; }), + [this, call_initiator, end_of_stream = frame.end_of_stream, + stream_id]() mutable -> StatusFlag { + if (end_of_stream) { + call_initiator.FinishSends(); + // We have received end_of_stream. It is now safe to remove + // the call from the stream map. + MutexLock lock(&mu_); + stream_map_.erase(stream_id); + } + return Success{}; + }); } auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall( @@ -244,8 +240,8 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall( FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1}); absl::optional call_initiator; if (status.ok()) { - auto create_call_result = - acceptor_->CreateCall(*fragment_frame.headers, arena.release()); + auto create_call_result = acceptor_->CreateCall( + std::move(fragment_frame.headers), arena.release()); if (grpc_chaotic_good_trace.enabled()) { gpr_log(GPR_INFO, "CHAOTIC_GOOD: DeserializeAndPushFragmentToNewCall: " diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 6be0152cae6..a1e490e44b6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1471,9 +1471,11 @@ static void perform_stream_op_locked(void* stream_op, frame_hdr[3] = static_cast(len >> 8); frame_hdr[4] = static_cast(len); - s->stats.outgoing.framing_bytes += GRPC_HEADER_SIZE_IN_BYTES; - s->stats.outgoing.data_bytes += - op_payload->send_message.send_message->Length(); + if (grpc_core::IsHttp2StatsFixEnabled()) { + s->stats.outgoing.framing_bytes += GRPC_HEADER_SIZE_IN_BYTES; + s->stats.outgoing.data_bytes += + op_payload->send_message.send_message->Length(); + } s->next_message_end_offset = s->flow_controlled_bytes_written + static_cast(s->flow_controlled_buffer.length) + diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index 334eff52f22..e10dea34134 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -78,6 +78,9 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf, grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf); stats->framing_bytes += header_size; + if (!grpc_core::IsHttp2StatsFixEnabled()) { + stats->data_bytes += write_bytes; + } } grpc_core::Poll grpc_deframe_unprocessed_incoming_frames( diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 7be425993e1..d49dce8daf6 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -83,7 +83,7 @@ class InprocServerTransport final : public RefCounted, "inproc transport disconnected"); } - absl::StatusOr AcceptCall(ClientMetadata& md) { + absl::StatusOr AcceptCall(ClientMetadataHandle md) { switch (state_.load(std::memory_order_acquire)) { case ConnectionState::kInitial: return absl::InternalError( @@ -93,7 +93,7 @@ class InprocServerTransport final : public RefCounted, case ConnectionState::kReady: break; } - return acceptor_->CreateCall(md, acceptor_->CreateArena()); + return acceptor_->CreateCall(std::move(md), acceptor_->CreateArena()); } private: @@ -116,10 +116,10 @@ class InprocClientTransport final : public Transport, public ClientTransport { TrySeq(call_handler.PullClientInitialMetadata(), [server_transport = server_transport_, call_handler](ClientMetadataHandle md) { - auto call_initiator = server_transport->AcceptCall(*md); + auto call_initiator = + server_transport->AcceptCall(std::move(md)); if (!call_initiator.ok()) return call_initiator.status(); - ForwardCall(call_handler, std::move(*call_initiator), - std::move(md)); + ForwardCall(call_handler, std::move(*call_initiator)); return absl::OkStatus(); })); } diff --git a/src/core/lib/channel/channel_stack_builder_impl.cc b/src/core/lib/channel/channel_stack_builder_impl.cc index 6fd0b9f6484..62c527ffe61 100644 --- a/src/core/lib/channel/channel_stack_builder_impl.cc +++ b/src/core/lib/channel/channel_stack_builder_impl.cc @@ -95,43 +95,37 @@ const grpc_channel_filter* PromiseTracingFilterFor( }, /* init_call: */ [](grpc_channel_element* elem, CallSpineInterface* call) { + auto* c = DownCast(call); auto* source_filter = static_cast(elem->filter)->filter; - call->client_initial_metadata().receiver.InterceptAndMap( + c->client_initial_metadata().receiver.InterceptAndMap( [source_filter](ClientMetadataHandle md) { gpr_log(GPR_DEBUG, "%s[%s] OnClientInitialMetadata: %s", GetContext()->DebugTag().c_str(), source_filter->name, md->DebugString().c_str()); return md; }); - call->client_to_server_messages().receiver.InterceptAndMap( + c->client_to_server_messages().receiver.InterceptAndMap( [source_filter](MessageHandle msg) { gpr_log(GPR_DEBUG, "%s[%s] OnClientToServerMessage: %s", GetContext()->DebugTag().c_str(), source_filter->name, msg->DebugString().c_str()); return msg; }); - call->server_initial_metadata().sender.InterceptAndMap( + c->server_initial_metadata().sender.InterceptAndMap( [source_filter](ServerMetadataHandle md) { gpr_log(GPR_DEBUG, "%s[%s] OnServerInitialMetadata: %s", GetContext()->DebugTag().c_str(), source_filter->name, md->DebugString().c_str()); return md; }); - call->server_to_client_messages().sender.InterceptAndMap( + c->server_to_client_messages().sender.InterceptAndMap( [source_filter](MessageHandle msg) { gpr_log(GPR_DEBUG, "%s[%s] OnServerToClientMessage: %s", GetContext()->DebugTag().c_str(), source_filter->name, msg->DebugString().c_str()); return msg; }); - call->server_trailing_metadata().sender.InterceptAndMap( - [source_filter](ServerMetadataHandle md) { - gpr_log(GPR_DEBUG, "%s[%s] OnServerTrailingMetadata: %s", - GetContext()->DebugTag().c_str(), - source_filter->name, md->DebugString().c_str()); - return md; - }); }, grpc_channel_next_op, /* sizeof_call_data: */ 0, diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index f54e64c1aa7..6cbf9df086e 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -463,8 +463,7 @@ ArenaPromise MakeClientCallPromise(Transport* transport, [](absl::Status) {}); // Start a promise to receive server initial metadata and then forward it up // through the receiving pipe. - auto server_initial_metadata = - GetContext()->MakePooled(); + auto server_initial_metadata = Arena::MakePooled(); party->Spawn( "recv_initial_metadata", TrySeq(GetContext()->ReceiveServerInitialMetadata( @@ -501,27 +500,25 @@ ArenaPromise MakeClientCallPromise(Transport* transport, // Create a promise that will receive server trailing metadata. // If this fails, we massage the error into metadata that we can report // upwards. - auto server_trailing_metadata = - GetContext()->MakePooled(); - auto recv_trailing_metadata = - Map(GetContext()->ReceiveServerTrailingMetadata( - stream->batch_target()), - [](absl::StatusOr status) mutable { - if (!status.ok()) { - auto server_trailing_metadata = - GetContext()->MakePooled(); - grpc_status_code status_code = GRPC_STATUS_UNKNOWN; - std::string message; - grpc_error_get_status(status.status(), Timestamp::InfFuture(), - &status_code, &message, nullptr, nullptr); - server_trailing_metadata->Set(GrpcStatusMetadata(), status_code); - server_trailing_metadata->Set(GrpcMessageMetadata(), - Slice::FromCopiedString(message)); - return server_trailing_metadata; - } else { - return std::move(*status); - } - }); + auto server_trailing_metadata = Arena::MakePooled(); + auto recv_trailing_metadata = Map( + GetContext()->ReceiveServerTrailingMetadata( + stream->batch_target()), + [](absl::StatusOr status) mutable { + if (!status.ok()) { + auto server_trailing_metadata = Arena::MakePooled(); + grpc_status_code status_code = GRPC_STATUS_UNKNOWN; + std::string message; + grpc_error_get_status(status.status(), Timestamp::InfFuture(), + &status_code, &message, nullptr, nullptr); + server_trailing_metadata->Set(GrpcStatusMetadata(), status_code); + server_trailing_metadata->Set(GrpcMessageMetadata(), + Slice::FromCopiedString(message)); + return server_trailing_metadata; + } else { + return std::move(*status); + } + }); // Finally the main call promise. // Concurrently: send initial metadata and receive messages, until BOTH // complete (or one fails). @@ -784,8 +781,7 @@ ArenaPromise MakeServerCallPromise( if (status.ok()) { trailing_metadata = std::move(*status); } else { - trailing_metadata = - GetContext()->MakePooled(); + trailing_metadata = Arena::MakePooled(); grpc_status_code status_code = GRPC_STATUS_UNKNOWN; std::string message; grpc_error_get_status(status.status(), Timestamp::InfFuture(), @@ -888,18 +884,7 @@ ArenaPromise MakeClientTransportCallPromise( Transport* transport, CallArgs call_args, NextPromiseFactory) { auto spine = GetContext()->MakeCallSpine(std::move(call_args)); transport->client_transport()->StartCall(CallHandler{spine}); - return Map(spine->server_trailing_metadata().receiver.Next(), - [](NextResult r) { - if (r.has_value()) { - auto md = std::move(r.value()); - md->Set(GrpcStatusFromWire(), true); - return md; - } - auto m = GetContext()->MakePooled(); - m->Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED); - m->Set(GrpcCallWasCancelled(), true); - return m; - }); + return spine->PullServerTrailingMetadata(); } const grpc_channel_filter kClientPromiseBasedTransportFilter = diff --git a/src/core/lib/channel/promise_based_filter.cc b/src/core/lib/channel/promise_based_filter.cc index 0b3e3401509..9e47aacae55 100644 --- a/src/core/lib/channel/promise_based_filter.cc +++ b/src/core/lib/channel/promise_based_filter.cc @@ -508,7 +508,7 @@ void BaseCallData::SendMessage::WakeInsideCombiner(Flusher* flusher, case State::kGotBatch: if (allow_push_to_pipe) { state_ = State::kPushedToPipe; - auto message = GetContext()->MakePooled(); + auto message = Arena::MakePooled(); message->payload()->Swap(batch_->payload->send_message.send_message); message->mutable_flags() = batch_->payload->send_message.flags; push_ = interceptor()->Push()->Push(std::move(message)); @@ -839,7 +839,7 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher, } else { state_ = State::kCompletedWhilePushedToPipe; } - auto message = GetContext()->MakePooled(); + auto message = Arena::MakePooled(); message->payload()->Swap(&**intercepted_slice_buffer_); message->mutable_flags() = *intercepted_flags_; push_ = interceptor()->Push()->Push(std::move(message)); diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index 38729dee20f..491cbe9c144 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -534,13 +534,14 @@ inline void InterceptClientToServerMessage(const NoInterceptor*, void*, void*, template inline void InterceptClientToServerMessage( ServerMetadataHandle (Derived::Call::*fn)(const Message&), - typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) { + typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage); call_spine->client_to_server_messages().receiver.InterceptAndMap( [call, call_spine](MessageHandle msg) -> absl::optional { auto return_md = call->OnClientToServerMessage(*msg); if (return_md == nullptr) return std::move(msg); - return call_spine->Cancel(std::move(return_md)); + call_spine->PushServerTrailingMetadata(std::move(return_md)); + return absl::nullopt; }); } @@ -548,14 +549,15 @@ template inline void InterceptClientToServerMessage( ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage); call_spine->client_to_server_messages().receiver.InterceptAndMap( [call, call_spine, channel](MessageHandle msg) -> absl::optional { auto return_md = call->OnClientToServerMessage(*msg, channel); if (return_md == nullptr) return std::move(msg); - return call_spine->Cancel(std::move(return_md)); + call_spine->PushServerTrailingMetadata(std::move(return_md)); + return absl::nullopt; }); } @@ -563,7 +565,7 @@ template inline void InterceptClientToServerMessage( MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage); call_spine->client_to_server_messages().receiver.InterceptAndMap( [call, channel](MessageHandle msg) { @@ -575,24 +577,26 @@ template inline void InterceptClientToServerMessage( absl::StatusOr (Derived::Call::*fn)(MessageHandle, Derived*), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage); call_spine->client_to_server_messages().receiver.InterceptAndMap( [call, call_spine, channel](MessageHandle msg) -> absl::optional { auto r = call->OnClientToServerMessage(std::move(msg), channel); if (r.ok()) return std::move(*r); - return call_spine->Cancel(ServerMetadataFromStatus(r.status())); + call_spine->PushServerTrailingMetadata( + ServerMetadataFromStatus(r.status())); + return absl::nullopt; }); } inline void InterceptClientInitialMetadata(const NoInterceptor*, void*, void*, - CallSpineInterface*) {} + PipeBasedCallSpine*) {} template inline void InterceptClientInitialMetadata( void (Derived::Call::*fn)(ClientMetadata& md), typename Derived::Call* call, - Derived*, CallSpineInterface* call_spine) { + Derived*, PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata); call_spine->client_initial_metadata().receiver.InterceptAndMap( [call](ClientMetadataHandle md) { @@ -605,7 +609,7 @@ template inline void InterceptClientInitialMetadata( void (Derived::Call::*fn)(ClientMetadata& md, Derived* channel), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata); call_spine->client_initial_metadata().receiver.InterceptAndMap( [call, channel](ClientMetadataHandle md) { @@ -617,14 +621,15 @@ inline void InterceptClientInitialMetadata( template inline void InterceptClientInitialMetadata( ServerMetadataHandle (Derived::Call::*fn)(ClientMetadata& md), - typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) { + typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata); call_spine->client_initial_metadata().receiver.InterceptAndMap( [call_spine, call](ClientMetadataHandle md) -> absl::optional { auto return_md = call->OnClientInitialMetadata(*md); if (return_md == nullptr) return std::move(md); - return call_spine->Cancel(std::move(return_md)); + call_spine->PushServerTrailingMetadata(std::move(return_md)); + return absl::nullopt; }); } @@ -633,28 +638,31 @@ inline void InterceptClientInitialMetadata( ServerMetadataHandle (Derived::Call::*fn)(ClientMetadata& md, Derived* channel), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata); call_spine->client_initial_metadata().receiver.InterceptAndMap( [call_spine, call, channel]( ClientMetadataHandle md) -> absl::optional { auto return_md = call->OnClientInitialMetadata(*md, channel); if (return_md == nullptr) return std::move(md); - return call_spine->Cancel(std::move(return_md)); + call_spine->PushServerTrailingMetadata(std::move(return_md)); + return absl::nullopt; }); } template inline void InterceptClientInitialMetadata( absl::Status (Derived::Call::*fn)(ClientMetadata& md), - typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) { + typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata); call_spine->client_initial_metadata().receiver.InterceptAndMap( [call_spine, call](ClientMetadataHandle md) -> absl::optional { auto status = call->OnClientInitialMetadata(*md); if (status.ok()) return std::move(md); - return call_spine->Cancel(ServerMetadataFromStatus(status)); + call_spine->PushServerTrailingMetadata( + ServerMetadataFromStatus(status)); + return absl::nullopt; }); } @@ -662,14 +670,16 @@ template inline void InterceptClientInitialMetadata( absl::Status (Derived::Call::*fn)(ClientMetadata& md, Derived* channel), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata); call_spine->client_initial_metadata().receiver.InterceptAndMap( [call_spine, call, channel]( ClientMetadataHandle md) -> absl::optional { auto status = call->OnClientInitialMetadata(*md, channel); if (status.ok()) return std::move(md); - return call_spine->Cancel(ServerMetadataFromStatus(status)); + call_spine->PushServerTrailingMetadata( + ServerMetadataFromStatus(status)); + return absl::nullopt; }); } @@ -681,7 +691,7 @@ absl::void_t( InterceptClientInitialMetadata(Promise (Derived::Call::*promise_factory)( ClientMetadata& md, Derived* channel), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(promise_factory == &Derived::Call::OnClientInitialMetadata); call_spine->client_initial_metadata().receiver.InterceptAndMap( [call, call_spine, channel](ClientMetadataHandle md) { @@ -691,8 +701,9 @@ InterceptClientInitialMetadata(Promise (Derived::Call::*promise_factory)( call_spine](PromiseResult status) mutable -> absl::optional { if (IsStatusOk(status)) return std::move(md); - return call_spine->Cancel( + call_spine->PushServerTrailingMetadata( StatusCast(std::move(status))); + return absl::nullopt; }); }); } @@ -766,7 +777,7 @@ inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, void*, template inline void InterceptServerInitialMetadata( void (Derived::Call::*fn)(ServerMetadata&), typename Derived::Call* call, - Derived*, CallSpineInterface* call_spine) { + Derived*, PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); call_spine->server_initial_metadata().sender.InterceptAndMap( [call](ServerMetadataHandle md) { @@ -778,14 +789,16 @@ inline void InterceptServerInitialMetadata( template inline void InterceptServerInitialMetadata( absl::Status (Derived::Call::*fn)(ServerMetadata&), - typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) { + typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); call_spine->server_initial_metadata().sender.InterceptAndMap( [call, call_spine]( ServerMetadataHandle md) -> absl::optional { auto status = call->OnServerInitialMetadata(*md); if (status.ok()) return std::move(md); - return call_spine->Cancel(ServerMetadataFromStatus(status)); + call_spine->PushServerTrailingMetadata( + ServerMetadataFromStatus(status)); + return absl::nullopt; }); } @@ -793,7 +806,7 @@ template inline void InterceptServerInitialMetadata( void (Derived::Call::*fn)(ServerMetadata&, Derived*), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); call_spine->server_initial_metadata().sender.InterceptAndMap( [call, channel](ServerMetadataHandle md) { @@ -806,14 +819,16 @@ template inline void InterceptServerInitialMetadata( absl::Status (Derived::Call::*fn)(ServerMetadata&, Derived*), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); call_spine->server_initial_metadata().sender.InterceptAndMap( [call, call_spine, channel]( ServerMetadataHandle md) -> absl::optional { auto status = call->OnServerInitialMetadata(*md, channel); if (status.ok()) return std::move(md); - return call_spine->Cancel(ServerMetadataFromStatus(status)); + call_spine->PullServerTrailingMetadata( + ServerMetadataFromStatus(status)); + return absl::nullopt; }); } @@ -885,13 +900,14 @@ inline void InterceptServerToClientMessage(const NoInterceptor*, void*, void*, template inline void InterceptServerToClientMessage( ServerMetadataHandle (Derived::Call::*fn)(const Message&), - typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) { + typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage); call_spine->server_to_client_messages().sender.InterceptAndMap( [call, call_spine](MessageHandle msg) -> absl::optional { auto return_md = call->OnServerToClientMessage(*msg); if (return_md == nullptr) return std::move(msg); - return call_spine->Cancel(std::move(return_md)); + call_spine->PushServerTrailingMetadata(std::move(return_md)); + return absl::nullopt; }); } @@ -899,14 +915,15 @@ template inline void InterceptServerToClientMessage( ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage); call_spine->server_to_client_messages().sender.InterceptAndMap( [call, call_spine, channel](MessageHandle msg) -> absl::optional { auto return_md = call->OnServerToClientMessage(*msg, channel); if (return_md == nullptr) return std::move(msg); - return call_spine->Cancel(std::move(return_md)); + call_spine->PushServerTrailingMetadata(std::move(return_md)); + return absl::nullopt; }); } @@ -914,7 +931,7 @@ template inline void InterceptServerToClientMessage( MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage); call_spine->server_to_client_messages().sender.InterceptAndMap( [call, channel](MessageHandle msg) { @@ -926,14 +943,16 @@ template inline void InterceptServerToClientMessage( absl::StatusOr (Derived::Call::*fn)(MessageHandle, Derived*), typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { + PipeBasedCallSpine* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage); call_spine->server_to_client_messages().sender.InterceptAndMap( [call, call_spine, channel](MessageHandle msg) -> absl::optional { auto r = call->OnServerToClientMessage(std::move(msg), channel); if (r.ok()) return std::move(*r); - return call_spine->Cancel(ServerMetadataFromStatus(r.status())); + call_spine->PushServerTrailingMetadata( + ServerMetadataFromStatus(r.status())); + return absl::nullopt; }); } @@ -942,40 +961,25 @@ inline void InterceptServerTrailingMetadata(const NoInterceptor*, void*, void*, template inline void InterceptServerTrailingMetadata( - void (Derived::Call::*fn)(ServerMetadata&), typename Derived::Call* call, - Derived*, CallSpineInterface* call_spine) { - GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata); - call_spine->server_trailing_metadata().sender.InterceptAndMap( - [call](ServerMetadataHandle md) { - call->OnServerTrailingMetadata(*md); - return md; - }); + void (Derived::Call::*)(ServerMetadata&), typename Derived::Call*, Derived*, + PipeBasedCallSpine*) { + gpr_log(GPR_ERROR, + "InterceptServerTrailingMetadata not available for call v2.5"); } - template inline void InterceptServerTrailingMetadata( - void (Derived::Call::*fn)(ServerMetadata&, Derived*), - typename Derived::Call* call, Derived* channel, - CallSpineInterface* call_spine) { - GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata); - call_spine->server_trailing_metadata().sender.InterceptAndMap( - [call, channel](ServerMetadataHandle md) { - call->OnServerTrailingMetadata(*md, channel); - return md; - }); + void (Derived::Call::*)(ServerMetadata&, Derived*), typename Derived::Call*, + Derived*, PipeBasedCallSpine*) { + gpr_log(GPR_ERROR, + "InterceptServerTrailingMetadata not available for call v2.5"); } template inline void InterceptServerTrailingMetadata( - absl::Status (Derived::Call::*fn)(ServerMetadata&), - typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) { - GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata); - call_spine->server_trailing_metadata().sender.InterceptAndMap( - [call](ServerMetadataHandle md) -> absl::optional { - auto status = call->OnServerTrailingMetadata(*md); - if (status.ok()) return std::move(md); - return ServerMetadataFromStatus(status); - }); + absl::Status (Derived::Call::*)(ServerMetadata&), typename Derived::Call*, + Derived*, PipeBasedCallSpine*) { + gpr_log(GPR_ERROR, + "InterceptServerTrailingMetadata not available for call v2.5"); } inline void InterceptFinalize(const NoInterceptor*, void*, void*) {} @@ -1085,23 +1089,20 @@ class ImplementChannelFilter : public ChannelFilter, GetContext() ->ManagedNew>( static_cast(this)); + auto* c = DownCast(call_spine); + auto* d = static_cast(this); promise_filter_detail::InterceptClientInitialMetadata( - &Derived::Call::OnClientInitialMetadata, call, - static_cast(this), call_spine); + &Derived::Call::OnClientInitialMetadata, call, d, c); promise_filter_detail::InterceptClientToServerMessage( - &Derived::Call::OnClientToServerMessage, call, - static_cast(this), call_spine); + &Derived::Call::OnClientToServerMessage, call, d, c); promise_filter_detail::InterceptServerInitialMetadata( - &Derived::Call::OnServerInitialMetadata, call, - static_cast(this), call_spine); + &Derived::Call::OnServerInitialMetadata, call, d, c); promise_filter_detail::InterceptServerToClientMessage( - &Derived::Call::OnServerToClientMessage, call, - static_cast(this), call_spine); + &Derived::Call::OnServerToClientMessage, call, d, c); promise_filter_detail::InterceptServerTrailingMetadata( - &Derived::Call::OnServerTrailingMetadata, call, - static_cast(this), call_spine); - promise_filter_detail::InterceptFinalize(&Derived::Call::OnFinalize, - static_cast(this), call); + &Derived::Call::OnServerTrailingMetadata, call, d, c); + promise_filter_detail::InterceptFinalize(&Derived::Call::OnFinalize, d, + call); } // Polyfill for the original promise scheme. diff --git a/src/core/lib/channel/server_call_tracer_filter.cc b/src/core/lib/channel/server_call_tracer_filter.cc index 33a202309a2..5effe8f8538 100644 --- a/src/core/lib/channel/server_call_tracer_filter.cc +++ b/src/core/lib/channel/server_call_tracer_filter.cc @@ -108,6 +108,7 @@ ServerCallTracerFilter::Create(const ChannelArgs& /*args*/, } // namespace void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder) { + if (IsChaoticGoodEnabled()) return; builder->channel_init()->RegisterFilter( GRPC_SERVER_CHANNEL); } diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 9eee1458bec..3f9e6a840ca 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -48,6 +48,9 @@ const char* const additional_constraints_event_engine_listener = "{}"; const char* const description_free_large_allocator = "If set, return all free bytes from a \042big\042 allocator"; const char* const additional_constraints_free_large_allocator = "{}"; +const char* const description_http2_stats_fix = + "Fix on HTTP2 outgoing data stats reporting"; +const char* const additional_constraints_http2_stats_fix = "{}"; const char* const description_keepalive_fix = "Allows overriding keepalive_permit_without_calls. Refer " "https://github.com/grpc/grpc/pull/33428 for more information."; @@ -139,11 +142,6 @@ const char* const description_work_serializer_dispatch = const char* const additional_constraints_work_serializer_dispatch = "{}"; const uint8_t required_experiments_work_serializer_dispatch[] = { static_cast(grpc_core::kExperimentIdEventEngineClient)}; -#ifdef NDEBUG -const bool kDefaultForDebugOnly = false; -#else -const bool kDefaultForDebugOnly = true; -#endif } // namespace namespace grpc_core { @@ -152,7 +150,7 @@ const ExperimentMetadata g_experiment_metadata[] = { {"call_status_override_on_cancellation", description_call_status_override_on_cancellation, additional_constraints_call_status_override_on_cancellation, nullptr, 0, - kDefaultForDebugOnly, true}, + true, true}, {"call_v3", description_call_v3, additional_constraints_call_v3, nullptr, 0, false, true}, {"canary_client_privacy", description_canary_client_privacy, @@ -167,6 +165,8 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_event_engine_listener, nullptr, 0, false, true}, {"free_large_allocator", description_free_large_allocator, additional_constraints_free_large_allocator, nullptr, 0, false, true}, + {"http2_stats_fix", description_http2_stats_fix, + additional_constraints_http2_stats_fix, nullptr, 0, true, true}, {"keepalive_fix", description_keepalive_fix, additional_constraints_keepalive_fix, nullptr, 0, false, false}, {"keepalive_server_fix", description_keepalive_server_fix, @@ -206,7 +206,7 @@ const ExperimentMetadata g_experiment_metadata[] = { {"tcp_rcv_lowat", description_tcp_rcv_lowat, additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true}, {"trace_record_callops", description_trace_record_callops, - additional_constraints_trace_record_callops, nullptr, 0, false, true}, + additional_constraints_trace_record_callops, nullptr, 0, true, true}, {"unconstrained_max_quota_buffer_size", description_unconstrained_max_quota_buffer_size, additional_constraints_unconstrained_max_quota_buffer_size, nullptr, 0, @@ -248,6 +248,9 @@ const char* const additional_constraints_event_engine_listener = "{}"; const char* const description_free_large_allocator = "If set, return all free bytes from a \042big\042 allocator"; const char* const additional_constraints_free_large_allocator = "{}"; +const char* const description_http2_stats_fix = + "Fix on HTTP2 outgoing data stats reporting"; +const char* const additional_constraints_http2_stats_fix = "{}"; const char* const description_keepalive_fix = "Allows overriding keepalive_permit_without_calls. Refer " "https://github.com/grpc/grpc/pull/33428 for more information."; @@ -339,11 +342,6 @@ const char* const description_work_serializer_dispatch = const char* const additional_constraints_work_serializer_dispatch = "{}"; const uint8_t required_experiments_work_serializer_dispatch[] = { static_cast(grpc_core::kExperimentIdEventEngineClient)}; -#ifdef NDEBUG -const bool kDefaultForDebugOnly = false; -#else -const bool kDefaultForDebugOnly = true; -#endif } // namespace namespace grpc_core { @@ -352,7 +350,7 @@ const ExperimentMetadata g_experiment_metadata[] = { {"call_status_override_on_cancellation", description_call_status_override_on_cancellation, additional_constraints_call_status_override_on_cancellation, nullptr, 0, - kDefaultForDebugOnly, true}, + true, true}, {"call_v3", description_call_v3, additional_constraints_call_v3, nullptr, 0, false, true}, {"canary_client_privacy", description_canary_client_privacy, @@ -367,6 +365,8 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_event_engine_listener, nullptr, 0, true, true}, {"free_large_allocator", description_free_large_allocator, additional_constraints_free_large_allocator, nullptr, 0, false, true}, + {"http2_stats_fix", description_http2_stats_fix, + additional_constraints_http2_stats_fix, nullptr, 0, true, true}, {"keepalive_fix", description_keepalive_fix, additional_constraints_keepalive_fix, nullptr, 0, false, false}, {"keepalive_server_fix", description_keepalive_server_fix, @@ -406,7 +406,7 @@ const ExperimentMetadata g_experiment_metadata[] = { {"tcp_rcv_lowat", description_tcp_rcv_lowat, additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true}, {"trace_record_callops", description_trace_record_callops, - additional_constraints_trace_record_callops, nullptr, 0, false, true}, + additional_constraints_trace_record_callops, nullptr, 0, true, true}, {"unconstrained_max_quota_buffer_size", description_unconstrained_max_quota_buffer_size, additional_constraints_unconstrained_max_quota_buffer_size, nullptr, 0, @@ -448,6 +448,9 @@ const char* const additional_constraints_event_engine_listener = "{}"; const char* const description_free_large_allocator = "If set, return all free bytes from a \042big\042 allocator"; const char* const additional_constraints_free_large_allocator = "{}"; +const char* const description_http2_stats_fix = + "Fix on HTTP2 outgoing data stats reporting"; +const char* const additional_constraints_http2_stats_fix = "{}"; const char* const description_keepalive_fix = "Allows overriding keepalive_permit_without_calls. Refer " "https://github.com/grpc/grpc/pull/33428 for more information."; @@ -539,11 +542,6 @@ const char* const description_work_serializer_dispatch = const char* const additional_constraints_work_serializer_dispatch = "{}"; const uint8_t required_experiments_work_serializer_dispatch[] = { static_cast(grpc_core::kExperimentIdEventEngineClient)}; -#ifdef NDEBUG -const bool kDefaultForDebugOnly = false; -#else -const bool kDefaultForDebugOnly = true; -#endif } // namespace namespace grpc_core { @@ -552,7 +550,7 @@ const ExperimentMetadata g_experiment_metadata[] = { {"call_status_override_on_cancellation", description_call_status_override_on_cancellation, additional_constraints_call_status_override_on_cancellation, nullptr, 0, - kDefaultForDebugOnly, true}, + true, true}, {"call_v3", description_call_v3, additional_constraints_call_v3, nullptr, 0, false, true}, {"canary_client_privacy", description_canary_client_privacy, @@ -567,6 +565,8 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_event_engine_listener, nullptr, 0, true, true}, {"free_large_allocator", description_free_large_allocator, additional_constraints_free_large_allocator, nullptr, 0, false, true}, + {"http2_stats_fix", description_http2_stats_fix, + additional_constraints_http2_stats_fix, nullptr, 0, true, true}, {"keepalive_fix", description_keepalive_fix, additional_constraints_keepalive_fix, nullptr, 0, false, false}, {"keepalive_server_fix", description_keepalive_server_fix, @@ -606,7 +606,7 @@ const ExperimentMetadata g_experiment_metadata[] = { {"tcp_rcv_lowat", description_tcp_rcv_lowat, additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true}, {"trace_record_callops", description_trace_record_callops, - additional_constraints_trace_record_callops, nullptr, 0, false, true}, + additional_constraints_trace_record_callops, nullptr, 0, true, true}, {"unconstrained_max_quota_buffer_size", description_unconstrained_max_quota_buffer_size, additional_constraints_unconstrained_max_quota_buffer_size, nullptr, 0, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index a48a97d63dc..faf13fca4cf 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -57,16 +57,8 @@ namespace grpc_core { #ifdef GRPC_EXPERIMENTS_ARE_FINAL #if defined(GRPC_CFSTREAM) -#ifndef NDEBUG #define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION -#endif -inline bool IsCallStatusOverrideOnCancellationEnabled() { -#ifdef NDEBUG - return false; -#else - return true; -#endif -} +inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; } inline bool IsCallV3Enabled() { return false; } inline bool IsCanaryClientPrivacyEnabled() { return false; } inline bool IsClientPrivacyEnabled() { return false; } @@ -74,6 +66,8 @@ inline bool IsEventEngineClientEnabled() { return false; } inline bool IsEventEngineDnsEnabled() { return false; } inline bool IsEventEngineListenerEnabled() { return false; } inline bool IsFreeLargeAllocatorEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX +inline bool IsHttp2StatsFixEnabled() { return true; } inline bool IsKeepaliveFixEnabled() { return false; } inline bool IsKeepaliveServerFixEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_MONITORING_EXPERIMENT @@ -93,23 +87,16 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } -inline bool IsTraceRecordCallopsEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS +inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } #elif defined(GPR_WINDOWS) -#ifndef NDEBUG #define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION -#endif -inline bool IsCallStatusOverrideOnCancellationEnabled() { -#ifdef NDEBUG - return false; -#else - return true; -#endif -} +inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; } inline bool IsCallV3Enabled() { return false; } inline bool IsCanaryClientPrivacyEnabled() { return false; } inline bool IsClientPrivacyEnabled() { return false; } @@ -118,6 +105,8 @@ inline bool IsEventEngineDnsEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_EVENT_ENGINE_LISTENER inline bool IsEventEngineListenerEnabled() { return true; } inline bool IsFreeLargeAllocatorEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX +inline bool IsHttp2StatsFixEnabled() { return true; } inline bool IsKeepaliveFixEnabled() { return false; } inline bool IsKeepaliveServerFixEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_MONITORING_EXPERIMENT @@ -137,23 +126,16 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } -inline bool IsTraceRecordCallopsEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS +inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } #else -#ifndef NDEBUG #define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION -#endif -inline bool IsCallStatusOverrideOnCancellationEnabled() { -#ifdef NDEBUG - return false; -#else - return true; -#endif -} +inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; } inline bool IsCallV3Enabled() { return false; } inline bool IsCanaryClientPrivacyEnabled() { return false; } inline bool IsClientPrivacyEnabled() { return false; } @@ -163,6 +145,8 @@ inline bool IsEventEngineDnsEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_EVENT_ENGINE_LISTENER inline bool IsEventEngineListenerEnabled() { return true; } inline bool IsFreeLargeAllocatorEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX +inline bool IsHttp2StatsFixEnabled() { return true; } inline bool IsKeepaliveFixEnabled() { return false; } inline bool IsKeepaliveServerFixEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_MONITORING_EXPERIMENT @@ -182,7 +166,8 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } inline bool IsTcpRcvLowatEnabled() { return false; } -inline bool IsTraceRecordCallopsEnabled() { return false; } +#define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS +inline bool IsTraceRecordCallopsEnabled() { return true; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } @@ -200,6 +185,7 @@ enum ExperimentIds { kExperimentIdEventEngineDns, kExperimentIdEventEngineListener, kExperimentIdFreeLargeAllocator, + kExperimentIdHttp2StatsFix, kExperimentIdKeepaliveFix, kExperimentIdKeepaliveServerFix, kExperimentIdMonitoringExperiment, @@ -254,6 +240,10 @@ inline bool IsEventEngineListenerEnabled() { inline bool IsFreeLargeAllocatorEnabled() { return IsExperimentEnabled(kExperimentIdFreeLargeAllocator); } +#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX +inline bool IsHttp2StatsFixEnabled() { + return IsExperimentEnabled(kExperimentIdHttp2StatsFix); +} #define GRPC_EXPERIMENT_IS_INCLUDED_KEEPALIVE_FIX inline bool IsKeepaliveFixEnabled() { return IsExperimentEnabled(kExperimentIdKeepaliveFix); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 9ab849a823b..9be9619ff3c 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -44,7 +44,7 @@ description: Avoid overriding call status of successfully finished calls if it races with cancellation. - expiry: 2024/04/01 + expiry: 2024/08/01 owner: vigneshbabu@google.com test_tags: [] - name: call_v3 @@ -98,6 +98,12 @@ expiry: 2024/08/01 owner: alishananda@google.com test_tags: [resource_quota_test] +- name: http2_stats_fix + description: + Fix on HTTP2 outgoing data stats reporting + expiry: 2024/09/30 + owner: yashkt@google.com + test_tags: [] - name: keepalive_fix description: Allows overriding keepalive_permit_without_calls. @@ -180,7 +186,7 @@ test_tags: [flow_control_test] - name: schedule_cancellation_over_write description: Allow cancellation op to be scheduled over a write - expiry: 2024/04/01 + expiry: 2024/08/01 owner: vigneshbabu@google.com test_tags: [] - name: server_privacy @@ -206,7 +212,7 @@ test_tags: ["endpoint_test", "flow_control_test"] - name: trace_record_callops description: Enables tracing of call batch initiation and completion. - expiry: 2024/04/01 + expiry: 2024/08/01 owner: vigneshbabu@google.com test_tags: [] - name: unconstrained_max_quota_buffer_size diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index 787265a2da2..ce9cf80c114 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -41,7 +41,7 @@ # Supported platforms: ios, windows, posix - name: call_status_override_on_cancellation - default: debug + default: true - name: call_v3 default: false - name: canary_client_privacy @@ -109,7 +109,7 @@ - name: tcp_rcv_lowat default: false - name: trace_record_callops - default: false + default: true - name: unconstrained_max_quota_buffer_size default: false - name: work_serializer_clears_time_cache diff --git a/src/core/lib/promise/for_each.h b/src/core/lib/promise/for_each.h index 4f431a9c4a9..82322add349 100644 --- a/src/core/lib/promise/for_each.h +++ b/src/core/lib/promise/for_each.h @@ -56,13 +56,52 @@ struct Done { static StatusFlag Make(bool cancelled) { return StatusFlag(!cancelled); } }; +template +struct NextValueTraits; + +enum class NextValueType { + kValue, + kEndOfStream, + kError, +}; + +template +struct NextValueTraits> { + using Value = typename T::value_type; + + static NextValueType Type(const T& t) { + if (t.has_value()) return NextValueType::kValue; + if (t.cancelled()) return NextValueType::kError; + return NextValueType::kEndOfStream; + } + + static Value& MutableValue(T& t) { return *t; } +}; + +template +struct NextValueTraits>> { + using Value = T; + + static NextValueType Type(const ValueOrFailure>& t) { + if (t.ok()) { + if (t.value().has_value()) return NextValueType::kValue; + return NextValueType::kEndOfStream; + } + return NextValueType::kError; + } + + static Value& MutableValue(ValueOrFailure>& t) { + return **t; + } +}; + template class ForEach { private: using ReaderNext = decltype(std::declval().Next()); using ReaderResult = typename PollTraits()())>::Type; - using ReaderResultValue = typename ReaderResult::value_type; + using ReaderResultValue = typename NextValueTraits::Value; using ActionFactory = promise_detail::RepeatedPromiseFactory; using ActionPromise = typename ActionFactory::Promise; @@ -120,22 +159,37 @@ class ForEach { Poll PollReaderNext() { if (grpc_trace_promise_primitives.enabled()) { - gpr_log(GPR_DEBUG, "%s PollReaderNext", DebugTag().c_str()); + gpr_log(GPR_INFO, "%s PollReaderNext", DebugTag().c_str()); } auto r = reader_next_(); if (auto* p = r.value_if_ready()) { - if (grpc_trace_promise_primitives.enabled()) { - gpr_log(GPR_DEBUG, "%s PollReaderNext: got has_value=%s", - DebugTag().c_str(), p->has_value() ? "true" : "false"); - } - if (p->has_value()) { - Destruct(&reader_next_); - auto action = action_factory_.Make(std::move(**p)); - Construct(&in_action_, std::move(action), std::move(*p)); - reading_next_ = false; - return PollAction(); - } else { - return Done::Make(p->cancelled()); + switch (NextValueTraits::Type(*p)) { + case NextValueType::kValue: { + if (grpc_trace_promise_primitives.enabled()) { + gpr_log(GPR_INFO, "%s PollReaderNext: got value", + DebugTag().c_str()); + } + Destruct(&reader_next_); + auto action = action_factory_.Make( + std::move(NextValueTraits::MutableValue(*p))); + Construct(&in_action_, std::move(action), std::move(*p)); + reading_next_ = false; + return PollAction(); + } + case NextValueType::kEndOfStream: { + if (grpc_trace_promise_primitives.enabled()) { + gpr_log(GPR_INFO, "%s PollReaderNext: got end of stream", + DebugTag().c_str()); + } + return Done::Make(false); + } + case NextValueType::kError: { + if (grpc_trace_promise_primitives.enabled()) { + gpr_log(GPR_INFO, "%s PollReaderNext: got error", + DebugTag().c_str()); + } + return Done::Make(true); + } } } return Pending(); @@ -143,7 +197,7 @@ class ForEach { Poll PollAction() { if (grpc_trace_promise_primitives.enabled()) { - gpr_log(GPR_DEBUG, "%s PollAction", DebugTag().c_str()); + gpr_log(GPR_INFO, "%s PollAction", DebugTag().c_str()); } auto r = in_action_.promise(); if (auto* p = r.value_if_ready()) { diff --git a/src/core/lib/promise/pipe.h b/src/core/lib/promise/pipe.h index a1533bbcae5..f56567e6cf2 100644 --- a/src/core/lib/promise/pipe.h +++ b/src/core/lib/promise/pipe.h @@ -89,7 +89,7 @@ class NextResult final { const T& operator*() const; T& operator*(); // Only valid if !has_value() - bool cancelled() { return cancelled_; } + bool cancelled() const { return cancelled_; } private: RefCountedPtr> center_; diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index a8e026b0ece..08a3aeb02a5 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -2739,7 +2739,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall { ScopedContext context(this); args->channel->channel_stack()->stats_plugin_group->AddClientCallTracers( *args->path, args->registered_method, this->context()); - send_initial_metadata_ = GetContext()->MakePooled(); + send_initial_metadata_ = Arena::MakePooled(); send_initial_metadata_->Set(HttpPathMetadata(), std::move(*args->path)); if (args->authority.has_value()) { send_initial_metadata_->Set(HttpAuthorityMetadata(), @@ -2818,7 +2818,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall { } RefCountedPtr MakeCallSpine(CallArgs call_args) final { - class WrappingCallSpine final : public CallSpineInterface { + class WrappingCallSpine final : public PipeBasedCallSpine { public: WrappingCallSpine(ClientPromiseBasedCall* call, ClientMetadataHandle metadata) @@ -2859,14 +2859,14 @@ class ClientPromiseBasedCall final : public PromiseBasedCall { return call_->server_to_client_messages_; } - Pipe& server_trailing_metadata() override { - return server_trailing_metadata_; - } - Latch& cancel_latch() override { return cancel_error_; } + Latch& was_cancelled_latch() override { + return was_cancelled_latch_; + } + Party& party() override { return *call_; } Arena* arena() override { return call_->arena(); } @@ -2886,6 +2886,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall { Pipe client_initial_metadata_{call_->arena()}; Pipe server_trailing_metadata_{call_->arena()}; Latch cancel_error_; + Latch was_cancelled_latch_; }; GPR_ASSERT(call_args.server_initial_metadata == &server_initial_metadata_.sender); @@ -3700,11 +3701,12 @@ ServerPromiseBasedCall::MakeTopOfServerCallPromise( /////////////////////////////////////////////////////////////////////////////// // CallSpine based Server Call -class ServerCallSpine final : public CallSpineInterface, +class ServerCallSpine final : public PipeBasedCallSpine, public ServerCallContext, public BasicPromiseBasedCall { public: - ServerCallSpine(ServerInterface* server, Channel* channel, Arena* arena); + ServerCallSpine(ClientMetadataHandle client_initial_metadata, + ServerInterface* server, Channel* channel, Arena* arena); // CallSpineInterface Pipe& client_initial_metadata() override { @@ -3719,10 +3721,8 @@ class ServerCallSpine final : public CallSpineInterface, Pipe& server_to_client_messages() override { return server_to_client_messages_; } - Pipe& server_trailing_metadata() override { - return server_trailing_metadata_; - } Latch& cancel_latch() override { return cancel_latch_; } + Latch& was_cancelled_latch() override { return was_cancelled_latch_; } Party& party() override { return *this; } Arena* arena() override { return BasicPromiseBasedCall::arena(); } void IncrementRefCount() override { InternalRef("CallSpine"); } @@ -3735,7 +3735,9 @@ class ServerCallSpine final : public CallSpineInterface, } void CancelWithError(grpc_error_handle error) override { SpawnInfallible("CancelWithError", [this, error = std::move(error)] { - std::ignore = Cancel(ServerMetadataFromStatus(error)); + auto status = ServerMetadataFromStatus(error); + status->Set(GrpcCallWasCancelled(), true); + PushServerTrailingMetadata(std::move(status)); return Empty{}; }); } @@ -3784,15 +3786,15 @@ class ServerCallSpine final : public CallSpineInterface, Pipe client_to_server_messages_; // Messages travelling from the transport to the application. Pipe server_to_client_messages_; - // Trailing metadata from server to client - Pipe server_trailing_metadata_; // Latch that can be set to terminate the call Latch cancel_latch_; + Latch was_cancelled_latch_; grpc_byte_buffer** recv_message_ = nullptr; ClientMetadataHandle client_initial_metadata_stored_; }; -ServerCallSpine::ServerCallSpine(ServerInterface* server, Channel* channel, +ServerCallSpine::ServerCallSpine(ClientMetadataHandle client_initial_metadata, + ServerInterface* server, Channel* channel, Arena* arena) : BasicPromiseBasedCall(arena, 0, 1, [channel, server]() -> grpc_call_create_args { @@ -3811,11 +3813,15 @@ ServerCallSpine::ServerCallSpine(ServerInterface* server, Channel* channel, client_initial_metadata_(arena), server_initial_metadata_(arena), client_to_server_messages_(arena), - server_to_client_messages_(arena), - server_trailing_metadata_(arena) { + server_to_client_messages_(arena) { global_stats().IncrementServerCallsCreated(); ScopedContext ctx(this); channel->channel_stack()->InitServerCallSpine(this); + SpawnGuarded("push_client_initial_metadata", + [this, md = std::move(client_initial_metadata)]() mutable { + return Map(client_initial_metadata_.sender.Push(std::move(md)), + [](bool r) { return StatusFlag(r); }); + }); } void ServerCallSpine::PublishInitialMetadata( @@ -4081,10 +4087,15 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops, metadata->Set(GrpcMessageMetadata(), Slice(grpc_slice_copy(*details))); } + GPR_ASSERT(metadata != nullptr); return [this, metadata = std::move(metadata)]() mutable { - server_to_client_messages_.sender.Close(); - return Map(server_trailing_metadata_.sender.Push(std::move(metadata)), - [](bool r) { return StatusFlag(r); }); + GPR_ASSERT(metadata != nullptr); + return [this, + metadata = std::move(metadata)]() mutable -> Poll { + GPR_ASSERT(metadata != nullptr); + PushServerTrailingMetadata(std::move(metadata)); + return Success{}; + }; }; }); auto recv_message = @@ -4099,13 +4110,15 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops, }; }); auto primary_ops = AllOk( - std::move(send_initial_metadata), std::move(send_message), - std::move(send_trailing_metadata), std::move(recv_message)); + TrySeq(AllOk(std::move(send_initial_metadata), + std::move(send_message)), + std::move(send_trailing_metadata)), + std::move(recv_message)); if (got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER] != 255) { auto recv_trailing_metadata = MaybeOp( ops, got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER], [this](const grpc_op& op) { return [this, cancelled = op.data.recv_close_on_server.cancelled]() { - return Map(server_trailing_metadata_.receiver.AwaitClosed(), + return Map(WasCancelled(), [cancelled, this](bool result) -> Success { ResetDeadline(); *cancelled = result ? 1 : 0; @@ -4141,14 +4154,15 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops, } } -RefCountedPtr MakeServerCall(ServerInterface* server, - Channel* channel, - Arena* arena) { - return RefCountedPtr( - arena->New(server, channel, arena)); +RefCountedPtr MakeServerCall( + ClientMetadataHandle client_initial_metadata, ServerInterface* server, + Channel* channel, Arena* arena) { + return RefCountedPtr(arena->New( + std::move(client_initial_metadata), server, channel, arena)); } #else -RefCountedPtr MakeServerCall(ServerInterface*, Channel*, +RefCountedPtr MakeServerCall(ClientMetadataHandle, + ServerInterface*, Channel*, Arena*) { Crash("not implemented"); } diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index 32aba4c3c60..ddc29ed878a 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -158,9 +158,10 @@ class CallContext { template <> struct ContextType {}; -RefCountedPtr MakeServerCall(ServerInterface* server, - Channel* channel, - Arena* arena); +// TODO(ctiller): remove once call-v3 finalized +RefCountedPtr MakeServerCall( + ClientMetadataHandle client_initial_metadata, ServerInterface* server, + Channel* channel, Arena* arena); } // namespace grpc_core diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 7b5fb2b95f9..5e974ec25a6 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -232,7 +232,8 @@ struct Server::RequestedCall { data.registered.optional_payload = optional_payload; } - void Complete(NextResult payload, ClientMetadata& md) { + template + void Complete(OptionalPayload payload, ClientMetadata& md) { Timestamp deadline = GetContext()->deadline(); switch (type) { case RequestedCall::Type::BATCH_CALL: @@ -1301,9 +1302,10 @@ Server::ChannelData::~ChannelData() { Arena* Server::ChannelData::CreateArena() { return channel_->CreateArena(); } absl::StatusOr Server::ChannelData::CreateCall( - ClientMetadata& client_initial_metadata, Arena* arena) { - SetRegisteredMethodOnMetadata(client_initial_metadata); - auto call = MakeServerCall(server_.get(), channel_.get(), arena); + ClientMetadataHandle client_initial_metadata, Arena* arena) { + SetRegisteredMethodOnMetadata(*client_initial_metadata); + auto call = MakeServerCall(std::move(client_initial_metadata), server_.get(), + channel_.get(), arena); InitCall(call); return CallInitiator(std::move(call)); } @@ -1427,10 +1429,10 @@ void Server::ChannelData::InitCall(RefCountedPtr call) { call->SpawnGuarded("request_matcher", [this, call]() { return TrySeq( // Wait for initial metadata to pass through all filters - Map(call->client_initial_metadata().receiver.Next(), - [](NextResult md) + Map(call->PullClientInitialMetadata(), + [](ValueOrFailure md) -> absl::StatusOr { - if (!md.has_value()) { + if (!md.ok()) { return absl::InternalError("Missing metadata"); } if (!md.value()->get_pointer(HttpPathMetadata())) { @@ -1456,24 +1458,19 @@ void Server::ChannelData::InitCall(RefCountedPtr call) { } auto maybe_read_first_message = If( payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, - [call]() { - return call->client_to_server_messages().receiver.Next(); - }, - []() -> NextResult { - return NextResult(); + [call]() { return call->PullClientToServerMessage(); }, + []() -> ValueOrFailure> { + return ValueOrFailure>( + absl::nullopt); }); return TryJoin( - Map(std::move(maybe_read_first_message), - [](NextResult n) { - return ValueOrFailure>{ - std::move(n)}; - }), - rm->MatchRequest(cq_idx()), [md = std::move(md)]() mutable { + std::move(maybe_read_first_message), rm->MatchRequest(cq_idx()), + [md = std::move(md)]() mutable { return ValueOrFailure(std::move(md)); }); }, // Publish call to cq - [](std::tuple, + [](std::tuple, RequestMatcherInterface::MatchResult, ClientMetadataHandle> r) { diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 6e80b96445f..356bcc40269 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -246,7 +246,7 @@ class Server : public ServerInterface, Arena* CreateArena() override; absl::StatusOr CreateCall( - ClientMetadata& client_initial_metadata, Arena* arena) override; + ClientMetadataHandle client_initial_metadata, Arena* arena) override; private: class ConnectivityWatcher; diff --git a/src/core/lib/transport/batch_builder.h b/src/core/lib/transport/batch_builder.h index ee032df12db..68a65ba25be 100644 --- a/src/core/lib/transport/batch_builder.h +++ b/src/core/lib/transport/batch_builder.h @@ -147,8 +147,7 @@ class BatchBuilder { absl::string_view name() const override { return "receive_message"; } MessageHandle IntoMessageHandle() { - return GetContext()->MakePooled(std::move(*payload), - flags); + return Arena::MakePooled(std::move(*payload), flags); } absl::optional payload; @@ -161,7 +160,7 @@ class BatchBuilder { using PendingCompletion::PendingCompletion; Arena::PoolPtr metadata = - GetContext()->MakePooled(); + Arena::MakePooled(); protected: ~PendingReceiveMetadata() = default; @@ -328,7 +327,7 @@ inline auto BatchBuilder::SendClientTrailingMetadata(Target target) { auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); batch->batch.on_complete = &pc->on_done_closure; batch->batch.send_trailing_metadata = true; - auto metadata = GetContext()->MakePooled(); + auto metadata = Arena::MakePooled(); payload_->send_trailing_metadata.send_trailing_metadata = metadata.get(); payload_->send_trailing_metadata.sent = nullptr; pc->send_trailing_metadata = std::move(metadata); diff --git a/src/core/lib/transport/call_spine.cc b/src/core/lib/transport/call_spine.cc index 0edcc3d335b..6b8ce59916e 100644 --- a/src/core/lib/transport/call_spine.cc +++ b/src/core/lib/transport/call_spine.cc @@ -18,16 +18,7 @@ namespace grpc_core { -void ForwardCall(CallHandler call_handler, CallInitiator call_initiator, - ClientMetadataHandle client_initial_metadata) { - // Send initial metadata. - call_initiator.SpawnGuarded( - "send_initial_metadata", - [client_initial_metadata = std::move(client_initial_metadata), - call_initiator]() mutable { - return call_initiator.PushClientInitialMetadata( - std::move(client_initial_metadata)); - }); +void ForwardCall(CallHandler call_handler, CallInitiator call_initiator) { // Read messages from handler into initiator. call_handler.SpawnGuarded("read_messages", [call_handler, call_initiator]() mutable { @@ -88,10 +79,10 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator, })), call_initiator.PullServerTrailingMetadata(), [call_handler](ServerMetadataHandle md) mutable { - call_handler.SpawnGuarded( - "recv_trailing_metadata", - [md = std::move(md), call_handler]() mutable { - return call_handler.PushServerTrailingMetadata(std::move(md)); + call_handler.SpawnInfallible( + "recv_trailing", [call_handler, md = std::move(md)]() mutable { + call_handler.PushServerTrailingMetadata(std::move(md)); + return Empty{}; }); return Empty{}; }); @@ -99,9 +90,12 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator, } CallInitiatorAndHandler MakeCall( - grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena) { - auto spine = CallSpine::Create(event_engine, arena); - return {CallInitiator(spine), CallHandler(spine)}; + ClientMetadataHandle client_initial_metadata, + grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena, + bool is_arena_owned) { + auto spine = CallSpine::Create(std::move(client_initial_metadata), + event_engine, arena, is_arena_owned); + return {CallInitiator(spine), UnstartedCallHandler(spine)}; } } // namespace grpc_core diff --git a/src/core/lib/transport/call_spine.h b/src/core/lib/transport/call_spine.h index 31381ef9f83..514c92c245d 100644 --- a/src/core/lib/transport/call_spine.h +++ b/src/core/lib/transport/call_spine.h @@ -25,6 +25,7 @@ #include "src/core/lib/promise/party.h" #include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/prioritized_race.h" +#include "src/core/lib/promise/promise.h" #include "src/core/lib/promise/status_flag.h" #include "src/core/lib/promise/try_seq.h" #include "src/core/lib/transport/message.h" @@ -42,12 +43,6 @@ namespace grpc_core { class CallSpineInterface { public: virtual ~CallSpineInterface() = default; - virtual Pipe& client_initial_metadata() = 0; - virtual Pipe& server_initial_metadata() = 0; - virtual Pipe& client_to_server_messages() = 0; - virtual Pipe& server_to_client_messages() = 0; - virtual Pipe& server_trailing_metadata() = 0; - virtual Latch& cancel_latch() = 0; // Add a callback to be called when server trailing metadata is received. void OnDone(absl::AnyInvocable fn) { if (on_done_ == nullptr) { @@ -67,33 +62,24 @@ class CallSpineInterface { virtual void IncrementRefCount() = 0; virtual void Unref() = 0; - // Cancel the call with the given metadata. - // Regarding the `MUST_USE_RESULT absl::nullopt_t`: - // Most cancellation calls right now happen in pipe interceptors; - // there `nullopt` indicates terminate processing of this pipe and close with - // error. - // It's convenient then to have the Cancel operation (setting the latch to - // terminate the call) be the last thing that occurs in a pipe interceptor, - // and this construction supports that (and has helped the author not write - // some bugs). - GRPC_MUST_USE_RESULT absl::nullopt_t Cancel(ServerMetadataHandle metadata) { - GPR_DEBUG_ASSERT(GetContext() == &party()); - auto& c = cancel_latch(); - if (c.is_set()) return absl::nullopt; - c.Set(std::move(metadata)); - CallOnDone(); - client_initial_metadata().sender.CloseWithError(); - server_initial_metadata().sender.CloseWithError(); - client_to_server_messages().sender.CloseWithError(); - server_to_client_messages().sender.CloseWithError(); - server_trailing_metadata().sender.CloseWithError(); - return absl::nullopt; - } - - auto WaitForCancel() { - GPR_DEBUG_ASSERT(GetContext() == &party()); - return cancel_latch().Wait(); - } + virtual Promise>> + PullServerInitialMetadata() = 0; + virtual Promise PullServerTrailingMetadata() = 0; + virtual Promise PushClientToServerMessage( + MessageHandle message) = 0; + virtual Promise>> + PullClientToServerMessage() = 0; + virtual Promise PushServerToClientMessage( + MessageHandle message) = 0; + virtual Promise>> + PullServerToClientMessage() = 0; + virtual void PushServerTrailingMetadata(ServerMetadataHandle md) = 0; + virtual void FinishSends() = 0; + virtual Promise> + PullClientInitialMetadata() = 0; + virtual Promise PushServerInitialMetadata( + absl::optional md) = 0; + virtual Promise WasCancelled() = 0; // Wrap a promise so that if it returns failure it automatically cancels // the rest of the call. @@ -105,7 +91,7 @@ class CallSpineInterface { using ResultType = typename P::Result; return Map(std::move(promise), [this](ResultType r) { if (!IsStatusOk(r)) { - std::ignore = Cancel(StatusCast(r)); + PushServerTrailingMetadata(StatusCast(r)); } return r; }); @@ -121,7 +107,8 @@ class CallSpineInterface { // Spawn a promise that returns some status-like type; if the status // represents failure automatically cancel the rest of the call. template - void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { + void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory, + DebugLocation whence = {}) { using FactoryType = promise_detail::OncePromiseFactory; using PromiseType = typename FactoryType::Promise; @@ -130,27 +117,158 @@ class CallSpineInterface { std::is_same()))>::value, "SpawnGuarded promise must return a status-like object"); - party().Spawn(name, std::move(promise_factory), [this](ResultType r) { - if (!IsStatusOk(r)) { - if (grpc_trace_promise_primitives.enabled()) { - gpr_log(GPR_DEBUG, "SpawnGuarded sees failure: %s", - r.ToString().c_str()); - } - std::ignore = Cancel(StatusCast(std::move(r))); - } - }); + party().Spawn( + name, std::move(promise_factory), [this, whence](ResultType r) { + if (!IsStatusOk(r)) { + if (grpc_trace_promise_primitives.enabled()) { + gpr_log(GPR_INFO, "SpawnGuarded sees failure: %s (source: %s:%d)", + r.ToString().c_str(), whence.file(), whence.line()); + } + auto status = StatusCast(std::move(r)); + status->Set(GrpcCallWasCancelled(), true); + PushServerTrailingMetadata(std::move(status)); + } + }); } private: absl::AnyInvocable on_done_{nullptr}; }; -class CallSpine final : public CallSpineInterface, public Party { +// Implementation of CallSpine atop the v2 Pipe based arrangement. +// This implementation will go away in favor of an implementation atop +// CallFilters by the time v3 lands. +class PipeBasedCallSpine : public CallSpineInterface { + public: + virtual Pipe& client_initial_metadata() = 0; + virtual Pipe& server_initial_metadata() = 0; + virtual Pipe& client_to_server_messages() = 0; + virtual Pipe& server_to_client_messages() = 0; + virtual Latch& cancel_latch() = 0; + virtual Latch& was_cancelled_latch() = 0; + + Promise>> + PullServerInitialMetadata() final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + return Map(server_initial_metadata().receiver.Next(), + [](NextResult md) + -> ValueOrFailure> { + if (!md.has_value()) { + if (md.cancelled()) return Failure{}; + return absl::optional(); + } + return absl::optional(std::move(*md)); + }); + } + + Promise PullServerTrailingMetadata() final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + return cancel_latch().Wait(); + } + + Promise>> + PullServerToClientMessage() final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + return Map(server_to_client_messages().receiver.Next(), MapNextMessage); + } + + Promise PushClientToServerMessage(MessageHandle message) final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + return Map(client_to_server_messages().sender.Push(std::move(message)), + [](bool r) { return StatusFlag(r); }); + } + + Promise>> + PullClientToServerMessage() final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + return Map(client_to_server_messages().receiver.Next(), MapNextMessage); + } + + Promise PushServerToClientMessage(MessageHandle message) final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + return Map(server_to_client_messages().sender.Push(std::move(message)), + [](bool r) { return StatusFlag(r); }); + } + + void FinishSends() final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + client_to_server_messages().sender.Close(); + } + + void PushServerTrailingMetadata(ServerMetadataHandle metadata) final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + auto& c = cancel_latch(); + if (c.is_set()) return; + const bool was_cancelled = + metadata->get(GrpcCallWasCancelled()).value_or(false); + c.Set(std::move(metadata)); + CallOnDone(); + was_cancelled_latch().Set(was_cancelled); + client_initial_metadata().sender.CloseWithError(); + server_initial_metadata().sender.Close(); + client_to_server_messages().sender.CloseWithError(); + server_to_client_messages().sender.Close(); + } + + Promise WasCancelled() final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + return was_cancelled_latch().Wait(); + } + + Promise> PullClientInitialMetadata() + final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + return Map(client_initial_metadata().receiver.Next(), + [](NextResult md) + -> ValueOrFailure { + if (!md.has_value()) return Failure{}; + return std::move(*md); + }); + } + + Promise PushServerInitialMetadata( + absl::optional md) final { + GPR_DEBUG_ASSERT(GetContext() == &party()); + return If( + md.has_value(), + [&md, this]() { + return Map(server_initial_metadata().sender.Push(std::move(*md)), + [](bool ok) { return StatusFlag(ok); }); + }, + [this]() { + server_initial_metadata().sender.Close(); + return []() -> StatusFlag { return Success{}; }; + }); + } + + private: + static ValueOrFailure> MapNextMessage( + NextResult r) { + if (!r.has_value()) { + if (r.cancelled()) return Failure{}; + return absl::optional(); + } + return absl::optional(std::move(*r)); + } +}; + +class CallSpine final : public PipeBasedCallSpine, public Party { public: static RefCountedPtr Create( - grpc_event_engine::experimental::EventEngine* event_engine, - Arena* arena) { - return RefCountedPtr(arena->New(event_engine, arena)); + ClientMetadataHandle client_initial_metadata, + grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena, + bool is_arena_owned) { + auto spine = RefCountedPtr( + arena->New(event_engine, arena, is_arena_owned)); + spine->SpawnInfallible( + "push_client_initial_metadata", + [spine = spine.get(), client_initial_metadata = std::move( + client_initial_metadata)]() mutable { + return Map(spine->client_initial_metadata_.sender.Push( + std::move(client_initial_metadata)), + [](bool) { return Empty{}; }); + }); + return spine; } Pipe& client_initial_metadata() override { @@ -165,10 +283,8 @@ class CallSpine final : public CallSpineInterface, public Party { Pipe& server_to_client_messages() override { return server_to_client_messages_; } - Pipe& server_trailing_metadata() override { - return server_trailing_metadata_; - } Latch& cancel_latch() override { return cancel_latch_; } + Latch& was_cancelled_latch() override { return was_cancelled_latch_; } Party& party() override { return *this; } Arena* arena() override { return arena_; } void IncrementRefCount() override { Party::IncrementRefCount(); } @@ -177,8 +293,11 @@ class CallSpine final : public CallSpineInterface, public Party { private: friend class Arena; CallSpine(grpc_event_engine::experimental::EventEngine* event_engine, - Arena* arena) - : Party(1), arena_(arena), event_engine_(event_engine) {} + Arena* arena, bool is_arena_owned) + : Party(1), + arena_(arena), + is_arena_owned_(is_arena_owned), + event_engine_(event_engine) {} class ScopedContext : public ScopedActivity, public promise_detail::Context { @@ -208,6 +327,7 @@ class CallSpine final : public CallSpineInterface, public Party { } Arena* arena_; + bool is_arena_owned_; // Initial metadata from client to server Pipe client_initial_metadata_{arena()}; // Initial metadata from server to client @@ -216,10 +336,9 @@ class CallSpine final : public CallSpineInterface, public Party { Pipe client_to_server_messages_{arena()}; // Messages travelling from the transport to the application. Pipe server_to_client_messages_{arena()}; - // Trailing metadata from server to client - Pipe server_trailing_metadata_{arena()}; // Latch that can be set to terminate the call Latch cancel_latch_; + Latch was_cancelled_latch_; // Event engine associated with this call grpc_event_engine::experimental::EventEngine* const event_engine_; }; @@ -229,73 +348,31 @@ class CallInitiator { explicit CallInitiator(RefCountedPtr spine) : spine_(std::move(spine)) {} - auto PushClientInitialMetadata(ClientMetadataHandle md) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map(spine_->client_initial_metadata().sender.Push(std::move(md)), - [](bool ok) { return StatusFlag(ok); }); + template + auto CancelIfFails(Promise promise) { + return spine_->CancelIfFails(std::move(promise)); } auto PullServerInitialMetadata() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map(spine_->server_initial_metadata().receiver.Next(), - [](NextResult md) - -> ValueOrFailure> { - if (!md.has_value()) { - if (md.cancelled()) return Failure{}; - return absl::optional(); - } - return absl::optional(std::move(*md)); - }); - } - - auto PullServerTrailingMetadata() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return PrioritizedRace( - Seq(spine_->server_trailing_metadata().receiver.Next(), - [spine = spine_](NextResult md) mutable { - return [md = std::move(md), - spine]() mutable -> Poll { - // If the pipe was closed at cancellation time, we'll see no - // value here. Return pending and allow the cancellation to win - // the race. - if (!md.has_value()) return Pending{}; - spine->server_trailing_metadata().sender.Close(); - return std::move(*md); - }; - }), - Map(spine_->WaitForCancel(), - [spine = spine_](ServerMetadataHandle md) -> ServerMetadataHandle { - spine->server_trailing_metadata().sender.CloseWithError(); - return md; - })); - } - - auto PullMessage() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return spine_->server_to_client_messages().receiver.Next(); + return spine_->PullServerInitialMetadata(); } auto PushMessage(MessageHandle message) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map( - spine_->client_to_server_messages().sender.Push(std::move(message)), - [](bool r) { return StatusFlag(r); }); + return spine_->PushClientToServerMessage(std::move(message)); } - void FinishSends() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - spine_->client_to_server_messages().sender.Close(); - } + void FinishSends() { spine_->FinishSends(); } - template - auto CancelIfFails(Promise promise) { - return spine_->CancelIfFails(std::move(promise)); + auto PullMessage() { return spine_->PullServerToClientMessage(); } + + auto PullServerTrailingMetadata() { + return spine_->PullServerTrailingMetadata(); } void Cancel() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - std::ignore = - spine_->Cancel(ServerMetadataFromStatus(absl::CancelledError())); + auto status = ServerMetadataFromStatus(absl::CancelledError()); + status->Set(GrpcCallWasCancelled(), true); + spine_->PushServerTrailingMetadata(std::move(status)); } void OnDone(absl::AnyInvocable fn) { spine_->OnDone(std::move(fn)); } @@ -327,55 +404,59 @@ class CallHandler { : spine_(std::move(spine)) {} auto PullClientInitialMetadata() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map(spine_->client_initial_metadata().receiver.Next(), - [](NextResult md) - -> ValueOrFailure { - if (!md.has_value()) return Failure{}; - return std::move(*md); - }); + return spine_->PullClientInitialMetadata(); } auto PushServerInitialMetadata(absl::optional md) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return If( - md.has_value(), - [&md, this]() { - return Map( - spine_->server_initial_metadata().sender.Push(std::move(*md)), - [](bool ok) { return StatusFlag(ok); }); - }, - [this]() { - spine_->server_initial_metadata().sender.Close(); - return []() -> StatusFlag { return Success{}; }; - }); + return spine_->PushServerInitialMetadata(std::move(md)); } - auto PushServerTrailingMetadata(ServerMetadataHandle md) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - spine_->server_initial_metadata().sender.Close(); - spine_->server_to_client_messages().sender.Close(); - spine_->client_to_server_messages().receiver.CloseWithError(); - spine_->CallOnDone(); - return Map(spine_->server_trailing_metadata().sender.Push(std::move(md)), - [](bool ok) { return StatusFlag(ok); }); + void PushServerTrailingMetadata(ServerMetadataHandle status) { + spine_->PushServerTrailingMetadata(std::move(status)); } - auto PullMessage() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return spine_->client_to_server_messages().receiver.Next(); + void OnDone(absl::AnyInvocable fn) { spine_->OnDone(std::move(fn)); } + + template + auto CancelIfFails(Promise promise) { + return spine_->CancelIfFails(std::move(promise)); } auto PushMessage(MessageHandle message) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map( - spine_->server_to_client_messages().sender.Push(std::move(message)), - [](bool ok) { return StatusFlag(ok); }); + return spine_->PushServerToClientMessage(std::move(message)); + } + + auto PullMessage() { return spine_->PullClientToServerMessage(); } + + template + void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory, + DebugLocation whence = {}) { + spine_->SpawnGuarded(name, std::move(promise_factory), whence); } - void Cancel(ServerMetadataHandle status) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - std::ignore = spine_->Cancel(std::move(status)); + template + void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) { + spine_->SpawnInfallible(name, std::move(promise_factory)); + } + + template + auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) { + return spine_->party().SpawnWaitable(name, std::move(promise_factory)); + } + + Arena* arena() { return spine_->arena(); } + + private: + RefCountedPtr spine_; +}; + +class UnstartedCallHandler { + public: + explicit UnstartedCallHandler(RefCountedPtr spine) + : spine_(std::move(spine)) {} + + void PushServerTrailingMetadata(ServerMetadataHandle status) { + spine_->PushServerTrailingMetadata(std::move(status)); } void OnDone(absl::AnyInvocable fn) { spine_->OnDone(std::move(fn)); } @@ -386,8 +467,9 @@ class CallHandler { } template - void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { - spine_->SpawnGuarded(name, std::move(promise_factory)); + void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory, + DebugLocation whence = {}) { + spine_->SpawnGuarded(name, std::move(promise_factory), whence); } template @@ -400,6 +482,11 @@ class CallHandler { return spine_->party().SpawnWaitable(name, std::move(promise_factory)); } + CallHandler V2HackToStartCallWithoutACallFilterStack() { + GPR_ASSERT(DownCast(spine_.get()) != nullptr); + return CallHandler(std::move(spine_)); + } + Arena* arena() { return spine_->arena(); } private: @@ -408,11 +495,13 @@ class CallHandler { struct CallInitiatorAndHandler { CallInitiator initiator; - CallHandler handler; + UnstartedCallHandler handler; }; CallInitiatorAndHandler MakeCall( - grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena); + ClientMetadataHandle client_initial_metadata, + grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena, + bool is_arena_owned); template auto OutgoingMessages(CallHalf h) { @@ -425,8 +514,7 @@ auto OutgoingMessages(CallHalf h) { // Forward a call from `call_handler` to `call_initiator` (with initial metadata // `client_initial_metadata`) -void ForwardCall(CallHandler call_handler, CallInitiator call_initiator, - ClientMetadataHandle client_initial_metadata); +void ForwardCall(CallHandler call_handler, CallInitiator call_initiator); } // namespace grpc_core diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 9a89def3a8e..c824f0ae05c 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -559,7 +559,7 @@ class ServerTransport { // Create a call at the server (or fail) // arena must have been previously allocated by CreateArena() virtual absl::StatusOr CreateCall( - ClientMetadata& client_initial_metadata, Arena* arena) = 0; + ClientMetadataHandle client_initial_metadata, Arena* arena) = 0; protected: ~Acceptor() = default; diff --git a/test/core/end2end/tests/filter_causes_close.cc b/test/core/end2end/tests/filter_causes_close.cc index ba73b4c581f..dc7fdafc667 100644 --- a/test/core/end2end/tests/filter_causes_close.cc +++ b/test/core/end2end/tests/filter_causes_close.cc @@ -102,13 +102,8 @@ const grpc_channel_filter test_filter = { return Immediate(ServerMetadataFromStatus( absl::PermissionDeniedError("Failure that's not preventable."))); }, - [](grpc_channel_element*, CallSpineInterface* args) { - args->client_initial_metadata().receiver.InterceptAndMap( - [args](ClientMetadataHandle) { - return args->Cancel( - ServerMetadataFromStatus(absl::PermissionDeniedError( - "Failure that's not preventable."))); - }); + [](grpc_channel_element*, CallSpineInterface*) { + Crash("Should never be called"); }, grpc_channel_next_op, sizeof(call_data), diff --git a/test/core/end2end/tests/http2_stats.cc b/test/core/end2end/tests/http2_stats.cc index b12d7a431a0..5e497f7b8f2 100644 --- a/test/core/end2end/tests/http2_stats.cc +++ b/test/core/end2end/tests/http2_stats.cc @@ -193,6 +193,9 @@ class NewFakeStatsPlugin : public FakeStatsPlugin { // This test verifies the HTTP2 stats on a stream CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) { + if (!IsHttp2StatsFixEnabled()) { + GTEST_SKIP() << "Test needs http2_stats_fix experiment to be enabled"; + } g_mu = new Mutex(); g_client_call_ended_notify = new Notification(); g_server_call_ended_notify = new Notification(); diff --git a/test/core/transport/chaotic_good/client_transport_error_test.cc b/test/core/transport/chaotic_good/client_transport_error_test.cc index 453e99a3663..bd1e1b77f7a 100644 --- a/test/core/transport/chaotic_good/client_transport_error_test.cc +++ b/test/core/transport/chaotic_good/client_transport_error_test.cc @@ -105,22 +105,21 @@ struct MockPromiseEndpoint { auto SendClientToServerMessages(CallInitiator initiator, int num_messages) { return Loop([initiator, num_messages]() mutable { bool has_message = (num_messages > 0); - return If( - has_message, - Seq(initiator.PushMessage(GetContext()->MakePooled()), - [&num_messages]() -> LoopCtl { - --num_messages; - return Continue(); - }), - [initiator]() mutable -> LoopCtl { - initiator.FinishSends(); - return absl::OkStatus(); - }); + return If(has_message, + Seq(initiator.PushMessage(Arena::MakePooled()), + [&num_messages]() -> LoopCtl { + --num_messages; + return Continue(); + }), + [initiator]() mutable -> LoopCtl { + initiator.FinishSends(); + return absl::OkStatus(); + }); }); } ClientMetadataHandle TestInitialMetadata() { - auto md = GetContext()->MakePooled(); + auto md = Arena::MakePooled(); md->Set(HttpPathMetadata(), Slice::FromStaticString("/test")); return md; } @@ -178,14 +177,13 @@ TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) { std::move(control_endpoint.promise_endpoint), std::move(data_endpoint.promise_endpoint), MakeChannelArgs(), event_engine(), HPackParser(), HPackCompressor()); - auto call = - MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator())); - transport->StartCall(std::move(call.handler)); - call.initiator.SpawnGuarded("test-send", [initiator = - call.initiator]() mutable { - return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()), - SendClientToServerMessages(initiator, 1)); - }); + auto call = MakeCall(TestInitialMetadata(), event_engine().get(), + Arena::Create(8192, memory_allocator()), true); + transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack()); + call.initiator.SpawnGuarded("test-send", + [initiator = call.initiator]() mutable { + return SendClientToServerMessages(initiator, 1); + }); StrictMock> on_done; EXPECT_CALL(on_done, Call()); call.initiator.SpawnInfallible( @@ -193,7 +191,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) { return Seq( initiator.PullServerInitialMetadata(), [](ValueOrFailure> md) { - EXPECT_FALSE(md.ok()); + EXPECT_TRUE(md.ok()); return Empty{}; }, initiator.PullServerTrailingMetadata(), @@ -224,14 +222,13 @@ TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) { std::move(control_endpoint.promise_endpoint), std::move(data_endpoint.promise_endpoint), MakeChannelArgs(), event_engine(), HPackParser(), HPackCompressor()); - auto call = - MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator())); - transport->StartCall(std::move(call.handler)); - call.initiator.SpawnGuarded("test-send", [initiator = - call.initiator]() mutable { - return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()), - SendClientToServerMessages(initiator, 1)); - }); + auto call = MakeCall(TestInitialMetadata(), event_engine().get(), + Arena::Create(8192, memory_allocator()), true); + transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack()); + call.initiator.SpawnGuarded("test-send", + [initiator = call.initiator]() mutable { + return SendClientToServerMessages(initiator, 1); + }); StrictMock> on_done; EXPECT_CALL(on_done, Call()); call.initiator.SpawnInfallible( @@ -239,7 +236,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) { return Seq( initiator.PullServerInitialMetadata(), [](ValueOrFailure> md) { - EXPECT_FALSE(md.ok()); + EXPECT_TRUE(md.ok()); return Empty{}; }, initiator.PullServerTrailingMetadata(), @@ -278,22 +275,22 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) { std::move(control_endpoint.promise_endpoint), std::move(data_endpoint.promise_endpoint), MakeChannelArgs(), event_engine(), HPackParser(), HPackCompressor()); - auto call1 = - MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator())); - transport->StartCall(std::move(call1.handler)); - auto call2 = - MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator())); - transport->StartCall(std::move(call2.handler)); - call1.initiator.SpawnGuarded("test-send-1", [initiator = - call1.initiator]() mutable { - return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()), - SendClientToServerMessages(initiator, 1)); - }); - call2.initiator.SpawnGuarded("test-send-2", [initiator = - call2.initiator]() mutable { - return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()), - SendClientToServerMessages(initiator, 1)); - }); + auto call1 = MakeCall(TestInitialMetadata(), event_engine().get(), + Arena::Create(8192, memory_allocator()), true); + transport->StartCall( + call1.handler.V2HackToStartCallWithoutACallFilterStack()); + auto call2 = MakeCall(TestInitialMetadata(), event_engine().get(), + Arena::Create(8192, memory_allocator()), true); + transport->StartCall( + call2.handler.V2HackToStartCallWithoutACallFilterStack()); + call1.initiator.SpawnGuarded( + "test-send-1", [initiator = call1.initiator]() mutable { + return SendClientToServerMessages(initiator, 1); + }); + call2.initiator.SpawnGuarded( + "test-send-2", [initiator = call2.initiator]() mutable { + return SendClientToServerMessages(initiator, 1); + }); StrictMock> on_done1; EXPECT_CALL(on_done1, Call()); StrictMock> on_done2; @@ -303,7 +300,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) { return Seq( initiator.PullServerInitialMetadata(), [](ValueOrFailure> md) { - EXPECT_FALSE(md.ok()); + EXPECT_TRUE(md.ok()); return Empty{}; }, initiator.PullServerTrailingMetadata(), @@ -319,7 +316,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) { return Seq( initiator.PullServerInitialMetadata(), [](ValueOrFailure> md) { - EXPECT_FALSE(md.ok()); + EXPECT_TRUE(md.ok()); return Empty{}; }, initiator.PullServerTrailingMetadata(), @@ -350,22 +347,22 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) { std::move(control_endpoint.promise_endpoint), std::move(data_endpoint.promise_endpoint), MakeChannelArgs(), event_engine(), HPackParser(), HPackCompressor()); - auto call1 = - MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator())); - transport->StartCall(std::move(call1.handler)); - auto call2 = - MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator())); - transport->StartCall(std::move(call2.handler)); - call1.initiator.SpawnGuarded("test-send", [initiator = - call1.initiator]() mutable { - return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()), - SendClientToServerMessages(initiator, 1)); - }); - call2.initiator.SpawnGuarded("test-send", [initiator = - call2.initiator]() mutable { - return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()), - SendClientToServerMessages(initiator, 1)); - }); + auto call1 = MakeCall(TestInitialMetadata(), event_engine().get(), + Arena::Create(8192, memory_allocator()), true); + transport->StartCall( + call1.handler.V2HackToStartCallWithoutACallFilterStack()); + auto call2 = MakeCall(TestInitialMetadata(), event_engine().get(), + Arena::Create(8192, memory_allocator()), true); + transport->StartCall( + call2.handler.V2HackToStartCallWithoutACallFilterStack()); + call1.initiator.SpawnGuarded( + "test-send", [initiator = call1.initiator]() mutable { + return SendClientToServerMessages(initiator, 1); + }); + call2.initiator.SpawnGuarded( + "test-send", [initiator = call2.initiator]() mutable { + return SendClientToServerMessages(initiator, 1); + }); StrictMock> on_done1; EXPECT_CALL(on_done1, Call()); StrictMock> on_done2; @@ -375,7 +372,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) { return Seq( initiator.PullServerInitialMetadata(), [](ValueOrFailure> md) { - EXPECT_FALSE(md.ok()); + EXPECT_TRUE(md.ok()); return Empty{}; }, initiator.PullServerTrailingMetadata(), @@ -391,7 +388,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) { return Seq( initiator.PullServerInitialMetadata(), [](ValueOrFailure> md) { - EXPECT_FALSE(md.ok()); + EXPECT_TRUE(md.ok()); return Empty{}; }, initiator.PullServerTrailingMetadata(), diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc index 387e483fc21..3acd0c0c273 100644 --- a/test/core/transport/chaotic_good/client_transport_test.cc +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -67,7 +67,7 @@ const uint8_t kGrpcStatus0[] = {0x10, 0x0b, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x01, 0x30}; ClientMetadataHandle TestInitialMetadata() { - auto md = GetContext()->MakePooled(); + auto md = Arena::MakePooled(); md->Set(HttpPathMetadata(), Slice::FromStaticString("/demo.Service/Step")); return md; } @@ -78,7 +78,7 @@ auto SendClientToServerMessages(CallInitiator initiator, int num_messages) { bool has_message = (i < num_messages); return If( has_message, - Seq(initiator.PushMessage(GetContext()->MakePooled( + Seq(initiator.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString(std::to_string(i))), 0)), [&i]() -> LoopCtl { ++i; @@ -115,9 +115,9 @@ TEST_F(TransportTest, AddOneStream) { std::move(control_endpoint.promise_endpoint), std::move(data_endpoint.promise_endpoint), MakeChannelArgs(), event_engine(), HPackParser(), HPackCompressor()); - auto call = - MakeCall(event_engine().get(), Arena::Create(1024, memory_allocator())); - transport->StartCall(std::move(call.handler)); + auto call = MakeCall(TestInitialMetadata(), event_engine().get(), + Arena::Create(1024, memory_allocator()), true); + transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack()); StrictMock> on_done; EXPECT_CALL(on_done, Call()); control_endpoint.ExpectWrite( @@ -133,11 +133,10 @@ TEST_F(TransportTest, AddOneStream) { {EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr); control_endpoint.ExpectWrite( {SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr); - call.initiator.SpawnGuarded("test-send", [initiator = - call.initiator]() mutable { - return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()), - SendClientToServerMessages(initiator, 1)); - }); + call.initiator.SpawnGuarded("test-send", + [initiator = call.initiator]() mutable { + return SendClientToServerMessages(initiator, 1); + }); call.initiator.SpawnInfallible( "test-read", [&on_done, initiator = call.initiator]() mutable { return Seq( @@ -152,18 +151,23 @@ TEST_F(TransportTest, AddOneStream) { "/demo.Service/Step"); return Empty{}; }, - initiator.PullMessage(), - [](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "12345678"); + [initiator]() mutable { return initiator.PullMessage(); }, + [](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "12345678"); return Empty{}; }, - initiator.PullMessage(), - [](NextResult msg) { - EXPECT_FALSE(msg.has_value()); + [initiator]() mutable { return initiator.PullMessage(); }, + [](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); return Empty{}; }, - initiator.PullServerTrailingMetadata(), + [initiator]() mutable { + return initiator.PullServerTrailingMetadata(); + }, [&on_done](ServerMetadataHandle md) { EXPECT_EQ(md->get(GrpcStatusMetadata()).value(), GRPC_STATUS_OK); on_done.Call(); @@ -198,9 +202,9 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) { std::move(control_endpoint.promise_endpoint), std::move(data_endpoint.promise_endpoint), MakeChannelArgs(), event_engine(), HPackParser(), HPackCompressor()); - auto call = - MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator())); - transport->StartCall(std::move(call.handler)); + auto call = MakeCall(TestInitialMetadata(), event_engine().get(), + Arena::Create(8192, memory_allocator()), true); + transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack()); StrictMock> on_done; EXPECT_CALL(on_done, Call()); control_endpoint.ExpectWrite( @@ -221,11 +225,10 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) { {EventEngineSlice::FromCopiedString("1"), Zeros(63)}, nullptr); control_endpoint.ExpectWrite( {SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr); - call.initiator.SpawnGuarded("test-send", [initiator = - call.initiator]() mutable { - return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()), - SendClientToServerMessages(initiator, 2)); - }); + call.initiator.SpawnGuarded("test-send", + [initiator = call.initiator]() mutable { + return SendClientToServerMessages(initiator, 2); + }); call.initiator.SpawnInfallible( "test-read", [&on_done, initiator = call.initiator]() mutable { return Seq( @@ -241,20 +244,25 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) { return Empty{}; }, initiator.PullMessage(), - [](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "12345678"); + [](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "12345678"); return Empty{}; }, initiator.PullMessage(), - [](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "87654321"); + [](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "87654321"); return Empty{}; }, initiator.PullMessage(), - [](NextResult msg) { - EXPECT_FALSE(msg.has_value()); + [](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); return Empty{}; }, initiator.PullServerTrailingMetadata(), diff --git a/test/core/transport/chaotic_good/server_transport_test.cc b/test/core/transport/chaotic_good/server_transport_test.cc index 7c21a07c72d..a4e0272b87c 100644 --- a/test/core/transport/chaotic_good/server_transport_test.cc +++ b/test/core/transport/chaotic_good/server_transport_test.cc @@ -71,13 +71,13 @@ const uint8_t kGrpcStatus0[] = {0x40, 0x0b, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x01, 0x30}; ServerMetadataHandle TestInitialMetadata() { - auto md = GetContext()->MakePooled(); + auto md = Arena::MakePooled(); md->Set(HttpPathMetadata(), Slice::FromStaticString("/demo.Service/Step")); return md; } ServerMetadataHandle TestTrailingMetadata() { - auto md = GetContext()->MakePooled(); + auto md = Arena::MakePooled(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_OK); return md; } @@ -87,7 +87,7 @@ class MockAcceptor : public ServerTransport::Acceptor { virtual ~MockAcceptor() = default; MOCK_METHOD(Arena*, CreateArena, (), (override)); MOCK_METHOD(absl::StatusOr, CreateCall, - (ClientMetadata & client_initial_metadata, Arena* arena), + (ClientMetadataHandle client_initial_metadata, Arena* arena), (override)); }; @@ -113,18 +113,59 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) { {EventEngineSlice::FromCopiedString("12345678"), Zeros(56)}, nullptr); // Once that's read we'll create a new call auto* call_arena = Arena::Create(1024, memory_allocator()); - CallInitiatorAndHandler call = MakeCall(event_engine().get(), call_arena); EXPECT_CALL(acceptor, CreateArena).WillOnce(Return(call_arena)); + StrictMock> on_done; EXPECT_CALL(acceptor, CreateCall(_, call_arena)) - .WillOnce(WithArgs<0>([call_initiator = std::move(call.initiator)]( - ClientMetadata& client_initial_metadata) { - EXPECT_EQ(client_initial_metadata.get_pointer(HttpPathMetadata()) + .WillOnce(WithArgs<0>([this, call_arena, &on_done]( + ClientMetadataHandle client_initial_metadata) { + EXPECT_EQ(client_initial_metadata->get_pointer(HttpPathMetadata()) ->as_string_view(), "/demo.Service/Step"); - return call_initiator; + CallInitiatorAndHandler call = + MakeCall(std::move(client_initial_metadata), event_engine().get(), + call_arena, true); + auto handler = call.handler.V2HackToStartCallWithoutACallFilterStack(); + handler.SpawnInfallible("test-io", [&on_done, handler]() mutable { + return Seq( + handler.PullClientInitialMetadata(), + [](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + EXPECT_EQ(md.value() + ->get_pointer(HttpPathMetadata()) + ->as_string_view(), + "/demo.Service/Step"); + return Empty{}; + }, + [handler]() mutable { return handler.PullMessage(); }, + [](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "12345678"); + return Empty{}; + }, + [handler]() mutable { return handler.PullMessage(); }, + [](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); + return Empty{}; + }, + [handler]() mutable { + return handler.PushServerInitialMetadata(TestInitialMetadata()); + }, + [handler]() mutable { + return handler.PushMessage(Arena::MakePooled( + SliceBuffer(Slice::FromCopiedString("87654321")), 0)); + }, + [handler, &on_done]() mutable { + handler.PushServerTrailingMetadata(TestTrailingMetadata()); + on_done.Call(); + return Empty{}; + }); + }); + return std::move(call.initiator); })); transport->SetAcceptor(&acceptor); - StrictMock> on_done; EXPECT_CALL(on_done, Call()); EXPECT_CALL(*control_endpoint.endpoint, Read) .InSequence(control_endpoint.read_sequence) @@ -145,39 +186,6 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) { sizeof(kGrpcStatus0)), EventEngineSlice::FromCopiedBuffer(kGrpcStatus0, sizeof(kGrpcStatus0))}, nullptr); - call.handler.SpawnInfallible( - "test-io", [&on_done, handler = call.handler]() mutable { - return Seq( - handler.PullClientInitialMetadata(), - [](ValueOrFailure md) { - EXPECT_TRUE(md.ok()); - EXPECT_EQ( - md.value()->get_pointer(HttpPathMetadata())->as_string_view(), - "/demo.Service/Step"); - return Empty{}; - }, - handler.PullMessage(), - [](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "12345678"); - return Empty{}; - }, - handler.PullMessage(), - [](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - return Empty{}; - }, - handler.PushServerInitialMetadata(TestInitialMetadata()), - handler.PushMessage(Arena::MakePooled( - SliceBuffer(Slice::FromCopiedString("87654321")), 0)), - [handler]() mutable { - return handler.PushServerTrailingMetadata(TestTrailingMetadata()); - }, - [&on_done]() mutable { - on_done.Call(); - return Empty{}; - }); - }); // Wait until ClientTransport's internal activities to finish. event_engine()->TickUntilIdle(); event_engine()->UnsetGlobalHooks(); diff --git a/test/core/transport/test_suite/call_content.cc b/test/core/transport/test_suite/call_content.cc index cea4c5a0aa5..9e334f24fb6 100644 --- a/test/core/transport/test_suite/call_content.cc +++ b/test/core/transport/test_suite/call_content.cc @@ -63,21 +63,17 @@ void FillMetadata(const std::vector>& md, TRANSPORT_TEST(UnaryWithSomeContent) { SetServerAcceptor(); - auto initiator = CreateCall(); const auto client_initial_metadata = RandomMetadata(); const auto server_initial_metadata = RandomMetadata(); const auto server_trailing_metadata = RandomMetadata(); const auto client_payload = RandomMessage(); const auto server_payload = RandomMessage(); + auto md = Arena::MakePooled(); + FillMetadata(client_initial_metadata, *md); + auto initiator = CreateCall(std::move(md)); SpawnTestSeq( initiator, "initiator", - [&]() { - auto md = Arena::MakePooled(); - FillMetadata(client_initial_metadata, *md); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [&](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); + [&]() mutable { return initiator.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString(client_payload)), 0)); }, @@ -93,14 +89,16 @@ TRANSPORT_TEST(UnaryWithSomeContent) { UnorderedElementsAreArray(server_initial_metadata)); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), server_payload); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + server_payload); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -118,14 +116,16 @@ TRANSPORT_TEST(UnaryWithSomeContent) { UnorderedElementsAreArray(client_initial_metadata)); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), client_payload); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + client_payload); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); auto md = Arena::MakePooled(); FillMetadata(server_initial_metadata, *md); return handler.PushServerInitialMetadata(std::move(md)); @@ -139,10 +139,7 @@ TRANSPORT_TEST(UnaryWithSomeContent) { EXPECT_TRUE(result.ok()); auto md = Arena::MakePooled(); FillMetadata(server_trailing_metadata, *md); - return handler.PushServerTrailingMetadata(std::move(md)); - }, - [&](StatusFlag result) mutable { - EXPECT_TRUE(result.ok()); + handler.PushServerTrailingMetadata(std::move(md)); return Empty{}; }); WaitForAllPendingWork(); diff --git a/test/core/transport/test_suite/call_shapes.cc b/test/core/transport/test_suite/call_shapes.cc index acfa0ffb741..fdae653119d 100644 --- a/test/core/transport/test_suite/call_shapes.cc +++ b/test/core/transport/test_suite/call_shapes.cc @@ -18,16 +18,12 @@ namespace grpc_core { TRANSPORT_TEST(MetadataOnlyRequest) { SetServerAcceptor(); - auto initiator = CreateCall(); + auto md = Arena::MakePooled(); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + auto initiator = CreateCall(std::move(md)); SpawnTestSeq( initiator, "initiator", - [&]() { - auto md = Arena::MakePooled(); - md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [&](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); + [&]() mutable { initiator.FinishSends(); return initiator.PullServerInitialMetadata(); }, @@ -53,8 +49,9 @@ TRANSPORT_TEST(MetadataOnlyRequest) { "/foo/bar"); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); auto md = Arena::MakePooled(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); @@ -63,10 +60,7 @@ TRANSPORT_TEST(MetadataOnlyRequest) { EXPECT_TRUE(result.ok()); auto md = Arena::MakePooled(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); - return handler.PushServerTrailingMetadata(std::move(md)); - }, - [&](StatusFlag result) mutable { - EXPECT_TRUE(result.ok()); + handler.PushServerTrailingMetadata(std::move(md)); return Empty{}; }); WaitForAllPendingWork(); @@ -79,16 +73,12 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsAfterInitialMetadata) { "rolling out soon, so leaving this disabled."; SetServerAcceptor(); - auto initiator = CreateCall(); + auto md = Arena::MakePooled(); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + auto initiator = CreateCall(std::move(md)); SpawnTestSeq( initiator, "initiator", - [&]() { - auto md = Arena::MakePooled(); - md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [&](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); + [&]() mutable { // We don't close the sending stream here. return initiator.PullServerInitialMetadata(); }, @@ -123,10 +113,7 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsAfterInitialMetadata) { EXPECT_TRUE(result.ok()); auto md = Arena::MakePooled(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); - return handler.PushServerTrailingMetadata(std::move(md)); - }, - [&](StatusFlag result) mutable { - EXPECT_TRUE(result.ok()); + handler.PushServerTrailingMetadata(std::move(md)); return Empty{}; }); WaitForAllPendingWork(); @@ -139,16 +126,12 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) { "rolling out soon, so leaving this disabled."; SetServerAcceptor(); - auto initiator = CreateCall(); + auto md = Arena::MakePooled(); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + auto initiator = CreateCall(std::move(md)); SpawnTestSeq( initiator, "initiator", - [&]() { - auto md = Arena::MakePooled(); - md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [&](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); + [&]() mutable { // We don't close the sending stream here. return initiator.PullServerInitialMetadata(); }, @@ -175,10 +158,7 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) { // and don't send initial metadata - just trailing metadata. auto md = Arena::MakePooled(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); - return handler.PushServerTrailingMetadata(std::move(md)); - }, - [&](StatusFlag result) mutable { - EXPECT_TRUE(result.ok()); + handler.PushServerTrailingMetadata(std::move(md)); return Empty{}; }); WaitForAllPendingWork(); @@ -186,18 +166,9 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) { TRANSPORT_TEST(CanCreateCallThenAbandonIt) { SetServerAcceptor(); - auto initiator = CreateCall(); - SpawnTestSeq( - initiator, "start-call", - [&]() { - auto md = Arena::MakePooled(); - md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [&](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); - return Empty{}; - }); + auto md = Arena::MakePooled(); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + auto initiator = CreateCall(std::move(md)); auto handler = TickUntilServerCall(); SpawnTestSeq(initiator, "end-call", [&]() { initiator.Cancel(); @@ -208,16 +179,12 @@ TRANSPORT_TEST(CanCreateCallThenAbandonIt) { TRANSPORT_TEST(UnaryRequest) { SetServerAcceptor(); - auto initiator = CreateCall(); + auto md = Arena::MakePooled(); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + auto initiator = CreateCall(std::move(md)); SpawnTestSeq( initiator, "initiator", - [&]() { - auto md = Arena::MakePooled(); - md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [&](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); + [&]() mutable { return initiator.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString("hello world")), 0)); }, @@ -233,15 +200,16 @@ TRANSPORT_TEST(UnaryRequest) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -259,14 +227,16 @@ TRANSPORT_TEST(UnaryRequest) { "/foo/bar"); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world"); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "hello world"); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); auto md = Arena::MakePooled(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); @@ -280,10 +250,7 @@ TRANSPORT_TEST(UnaryRequest) { EXPECT_TRUE(result.ok()); auto md = Arena::MakePooled(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); - return handler.PushServerTrailingMetadata(std::move(md)); - }, - [&](StatusFlag result) mutable { - EXPECT_TRUE(result.ok()); + handler.PushServerTrailingMetadata(std::move(md)); return Empty{}; }); WaitForAllPendingWork(); @@ -291,16 +258,12 @@ TRANSPORT_TEST(UnaryRequest) { TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) { SetServerAcceptor(); - auto initiator = CreateCall(); + auto md = Arena::MakePooled(); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + auto initiator = CreateCall(std::move(md)); SpawnTestSeq( initiator, "initiator", - [&]() { - auto md = Arena::MakePooled(); - md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [&](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); + [&]() mutable { return initiator.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString("hello world")), 0)); }, @@ -316,9 +279,10 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullServerTrailingMetadata(); }, @@ -337,9 +301,11 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) { "/foo/bar"); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world"); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "hello world"); auto md = Arena::MakePooled(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); @@ -353,10 +319,7 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) { EXPECT_TRUE(result.ok()); auto md = Arena::MakePooled(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); - return handler.PushServerTrailingMetadata(std::move(md)); - }, - [&](StatusFlag result) mutable { - EXPECT_TRUE(result.ok()); + handler.PushServerTrailingMetadata(std::move(md)); return Empty{}; }); WaitForAllPendingWork(); @@ -364,18 +327,12 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) { TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) { SetServerAcceptor(); - auto initiator = CreateCall(); + auto md = Arena::MakePooled(); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + auto initiator = CreateCall(std::move(md)); SpawnTestSeq( initiator, "initiator", - [&]() { - auto md = Arena::MakePooled(); - md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [&](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); - return initiator.PullServerInitialMetadata(); - }, + [&]() mutable { return initiator.PullServerInitialMetadata(); }, [&](ValueOrFailure> md) { EXPECT_TRUE(md.ok()); EXPECT_TRUE(md.value().has_value()); @@ -389,15 +346,16 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) { initiator.FinishSends(); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -422,14 +380,16 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) { EXPECT_TRUE(result.ok()); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world"); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "hello world"); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); return handler.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0)); }, @@ -437,10 +397,7 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) { EXPECT_TRUE(result.ok()); auto md = Arena::MakePooled(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); - return handler.PushServerTrailingMetadata(std::move(md)); - }, - [&](StatusFlag result) mutable { - EXPECT_TRUE(result.ok()); + handler.PushServerTrailingMetadata(std::move(md)); return Empty{}; }); WaitForAllPendingWork(); @@ -448,18 +405,12 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) { TRANSPORT_TEST(ClientStreamingRequest) { SetServerAcceptor(); - auto initiator = CreateCall(); + auto md = Arena::MakePooled(); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + auto initiator = CreateCall(std::move(md)); SpawnTestSeq( initiator, "initiator", - [&]() { - auto md = Arena::MakePooled(); - md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [&](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); - return initiator.PullServerInitialMetadata(); - }, + [&]() mutable { return initiator.PullServerInitialMetadata(); }, [&](ValueOrFailure> md) { EXPECT_TRUE(md.ok()); EXPECT_TRUE(md.value().has_value()); @@ -493,9 +444,9 @@ TRANSPORT_TEST(ClientStreamingRequest) { initiator.FinishSends(); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -520,40 +471,47 @@ TRANSPORT_TEST(ClientStreamingRequest) { EXPECT_TRUE(result.ok()); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world"); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "hello world"); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (2)"); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "hello world (2)"); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (3)"); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "hello world (3)"); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (4)"); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "hello world (4)"); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (5)"); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + "hello world (5)"); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); auto md = Arena::MakePooled(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); - return handler.PushServerTrailingMetadata(std::move(md)); - }, - [&](StatusFlag result) mutable { - EXPECT_TRUE(result.ok()); + handler.PushServerTrailingMetadata(std::move(md)); return Empty{}; }); WaitForAllPendingWork(); @@ -561,18 +519,12 @@ TRANSPORT_TEST(ClientStreamingRequest) { TRANSPORT_TEST(ServerStreamingRequest) { SetServerAcceptor(); - auto initiator = CreateCall(); + auto md = Arena::MakePooled(); + md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); + auto initiator = CreateCall(std::move(md)); SpawnTestSeq( initiator, "initiator", - [&]() { - auto md = Arena::MakePooled(); - md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar")); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [&](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); - return initiator.PullServerInitialMetadata(); - }, + [&]() mutable { return initiator.PullServerInitialMetadata(); }, [&](ValueOrFailure> md) { EXPECT_TRUE(md.ok()); EXPECT_TRUE(md.value().has_value()); @@ -581,45 +533,51 @@ TRANSPORT_TEST(ServerStreamingRequest) { initiator.FinishSends(); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), "why hello neighbor (2)"); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), "why hello neighbor (3)"); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), "why hello neighbor (4)"); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), "why hello neighbor (5)"); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), "why hello neighbor (6)"); return initiator.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -644,9 +602,9 @@ TRANSPORT_TEST(ServerStreamingRequest) { EXPECT_TRUE(result.ok()); return handler.PullMessage(); }, - [&](NextResult msg) { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [&](ValueOrFailure> msg) { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); return handler.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0)); }, @@ -679,10 +637,7 @@ TRANSPORT_TEST(ServerStreamingRequest) { EXPECT_TRUE(result.ok()); auto md = Arena::MakePooled(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); - return handler.PushServerTrailingMetadata(std::move(md)); - }, - [&](StatusFlag result) mutable { - EXPECT_TRUE(result.ok()); + handler.PushServerTrailingMetadata(std::move(md)); return Empty{}; }); WaitForAllPendingWork(); diff --git a/test/core/transport/test_suite/stress.cc b/test/core/transport/test_suite/stress.cc index 54e34ba98d5..ca20fb7ff3f 100644 --- a/test/core/transport/test_suite/stress.cc +++ b/test/core/transport/test_suite/stress.cc @@ -30,19 +30,14 @@ TRANSPORT_TEST(ManyUnaryRequests) { std::map client_messages; std::map server_messages; for (int i = 0; i < kNumRequests; i++) { - auto initiator = CreateCall(); + auto md = Arena::MakePooled(); + md->Set(HttpPathMetadata(), Slice::FromCopiedString(std::to_string(i))); + auto initiator = CreateCall(std::move(md)); client_messages[i] = RandomMessage(); server_messages[i] = RandomMessage(); SpawnTestSeq( initiator, make_call_name(i, "initiator"), - [initiator, i]() mutable { - auto md = Arena::MakePooled(); - md->Set(HttpPathMetadata(), - Slice::FromCopiedString(std::to_string(i))); - return initiator.PushClientInitialMetadata(std::move(md)); - }, - [initiator, i, &client_messages](StatusFlag status) mutable { - EXPECT_TRUE(status.ok()); + [initiator, i, &client_messages]() mutable { return initiator.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString(client_messages[i])), 0)); }, @@ -59,16 +54,17 @@ TRANSPORT_TEST(ManyUnaryRequests) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [initiator, i, - &server_messages](NextResult msg) mutable { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [initiator, i, &server_messages]( + ValueOrFailure> msg) mutable { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), server_messages[i]); return initiator.PullMessage(); }, - [initiator](NextResult msg) mutable { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [initiator](ValueOrFailure> msg) mutable { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); return initiator.PullServerTrailingMetadata(); }, [initiator](ValueOrFailure md) mutable { @@ -92,16 +88,17 @@ TRANSPORT_TEST(ManyUnaryRequests) { &*this_call_index)); return handler.PullMessage(); }, - [handler, this_call_index, - &client_messages](NextResult msg) mutable { - EXPECT_TRUE(msg.has_value()); - EXPECT_EQ(msg.value()->payload()->JoinIntoString(), + [handler, this_call_index, &client_messages]( + ValueOrFailure> msg) mutable { + EXPECT_TRUE(msg.ok()); + EXPECT_TRUE(msg.value().has_value()); + EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), client_messages[*this_call_index]); return handler.PullMessage(); }, - [handler](NextResult msg) mutable { - EXPECT_FALSE(msg.has_value()); - EXPECT_FALSE(msg.cancelled()); + [handler](ValueOrFailure> msg) mutable { + EXPECT_TRUE(msg.ok()); + EXPECT_FALSE(msg.value().has_value()); auto md = Arena::MakePooled(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); @@ -118,10 +115,7 @@ TRANSPORT_TEST(ManyUnaryRequests) { EXPECT_TRUE(result.ok()); auto md = Arena::MakePooled(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); - return handler.PushServerTrailingMetadata(std::move(md)); - }, - [handler](StatusFlag result) mutable { - EXPECT_TRUE(result.ok()); + handler.PushServerTrailingMetadata(std::move(md)); return Empty{}; }); } diff --git a/test/core/transport/test_suite/test.cc b/test/core/transport/test_suite/test.cc index 6d48a5712fa..7dc767810d7 100644 --- a/test/core/transport/test_suite/test.cc +++ b/test/core/transport/test_suite/test.cc @@ -56,12 +56,16 @@ void TransportTest::SetServerAcceptor() { transport_pair_.server->server_transport()->SetAcceptor(&acceptor_); } -CallInitiator TransportTest::CreateCall() { - auto call = MakeCall(event_engine_.get(), Arena::Create(1024, &allocator_)); - call.handler.SpawnInfallible("start-call", [this, handler = call.handler]() { - transport_pair_.client->client_transport()->StartCall(handler); - return Empty{}; - }); +CallInitiator TransportTest::CreateCall( + ClientMetadataHandle client_initial_metadata) { + auto call = MakeCall(std::move(client_initial_metadata), event_engine_.get(), + Arena::Create(1024, &allocator_), true); + call.handler.SpawnInfallible( + "start-call", [this, handler = call.handler]() mutable { + transport_pair_.client->client_transport()->StartCall( + handler.V2HackToStartCallWithoutACallFilterStack()); + return Empty{}; + }); return std::move(call.initiator); } @@ -231,9 +235,10 @@ Arena* TransportTest::Acceptor::CreateArena() { } absl::StatusOr TransportTest::Acceptor::CreateCall( - ClientMetadata&, Arena* arena) { - auto call = MakeCall(event_engine_, arena); - handlers_.push(std::move(call.handler)); + ClientMetadataHandle client_initial_metadata, Arena* arena) { + auto call = + MakeCall(std::move(client_initial_metadata), event_engine_, arena, true); + handlers_.push(call.handler.V2HackToStartCallWithoutACallFilterStack()); return std::move(call.initiator); } diff --git a/test/core/transport/test_suite/test.h b/test/core/transport/test_suite/test.h index 6981d5ca51f..f0451a4488d 100644 --- a/test/core/transport/test_suite/test.h +++ b/test/core/transport/test_suite/test.h @@ -221,7 +221,7 @@ class TransportTest : public ::testing::Test { rng_(rng) {} void SetServerAcceptor(); - CallInitiator CreateCall(); + CallInitiator CreateCall(ClientMetadataHandle client_initial_metadata); std::string RandomString(int min_length, int max_length, absl::string_view character_set); @@ -272,7 +272,7 @@ class TransportTest : public ::testing::Test { Arena* CreateArena() override; absl::StatusOr CreateCall( - ClientMetadata& client_initial_metadata, Arena* arena) override; + ClientMetadataHandle client_initial_metadata, Arena* arena) override; absl::optional PopHandler(); private: diff --git a/test/distrib/python/test_packages.sh b/test/distrib/python/test_packages.sh index 136632833b1..fa6f2fceeeb 100755 --- a/test/distrib/python/test_packages.sh +++ b/test/distrib/python/test_packages.sh @@ -19,22 +19,25 @@ cd "$(dirname "$0")" shopt -s nullglob +echo "Testing Python packages with input artifacts:" +ls "$EXTERNAL_GIT_ROOT"/input_artifacts + if [[ "$1" == "binary" ]] then echo "Testing Python binary distribution" - ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-[0-9]*.whl) - TOOLS_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio_tools-[0-9]*.whl) - OBSERVABILITY_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio_observability-[0-9]*.whl) + ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[-_0-9a-z.]*.whl) + TOOLS_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*tools[-_0-9a-z.]*.whl) + OBSERVABILITY_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*observability[-_0-9a-z.]*.whl) else echo "Testing Python source distribution" - ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-[0-9]*.tar.gz) - TOOLS_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-tools-[0-9]*.tar.gz) - OBSERVABILITY_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-observability-[0-9]*.tar.gz) + ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[-_0-9a-z.]*.tar.gz) + TOOLS_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*tools[-_0-9a-z.]*.tar.gz) + OBSERVABILITY_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*observability[-_0-9a-z.]*.tar.gz) fi -HEALTH_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-health-checking-[0-9]*.tar.gz) -REFLECTION_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-reflection-[0-9]*.tar.gz) -TESTING_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-testing-[0-9]*.tar.gz) +HEALTH_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*health[_-]*checking[-_0-9a-z.]*.tar.gz) +REFLECTION_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*reflection[-_0-9a-z.]*.tar.gz) +TESTING_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*testing[-_0-9a-z.]*.tar.gz) VIRTUAL_ENV=$(mktemp -d) python3 -m virtualenv "$VIRTUAL_ENV"