From edc045102b0ec69d876b2b28d1470cbe39323e96 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 10 Oct 2024 15:00:40 -0700 Subject: [PATCH] x --- src/core/server/server.cc | 103 ++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 49 deletions(-) diff --git a/src/core/server/server.cc b/src/core/server/server.cc index 5ec784a5c96..87bb72a0dee 100644 --- a/src/core/server/server.cc +++ b/src/core/server/server.cc @@ -813,55 +813,60 @@ absl::StatusOr CheckClientMetadata( } // namespace auto Server::MatchAndPublishCall(CallHandler call_handler) { - call_handler.SpawnGuarded("request_matcher", [this, call_handler]() mutable { - return TrySeq( - // Wait for initial metadata to pass through all filters - Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata), - // Match request with requested call - [this, call_handler](ClientMetadataHandle md) mutable { - auto* registered_method = static_cast( - md->get(GrpcRegisteredMethod()).value_or(nullptr)); - RequestMatcherInterface* rm; - grpc_server_register_method_payload_handling payload_handling = - GRPC_SRM_PAYLOAD_NONE; - if (registered_method == nullptr) { - rm = unregistered_request_matcher_.get(); - } else { - payload_handling = registered_method->payload_handling; - rm = registered_method->matcher.get(); - } - auto maybe_read_first_message = If( - payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, - [call_handler]() mutable { return call_handler.PullMessage(); }, - []() -> ValueOrFailure> { - return ValueOrFailure>( - absl::nullopt); - }); - return TryJoin( - std::move(maybe_read_first_message), rm->MatchRequest(0), - [md = std::move(md)]() mutable { - return ValueOrFailure(std::move(md)); - }); - }, - // Publish call to cq - [call_handler, this](std::tuple, - RequestMatcherInterface::MatchResult, - ClientMetadataHandle> - r) { - RequestMatcherInterface::MatchResult& mr = std::get<1>(r); - auto md = std::move(std::get<2>(r)); - auto* rc = mr.TakeCall(); - rc->Complete(std::move(std::get<0>(r)), *md); - grpc_call* call = - MakeServerCall(call_handler, std::move(md), this, - rc->cq_bound_to_call, rc->initial_metadata); - *rc->call = call; - return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()), - [rc = std::unique_ptr(rc)](Empty) { - return absl::OkStatus(); - }); - }); - }); + call_handler.SpawnGuardedUntilCallCompletes( + "request_matcher", [this, call_handler]() mutable { + return TrySeq( + // Wait for initial metadata to pass through all filters + Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata), + // Match request with requested call + [this, call_handler](ClientMetadataHandle md) mutable { + auto* registered_method = static_cast( + md->get(GrpcRegisteredMethod()).value_or(nullptr)); + RequestMatcherInterface* rm; + grpc_server_register_method_payload_handling payload_handling = + GRPC_SRM_PAYLOAD_NONE; + if (registered_method == nullptr) { + rm = unregistered_request_matcher_.get(); + } else { + payload_handling = registered_method->payload_handling; + rm = registered_method->matcher.get(); + } + auto maybe_read_first_message = If( + payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, + [call_handler]() mutable { + return call_handler.PullMessage(); + }, + []() -> ValueOrFailure> { + return ValueOrFailure>( + absl::nullopt); + }); + return TryJoin( + std::move(maybe_read_first_message), rm->MatchRequest(0), + [md = std::move(md)]() mutable { + return ValueOrFailure(std::move(md)); + }); + }, + // Publish call to cq + [call_handler, + this](std::tuple, + RequestMatcherInterface::MatchResult, + ClientMetadataHandle> + r) { + RequestMatcherInterface::MatchResult& mr = std::get<1>(r); + auto md = std::move(std::get<2>(r)); + auto* rc = mr.TakeCall(); + rc->Complete(std::move(std::get<0>(r)), *md); + grpc_call* call = + MakeServerCall(call_handler, std::move(md), this, + rc->cq_bound_to_call, rc->initial_metadata); + *rc->call = call; + return Map( + WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()), + [rc = std::unique_ptr(rc)](Empty) { + return absl::OkStatus(); + }); + }); + }); } absl::StatusOr>