pull/37886/head
Craig Tiller 4 months ago
parent 834e5f3582
commit edc045102b
  1. 103
      src/core/server/server.cc

@ -813,55 +813,60 @@ absl::StatusOr<ClientMetadataHandle> CheckClientMetadata(
} // namespace
auto Server::MatchAndPublishCall(CallHandler call_handler) {
call_handler.SpawnGuarded("request_matcher", [this, call_handler]() mutable {
return TrySeq(
// Wait for initial metadata to pass through all filters
Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata),
// Match request with requested call
[this, call_handler](ClientMetadataHandle md) mutable {
auto* registered_method = static_cast<RegisteredMethod*>(
md->get(GrpcRegisteredMethod()).value_or(nullptr));
RequestMatcherInterface* rm;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (registered_method == nullptr) {
rm = unregistered_request_matcher_.get();
} else {
payload_handling = registered_method->payload_handling;
rm = registered_method->matcher.get();
}
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call_handler]() mutable { return call_handler.PullMessage(); },
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return ValueOrFailure<absl::optional<MessageHandle>>(
absl::nullopt);
});
return TryJoin<absl::StatusOr>(
std::move(maybe_read_first_message), rm->MatchRequest(0),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[call_handler, this](std::tuple<absl::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {
RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
auto md = std::move(std::get<2>(r));
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
grpc_call* call =
MakeServerCall(call_handler, std::move(md), this,
rc->cq_bound_to_call, rc->initial_metadata);
*rc->call = call;
return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
return absl::OkStatus();
});
});
});
call_handler.SpawnGuardedUntilCallCompletes(
"request_matcher", [this, call_handler]() mutable {
return TrySeq(
// Wait for initial metadata to pass through all filters
Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata),
// Match request with requested call
[this, call_handler](ClientMetadataHandle md) mutable {
auto* registered_method = static_cast<RegisteredMethod*>(
md->get(GrpcRegisteredMethod()).value_or(nullptr));
RequestMatcherInterface* rm;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (registered_method == nullptr) {
rm = unregistered_request_matcher_.get();
} else {
payload_handling = registered_method->payload_handling;
rm = registered_method->matcher.get();
}
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call_handler]() mutable {
return call_handler.PullMessage();
},
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return ValueOrFailure<absl::optional<MessageHandle>>(
absl::nullopt);
});
return TryJoin<absl::StatusOr>(
std::move(maybe_read_first_message), rm->MatchRequest(0),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[call_handler,
this](std::tuple<absl::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {
RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
auto md = std::move(std::get<2>(r));
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
grpc_call* call =
MakeServerCall(call_handler, std::move(md), this,
rc->cq_bound_to_call, rc->initial_metadata);
*rc->call = call;
return Map(
WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
return absl::OkStatus();
});
});
});
}
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>

Loading…
Cancel
Save