|
|
|
@ -57,6 +57,7 @@ |
|
|
|
|
#include "src/core/lib/gprpp/crash.h" |
|
|
|
|
#include "src/core/lib/gprpp/debug_location.h" |
|
|
|
|
#include "src/core/lib/gprpp/mpscq.h" |
|
|
|
|
#include "src/core/lib/gprpp/orphanable.h" |
|
|
|
|
#include "src/core/lib/gprpp/status_helper.h" |
|
|
|
|
#include "src/core/lib/iomgr/exec_ctx.h" |
|
|
|
|
#include "src/core/lib/iomgr/pollset_set.h" |
|
|
|
@ -81,6 +82,7 @@ |
|
|
|
|
#include "src/core/lib/surface/wait_for_cq_end_op.h" |
|
|
|
|
#include "src/core/lib/transport/connectivity_state.h" |
|
|
|
|
#include "src/core/lib/transport/error_utils.h" |
|
|
|
|
#include "src/core/lib/transport/interception_chain.h" |
|
|
|
|
|
|
|
|
|
namespace grpc_core { |
|
|
|
|
|
|
|
|
@ -235,7 +237,8 @@ struct Server::RequestedCall { |
|
|
|
|
|
|
|
|
|
template <typename OptionalPayload> |
|
|
|
|
void Complete(OptionalPayload payload, ClientMetadata& md) { |
|
|
|
|
Timestamp deadline = GetContext<Call>()->deadline(); |
|
|
|
|
Timestamp deadline = |
|
|
|
|
md.get(GrpcTimeoutMetadata()).value_or(Timestamp::InfFuture()); |
|
|
|
|
switch (type) { |
|
|
|
|
case RequestedCall::Type::BATCH_CALL: |
|
|
|
|
CHECK(!payload.has_value()); |
|
|
|
@ -288,23 +291,29 @@ struct Server::RequestedCall { |
|
|
|
|
// application to explicitly request RPCs and then matching those to incoming
|
|
|
|
|
// RPCs, along with a slow path by which incoming RPCs are put on a locked
|
|
|
|
|
// pending list if they aren't able to be matched to an application request.
|
|
|
|
|
class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { |
|
|
|
|
class Server::RealRequestMatcher : public RequestMatcherInterface { |
|
|
|
|
public: |
|
|
|
|
explicit RealRequestMatcherFilterStack(Server* server) |
|
|
|
|
explicit RealRequestMatcher(Server* server) |
|
|
|
|
: server_(server), requests_per_cq_(server->cqs_.size()) {} |
|
|
|
|
|
|
|
|
|
~RealRequestMatcherFilterStack() override { |
|
|
|
|
~RealRequestMatcher() override { |
|
|
|
|
for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) { |
|
|
|
|
CHECK_EQ(queue.Pop(), nullptr); |
|
|
|
|
} |
|
|
|
|
CHECK(pending_.empty()); |
|
|
|
|
CHECK(pending_filter_stack_.empty()); |
|
|
|
|
CHECK(pending_promises_.empty()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ZombifyPending() override { |
|
|
|
|
while (!pending_.empty()) { |
|
|
|
|
pending_.front().calld->SetState(CallData::CallState::ZOMBIED); |
|
|
|
|
pending_.front().calld->KillZombie(); |
|
|
|
|
pending_.pop(); |
|
|
|
|
while (!pending_filter_stack_.empty()) { |
|
|
|
|
pending_filter_stack_.front().calld->SetState( |
|
|
|
|
CallData::CallState::ZOMBIED); |
|
|
|
|
pending_filter_stack_.front().calld->KillZombie(); |
|
|
|
|
pending_filter_stack_.pop(); |
|
|
|
|
} |
|
|
|
|
while (!pending_promises_.empty()) { |
|
|
|
|
pending_promises_.front()->Finish(absl::InternalError("Server closed")); |
|
|
|
|
pending_promises_.pop(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -329,35 +338,56 @@ class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { |
|
|
|
|
// matching calls
|
|
|
|
|
struct NextPendingCall { |
|
|
|
|
RequestedCall* rc = nullptr; |
|
|
|
|
CallData* pending; |
|
|
|
|
CallData* pending_filter_stack = nullptr; |
|
|
|
|
PendingCallPromises pending_promise; |
|
|
|
|
}; |
|
|
|
|
while (true) { |
|
|
|
|
NextPendingCall pending_call; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&server_->mu_call_); |
|
|
|
|
while (!pending_.empty() && |
|
|
|
|
pending_.front().Age() > server_->max_time_in_pending_queue_) { |
|
|
|
|
pending_.front().calld->SetState(CallData::CallState::ZOMBIED); |
|
|
|
|
pending_.front().calld->KillZombie(); |
|
|
|
|
pending_.pop(); |
|
|
|
|
while (!pending_filter_stack_.empty() && |
|
|
|
|
pending_filter_stack_.front().Age() > |
|
|
|
|
server_->max_time_in_pending_queue_) { |
|
|
|
|
pending_filter_stack_.front().calld->SetState( |
|
|
|
|
CallData::CallState::ZOMBIED); |
|
|
|
|
pending_filter_stack_.front().calld->KillZombie(); |
|
|
|
|
pending_filter_stack_.pop(); |
|
|
|
|
} |
|
|
|
|
if (!pending_.empty()) { |
|
|
|
|
if (!pending_promises_.empty()) { |
|
|
|
|
pending_call.rc = reinterpret_cast<RequestedCall*>( |
|
|
|
|
requests_per_cq_[request_queue_index].Pop()); |
|
|
|
|
if (pending_call.rc != nullptr) { |
|
|
|
|
pending_call.pending = pending_.front().calld; |
|
|
|
|
pending_.pop(); |
|
|
|
|
pending_call.pending_promise = |
|
|
|
|
std::move(pending_promises_.front()); |
|
|
|
|
pending_promises_.pop(); |
|
|
|
|
} |
|
|
|
|
} else if (!pending_filter_stack_.empty()) { |
|
|
|
|
pending_call.rc = reinterpret_cast<RequestedCall*>( |
|
|
|
|
requests_per_cq_[request_queue_index].Pop()); |
|
|
|
|
if (pending_call.rc != nullptr) { |
|
|
|
|
pending_call.pending_filter_stack = |
|
|
|
|
pending_filter_stack_.front().calld; |
|
|
|
|
pending_filter_stack_.pop(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (pending_call.rc == nullptr) break; |
|
|
|
|
if (!pending_call.pending->MaybeActivate()) { |
|
|
|
|
// Zombied Call
|
|
|
|
|
pending_call.pending->KillZombie(); |
|
|
|
|
requests_per_cq_[request_queue_index].Push( |
|
|
|
|
&pending_call.rc->mpscq_node); |
|
|
|
|
if (pending_call.pending_filter_stack != nullptr) { |
|
|
|
|
if (!pending_call.pending_filter_stack->MaybeActivate()) { |
|
|
|
|
// Zombied Call
|
|
|
|
|
pending_call.pending_filter_stack->KillZombie(); |
|
|
|
|
requests_per_cq_[request_queue_index].Push( |
|
|
|
|
&pending_call.rc->mpscq_node); |
|
|
|
|
} else { |
|
|
|
|
pending_call.pending_filter_stack->Publish(request_queue_index, |
|
|
|
|
pending_call.rc); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
pending_call.pending->Publish(request_queue_index, pending_call.rc); |
|
|
|
|
if (!pending_call.pending_promise->Finish( |
|
|
|
|
server(), request_queue_index, pending_call.rc)) { |
|
|
|
|
requests_per_cq_[request_queue_index].Push( |
|
|
|
|
&pending_call.rc->mpscq_node); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -395,7 +425,7 @@ class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { |
|
|
|
|
} |
|
|
|
|
if (rc == nullptr) { |
|
|
|
|
calld->SetState(CallData::CallState::PENDING); |
|
|
|
|
pending_.push(PendingCall{calld}); |
|
|
|
|
pending_filter_stack_.push(PendingCallFilterStack{calld}); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -403,91 +433,6 @@ class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { |
|
|
|
|
calld->Publish(cq_idx, rc); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(size_t) override { |
|
|
|
|
Crash("not implemented for filter stack request matcher"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Server* server() const final { return server_; } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
Server* const server_; |
|
|
|
|
struct PendingCall { |
|
|
|
|
CallData* calld; |
|
|
|
|
Timestamp created = Timestamp::Now(); |
|
|
|
|
Duration Age() { return Timestamp::Now() - created; } |
|
|
|
|
}; |
|
|
|
|
std::queue<PendingCall> pending_; |
|
|
|
|
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class Server::RealRequestMatcherPromises : public RequestMatcherInterface { |
|
|
|
|
public: |
|
|
|
|
explicit RealRequestMatcherPromises(Server* server) |
|
|
|
|
: server_(server), requests_per_cq_(server->cqs_.size()) {} |
|
|
|
|
|
|
|
|
|
~RealRequestMatcherPromises() override { |
|
|
|
|
for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) { |
|
|
|
|
CHECK_EQ(queue.Pop(), nullptr); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ZombifyPending() override { |
|
|
|
|
while (!pending_.empty()) { |
|
|
|
|
pending_.front()->Finish(absl::InternalError("Server closed")); |
|
|
|
|
pending_.pop(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void KillRequests(grpc_error_handle error) override { |
|
|
|
|
for (size_t i = 0; i < requests_per_cq_.size(); i++) { |
|
|
|
|
RequestedCall* rc; |
|
|
|
|
while ((rc = reinterpret_cast<RequestedCall*>( |
|
|
|
|
requests_per_cq_[i].Pop())) != nullptr) { |
|
|
|
|
server_->FailCall(i, rc, error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
size_t request_queue_count() const override { |
|
|
|
|
return requests_per_cq_.size(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RequestCallWithPossiblePublish(size_t request_queue_index, |
|
|
|
|
RequestedCall* call) override { |
|
|
|
|
if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) { |
|
|
|
|
// this was the first queued request: we need to lock and start
|
|
|
|
|
// matching calls
|
|
|
|
|
struct NextPendingCall { |
|
|
|
|
RequestedCall* rc = nullptr; |
|
|
|
|
PendingCall pending; |
|
|
|
|
}; |
|
|
|
|
while (true) { |
|
|
|
|
NextPendingCall pending_call; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&server_->mu_call_); |
|
|
|
|
if (!pending_.empty()) { |
|
|
|
|
pending_call.rc = reinterpret_cast<RequestedCall*>( |
|
|
|
|
requests_per_cq_[request_queue_index].Pop()); |
|
|
|
|
if (pending_call.rc != nullptr) { |
|
|
|
|
pending_call.pending = std::move(pending_.front()); |
|
|
|
|
pending_.pop(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (pending_call.rc == nullptr) break; |
|
|
|
|
if (!pending_call.pending->Finish(server(), request_queue_index, |
|
|
|
|
pending_call.rc)) { |
|
|
|
|
requests_per_cq_[request_queue_index].Push( |
|
|
|
|
&pending_call.rc->mpscq_node); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void MatchOrQueue(size_t, CallData*) override { |
|
|
|
|
Crash("not implemented for promises"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest( |
|
|
|
|
size_t start_request_queue_index) override { |
|
|
|
|
for (size_t i = 0; i < requests_per_cq_.size(); i++) { |
|
|
|
@ -509,10 +454,11 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface { |
|
|
|
|
{ |
|
|
|
|
std::vector<std::shared_ptr<ActivityWaiter>> removed_pending; |
|
|
|
|
MutexLock lock(&server_->mu_call_); |
|
|
|
|
while (!pending_.empty() && |
|
|
|
|
pending_.front()->Age() > server_->max_time_in_pending_queue_) { |
|
|
|
|
removed_pending.push_back(std::move(pending_.front())); |
|
|
|
|
pending_.pop(); |
|
|
|
|
while (!pending_promises_.empty() && |
|
|
|
|
pending_promises_.front()->Age() > |
|
|
|
|
server_->max_time_in_pending_queue_) { |
|
|
|
|
removed_pending.push_back(std::move(pending_promises_.front())); |
|
|
|
|
pending_promises_.pop(); |
|
|
|
|
} |
|
|
|
|
for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) { |
|
|
|
|
cq_idx = |
|
|
|
@ -521,14 +467,14 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface { |
|
|
|
|
if (rc != nullptr) break; |
|
|
|
|
} |
|
|
|
|
if (rc == nullptr) { |
|
|
|
|
if (server_->pending_backlog_protector_.Reject(pending_.size(), |
|
|
|
|
if (server_->pending_backlog_protector_.Reject(pending_promises_.size(), |
|
|
|
|
server_->bitgen_)) { |
|
|
|
|
return Immediate(absl::ResourceExhaustedError( |
|
|
|
|
"Too many pending requests for this server")); |
|
|
|
|
} |
|
|
|
|
auto w = std::make_shared<ActivityWaiter>( |
|
|
|
|
GetContext<Activity>()->MakeOwningWaker()); |
|
|
|
|
pending_.push(w); |
|
|
|
|
pending_promises_.push(w); |
|
|
|
|
return OnCancel( |
|
|
|
|
[w]() -> Poll<absl::StatusOr<MatchResult>> { |
|
|
|
|
std::unique_ptr<absl::StatusOr<MatchResult>> r( |
|
|
|
@ -546,6 +492,11 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface { |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
Server* const server_; |
|
|
|
|
struct PendingCallFilterStack { |
|
|
|
|
CallData* calld; |
|
|
|
|
Timestamp created = Timestamp::Now(); |
|
|
|
|
Duration Age() { return Timestamp::Now() - created; } |
|
|
|
|
}; |
|
|
|
|
struct ActivityWaiter { |
|
|
|
|
using ResultType = absl::StatusOr<MatchResult>; |
|
|
|
|
explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {} |
|
|
|
@ -580,8 +531,9 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface { |
|
|
|
|
std::atomic<ResultType*> result{nullptr}; |
|
|
|
|
const Timestamp created = Timestamp::Now(); |
|
|
|
|
}; |
|
|
|
|
using PendingCall = std::shared_ptr<ActivityWaiter>; |
|
|
|
|
std::queue<PendingCall> pending_; |
|
|
|
|
using PendingCallPromises = std::shared_ptr<ActivityWaiter>; |
|
|
|
|
std::queue<PendingCallFilterStack> pending_filter_stack_; |
|
|
|
|
std::queue<PendingCallPromises> pending_promises_; |
|
|
|
|
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
@ -784,13 +736,40 @@ class ChannelBroadcaster { |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Server::TransportConnectivityWatcher
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
class Server::TransportConnectivityWatcher |
|
|
|
|
: public AsyncConnectivityStateWatcherInterface { |
|
|
|
|
public: |
|
|
|
|
TransportConnectivityWatcher(RefCountedPtr<ServerTransport> transport, |
|
|
|
|
RefCountedPtr<Server> server) |
|
|
|
|
: transport_(std::move(transport)), server_(std::move(server)) {} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void OnConnectivityStateChange(grpc_connectivity_state new_state, |
|
|
|
|
const absl::Status& /*status*/) override { |
|
|
|
|
// Don't do anything until we are being shut down.
|
|
|
|
|
if (new_state != GRPC_CHANNEL_SHUTDOWN) return; |
|
|
|
|
// Shut down channel.
|
|
|
|
|
MutexLock lock(&server_->mu_global_); |
|
|
|
|
server_->connections_.erase(transport_.get()); |
|
|
|
|
--server_->connections_open_; |
|
|
|
|
server_->MaybeFinishShutdown(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
RefCountedPtr<ServerTransport> transport_; |
|
|
|
|
RefCountedPtr<Server> server_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Server
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
const grpc_channel_filter Server::kServerTopFilter = { |
|
|
|
|
Server::CallData::StartTransportStreamOpBatch, |
|
|
|
|
Server::ChannelData::MakeCallPromise, |
|
|
|
|
nullptr, |
|
|
|
|
[](grpc_channel_element*, CallSpineInterface*) { |
|
|
|
|
// TODO(ctiller): remove the server filter when call-v3 is finalized
|
|
|
|
|
}, |
|
|
|
@ -826,12 +805,91 @@ RefCountedPtr<channelz::ServerNode> CreateChannelzNode( |
|
|
|
|
return channelz_node; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
absl::StatusOr<ClientMetadataHandle> CheckClientMetadata( |
|
|
|
|
ValueOrFailure<ClientMetadataHandle> md) { |
|
|
|
|
if (!md.ok()) { |
|
|
|
|
return absl::InternalError("Missing metadata"); |
|
|
|
|
} |
|
|
|
|
if (!md.value()->get_pointer(HttpPathMetadata())) { |
|
|
|
|
return absl::InternalError("Missing :path header"); |
|
|
|
|
} |
|
|
|
|
if (!md.value()->get_pointer(HttpAuthorityMetadata())) { |
|
|
|
|
return absl::InternalError("Missing :authority header"); |
|
|
|
|
} |
|
|
|
|
return std::move(*md); |
|
|
|
|
} |
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
auto Server::MatchAndPublishCall(CallHandler call_handler) { |
|
|
|
|
call_handler.SpawnGuarded("request_matcher", [this, call_handler]() mutable { |
|
|
|
|
return TrySeq( |
|
|
|
|
// Wait for initial metadata to pass through all filters
|
|
|
|
|
Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata), |
|
|
|
|
// Match request with requested call
|
|
|
|
|
[this, call_handler](ClientMetadataHandle md) mutable { |
|
|
|
|
auto* registered_method = static_cast<RegisteredMethod*>( |
|
|
|
|
md->get(GrpcRegisteredMethod()).value_or(nullptr)); |
|
|
|
|
RequestMatcherInterface* rm; |
|
|
|
|
grpc_server_register_method_payload_handling payload_handling = |
|
|
|
|
GRPC_SRM_PAYLOAD_NONE; |
|
|
|
|
if (registered_method == nullptr) { |
|
|
|
|
rm = unregistered_request_matcher_.get(); |
|
|
|
|
} else { |
|
|
|
|
payload_handling = registered_method->payload_handling; |
|
|
|
|
rm = registered_method->matcher.get(); |
|
|
|
|
} |
|
|
|
|
auto maybe_read_first_message = If( |
|
|
|
|
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, |
|
|
|
|
[call_handler]() mutable { return call_handler.PullMessage(); }, |
|
|
|
|
[]() -> ValueOrFailure<absl::optional<MessageHandle>> { |
|
|
|
|
return ValueOrFailure<absl::optional<MessageHandle>>( |
|
|
|
|
absl::nullopt); |
|
|
|
|
}); |
|
|
|
|
return TryJoin<absl::StatusOr>( |
|
|
|
|
std::move(maybe_read_first_message), rm->MatchRequest(0), |
|
|
|
|
[md = std::move(md)]() mutable { |
|
|
|
|
return ValueOrFailure<ClientMetadataHandle>(std::move(md)); |
|
|
|
|
}); |
|
|
|
|
}, |
|
|
|
|
// Publish call to cq
|
|
|
|
|
[call_handler, this](std::tuple<absl::optional<MessageHandle>, |
|
|
|
|
RequestMatcherInterface::MatchResult, |
|
|
|
|
ClientMetadataHandle> |
|
|
|
|
r) { |
|
|
|
|
RequestMatcherInterface::MatchResult& mr = std::get<1>(r); |
|
|
|
|
auto md = std::move(std::get<2>(r)); |
|
|
|
|
auto* rc = mr.TakeCall(); |
|
|
|
|
rc->Complete(std::move(std::get<0>(r)), *md); |
|
|
|
|
grpc_call* call = |
|
|
|
|
MakeServerCall(call_handler, std::move(md), this, |
|
|
|
|
rc->cq_bound_to_call, rc->initial_metadata); |
|
|
|
|
*rc->call = call; |
|
|
|
|
return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()), |
|
|
|
|
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) { |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>> |
|
|
|
|
Server::MakeCallDestination(const ChannelArgs& args) { |
|
|
|
|
InterceptionChainBuilder builder(args); |
|
|
|
|
builder.AddOnClientInitialMetadata( |
|
|
|
|
[this](ClientMetadata& md) { SetRegisteredMethodOnMetadata(md); }); |
|
|
|
|
CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder( |
|
|
|
|
GRPC_SERVER_CHANNEL, builder); |
|
|
|
|
return builder.Build( |
|
|
|
|
MakeCallDestinationFromHandlerFunction([this](CallHandler handler) { |
|
|
|
|
return MatchAndPublishCall(std::move(handler)); |
|
|
|
|
})); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Server::Server(const ChannelArgs& args) |
|
|
|
|
: channel_args_(args), |
|
|
|
|
channelz_node_(CreateChannelzNode(args)), |
|
|
|
|
server_call_tracer_factory_(ServerCallTracerFactory::Get(args)), |
|
|
|
|
compression_options_(CompressionOptionsFromChannelArgs(args)), |
|
|
|
|
max_time_in_pending_queue_(Duration::Seconds( |
|
|
|
|
channel_args_ |
|
|
|
|
.GetInt(GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS) |
|
|
|
@ -862,15 +920,6 @@ void Server::AddListener(OrphanablePtr<ListenerInterface> listener) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Server::Start() { |
|
|
|
|
auto make_real_request_matcher = |
|
|
|
|
[this]() -> std::unique_ptr<RequestMatcherInterface> { |
|
|
|
|
if (IsPromiseBasedServerCallEnabled()) { |
|
|
|
|
return std::make_unique<RealRequestMatcherPromises>(this); |
|
|
|
|
} else { |
|
|
|
|
return std::make_unique<RealRequestMatcherFilterStack>(this); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
started_ = true; |
|
|
|
|
for (grpc_completion_queue* cq : cqs_) { |
|
|
|
|
if (grpc_cq_can_listen(cq)) { |
|
|
|
@ -878,11 +927,11 @@ void Server::Start() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (unregistered_request_matcher_ == nullptr) { |
|
|
|
|
unregistered_request_matcher_ = make_real_request_matcher(); |
|
|
|
|
unregistered_request_matcher_ = std::make_unique<RealRequestMatcher>(this); |
|
|
|
|
} |
|
|
|
|
for (auto& rm : registered_methods_) { |
|
|
|
|
if (rm.second->matcher == nullptr) { |
|
|
|
|
rm.second->matcher = make_real_request_matcher(); |
|
|
|
|
rm.second->matcher = std::make_unique<RealRequestMatcher>(this); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
@ -913,37 +962,61 @@ grpc_error_handle Server::SetupTransport( |
|
|
|
|
const RefCountedPtr<channelz::SocketNode>& socket_node) { |
|
|
|
|
// Create channel.
|
|
|
|
|
global_stats().IncrementServerChannelsCreated(); |
|
|
|
|
absl::StatusOr<OrphanablePtr<Channel>> channel = |
|
|
|
|
LegacyChannel::Create("", args.SetObject(transport), GRPC_SERVER_CHANNEL); |
|
|
|
|
if (!channel.ok()) { |
|
|
|
|
return absl_status_to_grpc_error(channel.status()); |
|
|
|
|
} |
|
|
|
|
ChannelData* chand = static_cast<ChannelData*>( |
|
|
|
|
grpc_channel_stack_element((*channel)->channel_stack(), 0)->channel_data); |
|
|
|
|
// Set up CQs.
|
|
|
|
|
size_t cq_idx; |
|
|
|
|
for (cq_idx = 0; cq_idx < cqs_.size(); cq_idx++) { |
|
|
|
|
if (grpc_cq_pollset(cqs_[cq_idx]) == accepting_pollset) break; |
|
|
|
|
} |
|
|
|
|
if (cq_idx == cqs_.size()) { |
|
|
|
|
// Completion queue not found. Pick a random one to publish new calls to.
|
|
|
|
|
cq_idx = static_cast<size_t>(rand()) % std::max<size_t>(1, cqs_.size()); |
|
|
|
|
} |
|
|
|
|
// Set up channelz node.
|
|
|
|
|
intptr_t channelz_socket_uuid = 0; |
|
|
|
|
if (socket_node != nullptr) { |
|
|
|
|
channelz_socket_uuid = socket_node->uuid(); |
|
|
|
|
channelz_node_->AddChildSocket(socket_node); |
|
|
|
|
} |
|
|
|
|
// Initialize chand.
|
|
|
|
|
chand->InitTransport(Ref(), std::move(*channel), cq_idx, transport, |
|
|
|
|
channelz_socket_uuid); |
|
|
|
|
if (transport->server_transport() != nullptr) { |
|
|
|
|
// Take ownership
|
|
|
|
|
// TODO(ctiller): post-v3-transition make this method take an
|
|
|
|
|
// OrphanablePtr<ServerTransport> directly.
|
|
|
|
|
OrphanablePtr<ServerTransport> t(transport->server_transport()); |
|
|
|
|
auto destination = MakeCallDestination(args.SetObject(transport)); |
|
|
|
|
if (!destination.ok()) { |
|
|
|
|
return absl_status_to_grpc_error(destination.status()); |
|
|
|
|
} |
|
|
|
|
// TODO(ctiller): add channelz node
|
|
|
|
|
t->SetCallDestination(std::move(*destination)); |
|
|
|
|
MutexLock lock(&mu_global_); |
|
|
|
|
if (ShutdownCalled()) { |
|
|
|
|
t->DisconnectWithError(GRPC_ERROR_CREATE("Server shutdown")); |
|
|
|
|
} |
|
|
|
|
t->StartConnectivityWatch(MakeOrphanable<TransportConnectivityWatcher>( |
|
|
|
|
t->RefAsSubclass<ServerTransport>(), Ref())); |
|
|
|
|
gpr_log(GPR_INFO, "Adding connection"); |
|
|
|
|
connections_.emplace(std::move(t)); |
|
|
|
|
++connections_open_; |
|
|
|
|
} else { |
|
|
|
|
CHECK(transport->filter_stack_transport() != nullptr); |
|
|
|
|
absl::StatusOr<OrphanablePtr<Channel>> channel = LegacyChannel::Create( |
|
|
|
|
"", args.SetObject(transport), GRPC_SERVER_CHANNEL); |
|
|
|
|
if (!channel.ok()) { |
|
|
|
|
return absl_status_to_grpc_error(channel.status()); |
|
|
|
|
} |
|
|
|
|
ChannelData* chand = static_cast<ChannelData*>( |
|
|
|
|
grpc_channel_stack_element((*channel)->channel_stack(), 0) |
|
|
|
|
->channel_data); |
|
|
|
|
// Set up CQs.
|
|
|
|
|
size_t cq_idx; |
|
|
|
|
for (cq_idx = 0; cq_idx < cqs_.size(); cq_idx++) { |
|
|
|
|
if (grpc_cq_pollset(cqs_[cq_idx]) == accepting_pollset) break; |
|
|
|
|
} |
|
|
|
|
if (cq_idx == cqs_.size()) { |
|
|
|
|
// Completion queue not found. Pick a random one to publish new calls to.
|
|
|
|
|
cq_idx = static_cast<size_t>(rand()) % std::max<size_t>(1, cqs_.size()); |
|
|
|
|
} |
|
|
|
|
intptr_t channelz_socket_uuid = 0; |
|
|
|
|
if (socket_node != nullptr) { |
|
|
|
|
channelz_socket_uuid = socket_node->uuid(); |
|
|
|
|
channelz_node_->AddChildSocket(socket_node); |
|
|
|
|
} |
|
|
|
|
// Initialize chand.
|
|
|
|
|
chand->InitTransport(Ref(), std::move(*channel), cq_idx, transport, |
|
|
|
|
channelz_socket_uuid); |
|
|
|
|
} |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool Server::HasOpenConnections() { |
|
|
|
|
MutexLock lock(&mu_global_); |
|
|
|
|
return !channels_.empty(); |
|
|
|
|
return !channels_.empty() || !connections_.empty(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Server::SetRegisteredMethodAllocator( |
|
|
|
@ -1023,16 +1096,18 @@ void Server::MaybeFinishShutdown() { |
|
|
|
|
MutexLock lock(&mu_call_); |
|
|
|
|
KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown")); |
|
|
|
|
} |
|
|
|
|
if (!channels_.empty() || listeners_destroyed_ < listeners_.size()) { |
|
|
|
|
if (!channels_.empty() || connections_open_ > 0 || |
|
|
|
|
listeners_destroyed_ < listeners_.size()) { |
|
|
|
|
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), |
|
|
|
|
last_shutdown_message_time_), |
|
|
|
|
gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) { |
|
|
|
|
last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME); |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"Waiting for %" PRIuPTR " channels and %" PRIuPTR "/%" PRIuPTR |
|
|
|
|
"Waiting for %" PRIuPTR " channels %" PRIuPTR |
|
|
|
|
" connections and %" PRIuPTR "/%" PRIuPTR |
|
|
|
|
" listeners to be destroyed before shutting down server", |
|
|
|
|
channels_.size(), listeners_.size() - listeners_destroyed_, |
|
|
|
|
listeners_.size()); |
|
|
|
|
channels_.size(), connections_open_, |
|
|
|
|
listeners_.size() - listeners_destroyed_, listeners_.size()); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -1095,6 +1170,7 @@ void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) { |
|
|
|
|
// -- Once there are no more calls in progress, the channel is closed.
|
|
|
|
|
void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) { |
|
|
|
|
ChannelBroadcaster broadcaster; |
|
|
|
|
absl::flat_hash_set<OrphanablePtr<ServerTransport>> removing_connections; |
|
|
|
|
{ |
|
|
|
|
// Wait for startup to be finished. Locks mu_global.
|
|
|
|
|
MutexLock lock(&mu_global_); |
|
|
|
@ -1114,6 +1190,7 @@ void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) { |
|
|
|
|
} |
|
|
|
|
last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME); |
|
|
|
|
broadcaster.FillChannelsLocked(GetChannelsLocked()); |
|
|
|
|
removing_connections.swap(connections_); |
|
|
|
|
// Collect all unregistered then registered calls.
|
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&mu_call_); |
|
|
|
@ -1300,17 +1377,6 @@ Server::ChannelData::~ChannelData() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Arena* Server::ChannelData::CreateArena() { return channel_->CreateArena(); } |
|
|
|
|
|
|
|
|
|
absl::StatusOr<CallInitiator> Server::ChannelData::CreateCall( |
|
|
|
|
ClientMetadataHandle client_initial_metadata, Arena* arena) { |
|
|
|
|
SetRegisteredMethodOnMetadata(*client_initial_metadata); |
|
|
|
|
auto call = MakeServerCall(std::move(client_initial_metadata), server_.get(), |
|
|
|
|
channel_.get(), arena); |
|
|
|
|
InitCall(call); |
|
|
|
|
return CallInitiator(std::move(call)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Server::ChannelData::InitTransport(RefCountedPtr<Server> server, |
|
|
|
|
OrphanablePtr<Channel> channel, |
|
|
|
|
size_t cq_idx, Transport* transport, |
|
|
|
@ -1327,22 +1393,15 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server, |
|
|
|
|
} |
|
|
|
|
// Start accept_stream transport op.
|
|
|
|
|
grpc_transport_op* op = grpc_make_transport_op(nullptr); |
|
|
|
|
int accept_stream_types = 0; |
|
|
|
|
if (transport->filter_stack_transport() != nullptr) { |
|
|
|
|
++accept_stream_types; |
|
|
|
|
op->set_accept_stream = true; |
|
|
|
|
op->set_accept_stream_fn = AcceptStream; |
|
|
|
|
op->set_registered_method_matcher_fn = [](void* arg, |
|
|
|
|
ClientMetadata* metadata) { |
|
|
|
|
static_cast<ChannelData*>(arg)->SetRegisteredMethodOnMetadata(*metadata); |
|
|
|
|
}; |
|
|
|
|
op->set_accept_stream_user_data = this; |
|
|
|
|
} |
|
|
|
|
if (transport->server_transport() != nullptr) { |
|
|
|
|
++accept_stream_types; |
|
|
|
|
transport->server_transport()->SetAcceptor(this); |
|
|
|
|
} |
|
|
|
|
CHECK_EQ(accept_stream_types, 1); |
|
|
|
|
CHECK(transport->filter_stack_transport() != nullptr); |
|
|
|
|
op->set_accept_stream = true; |
|
|
|
|
op->set_accept_stream_fn = AcceptStream; |
|
|
|
|
op->set_registered_method_matcher_fn = [](void* arg, |
|
|
|
|
ClientMetadata* metadata) { |
|
|
|
|
static_cast<ChannelData*>(arg)->server_->SetRegisteredMethodOnMetadata( |
|
|
|
|
*metadata); |
|
|
|
|
}; |
|
|
|
|
op->set_accept_stream_user_data = this; |
|
|
|
|
op->start_connectivity_watch = MakeOrphanable<ConnectivityWatcher>(this); |
|
|
|
|
if (server_->ShutdownCalled()) { |
|
|
|
|
op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); |
|
|
|
@ -1350,24 +1409,23 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server, |
|
|
|
|
transport->PerformOp(op); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Server::RegisteredMethod* Server::ChannelData::GetRegisteredMethod( |
|
|
|
|
Server::RegisteredMethod* Server::GetRegisteredMethod( |
|
|
|
|
const absl::string_view& host, const absl::string_view& path) { |
|
|
|
|
if (server_->registered_methods_.empty()) return nullptr; |
|
|
|
|
if (registered_methods_.empty()) return nullptr; |
|
|
|
|
// check for an exact match with host
|
|
|
|
|
auto it = server_->registered_methods_.find(std::make_pair(host, path)); |
|
|
|
|
if (it != server_->registered_methods_.end()) { |
|
|
|
|
auto it = registered_methods_.find(std::make_pair(host, path)); |
|
|
|
|
if (it != registered_methods_.end()) { |
|
|
|
|
return it->second.get(); |
|
|
|
|
} |
|
|
|
|
// check for wildcard method definition (no host set)
|
|
|
|
|
it = server_->registered_methods_.find(std::make_pair("", path)); |
|
|
|
|
if (it != server_->registered_methods_.end()) { |
|
|
|
|
it = registered_methods_.find(std::make_pair("", path)); |
|
|
|
|
if (it != registered_methods_.end()) { |
|
|
|
|
return it->second.get(); |
|
|
|
|
} |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Server::ChannelData::SetRegisteredMethodOnMetadata( |
|
|
|
|
ClientMetadata& metadata) { |
|
|
|
|
void Server::SetRegisteredMethodOnMetadata(ClientMetadata& metadata) { |
|
|
|
|
auto* authority = metadata.get_pointer(HttpAuthorityMetadata()); |
|
|
|
|
if (authority == nullptr) { |
|
|
|
|
authority = metadata.get_pointer(HostMetadata()); |
|
|
|
@ -1403,188 +1461,14 @@ void Server::ChannelData::AcceptStream(void* arg, Transport* /*transport*/, |
|
|
|
|
grpc_call* call; |
|
|
|
|
grpc_error_handle error = grpc_call_create(&args, &call); |
|
|
|
|
grpc_call_stack* call_stack = grpc_call_get_call_stack(call); |
|
|
|
|
if (call_stack == nullptr) { // Promise based calls do not have a call stack
|
|
|
|
|
CHECK(error.ok()); |
|
|
|
|
CHECK(IsPromiseBasedServerCallEnabled()); |
|
|
|
|
CHECK_NE(call_stack, nullptr); |
|
|
|
|
grpc_call_element* elem = grpc_call_stack_element(call_stack, 0); |
|
|
|
|
auto* calld = static_cast<Server::CallData*>(elem->call_data); |
|
|
|
|
if (!error.ok()) { |
|
|
|
|
calld->FailCallCreation(); |
|
|
|
|
return; |
|
|
|
|
} else { |
|
|
|
|
grpc_call_element* elem = grpc_call_stack_element(call_stack, 0); |
|
|
|
|
auto* calld = static_cast<Server::CallData*>(elem->call_data); |
|
|
|
|
if (!error.ok()) { |
|
|
|
|
calld->FailCallCreation(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
calld->Start(elem); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
auto CancelledDueToServerShutdown() { |
|
|
|
|
return [] { |
|
|
|
|
return ServerMetadataFromStatus(absl::CancelledError("Server shutdown")); |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
void Server::ChannelData::InitCall(RefCountedPtr<CallSpineInterface> call) { |
|
|
|
|
call->SpawnGuarded("request_matcher", [this, call]() { |
|
|
|
|
return TrySeq( |
|
|
|
|
// Wait for initial metadata to pass through all filters
|
|
|
|
|
Map(call->PullClientInitialMetadata(), |
|
|
|
|
[](ValueOrFailure<ClientMetadataHandle> md) |
|
|
|
|
-> absl::StatusOr<ClientMetadataHandle> { |
|
|
|
|
if (!md.ok()) { |
|
|
|
|
return absl::InternalError("Missing metadata"); |
|
|
|
|
} |
|
|
|
|
if (!md.value()->get_pointer(HttpPathMetadata())) { |
|
|
|
|
return absl::InternalError("Missing :path header"); |
|
|
|
|
} |
|
|
|
|
if (!md.value()->get_pointer(HttpAuthorityMetadata())) { |
|
|
|
|
return absl::InternalError("Missing :authority header"); |
|
|
|
|
} |
|
|
|
|
return std::move(*md); |
|
|
|
|
}), |
|
|
|
|
// Match request with requested call
|
|
|
|
|
[this, call](ClientMetadataHandle md) { |
|
|
|
|
auto* registered_method = static_cast<RegisteredMethod*>( |
|
|
|
|
md->get(GrpcRegisteredMethod()).value_or(nullptr)); |
|
|
|
|
RequestMatcherInterface* rm; |
|
|
|
|
grpc_server_register_method_payload_handling payload_handling = |
|
|
|
|
GRPC_SRM_PAYLOAD_NONE; |
|
|
|
|
if (registered_method == nullptr) { |
|
|
|
|
rm = server_->unregistered_request_matcher_.get(); |
|
|
|
|
} else { |
|
|
|
|
payload_handling = registered_method->payload_handling; |
|
|
|
|
rm = registered_method->matcher.get(); |
|
|
|
|
} |
|
|
|
|
auto maybe_read_first_message = If( |
|
|
|
|
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, |
|
|
|
|
[call]() { return call->PullClientToServerMessage(); }, |
|
|
|
|
[]() -> ValueOrFailure<absl::optional<MessageHandle>> { |
|
|
|
|
return ValueOrFailure<absl::optional<MessageHandle>>( |
|
|
|
|
absl::nullopt); |
|
|
|
|
}); |
|
|
|
|
return TryJoin<absl::StatusOr>( |
|
|
|
|
std::move(maybe_read_first_message), rm->MatchRequest(cq_idx()), |
|
|
|
|
[md = std::move(md)]() mutable { |
|
|
|
|
return ValueOrFailure<ClientMetadataHandle>(std::move(md)); |
|
|
|
|
}); |
|
|
|
|
}, |
|
|
|
|
// Publish call to cq
|
|
|
|
|
[](std::tuple<absl::optional<MessageHandle>, |
|
|
|
|
RequestMatcherInterface::MatchResult, |
|
|
|
|
ClientMetadataHandle> |
|
|
|
|
r) { |
|
|
|
|
RequestMatcherInterface::MatchResult& mr = std::get<1>(r); |
|
|
|
|
auto md = std::move(std::get<2>(r)); |
|
|
|
|
auto* rc = mr.TakeCall(); |
|
|
|
|
rc->Complete(std::move(std::get<0>(r)), *md); |
|
|
|
|
auto* call_context = GetContext<CallContext>(); |
|
|
|
|
const auto* deadline = md->get_pointer(GrpcTimeoutMetadata()); |
|
|
|
|
if (deadline != nullptr) { |
|
|
|
|
GetContext<Call>()->UpdateDeadline(*deadline); |
|
|
|
|
} |
|
|
|
|
*rc->call = call_context->c_call(); |
|
|
|
|
grpc_call_ref(*rc->call); |
|
|
|
|
grpc_call_set_completion_queue(call_context->c_call(), |
|
|
|
|
rc->cq_bound_to_call); |
|
|
|
|
call_context->server_call_context()->PublishInitialMetadata( |
|
|
|
|
std::move(md), rc->initial_metadata); |
|
|
|
|
// TODO(ctiller): publish metadata
|
|
|
|
|
return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()), |
|
|
|
|
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) { |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise( |
|
|
|
|
grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) { |
|
|
|
|
auto* chand = static_cast<Server::ChannelData*>(elem->channel_data); |
|
|
|
|
auto* server = chand->server_.get(); |
|
|
|
|
if (server->ShutdownCalled()) return CancelledDueToServerShutdown(); |
|
|
|
|
auto cleanup_ref = |
|
|
|
|
absl::MakeCleanup([server] { server->ShutdownUnrefOnRequest(); }); |
|
|
|
|
if (!server->ShutdownRefOnRequest()) return CancelledDueToServerShutdown(); |
|
|
|
|
auto path_ptr = |
|
|
|
|
call_args.client_initial_metadata->get_pointer(HttpPathMetadata()); |
|
|
|
|
if (path_ptr == nullptr) { |
|
|
|
|
return [] { |
|
|
|
|
return ServerMetadataFromStatus( |
|
|
|
|
absl::InternalError("Missing :path header")); |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
auto host_ptr = |
|
|
|
|
call_args.client_initial_metadata->get_pointer(HttpAuthorityMetadata()); |
|
|
|
|
if (host_ptr == nullptr) { |
|
|
|
|
return [] { |
|
|
|
|
return ServerMetadataFromStatus( |
|
|
|
|
absl::InternalError("Missing :authority header")); |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
// Find request matcher.
|
|
|
|
|
RequestMatcherInterface* matcher; |
|
|
|
|
RegisteredMethod* rm = static_cast<RegisteredMethod*>( |
|
|
|
|
call_args.client_initial_metadata->get(GrpcRegisteredMethod()) |
|
|
|
|
.value_or(nullptr)); |
|
|
|
|
ArenaPromise<absl::StatusOr<NextResult<MessageHandle>>> |
|
|
|
|
maybe_read_first_message([] { return NextResult<MessageHandle>(); }); |
|
|
|
|
if (rm != nullptr) { |
|
|
|
|
matcher = rm->matcher.get(); |
|
|
|
|
switch (rm->payload_handling) { |
|
|
|
|
case GRPC_SRM_PAYLOAD_NONE: |
|
|
|
|
break; |
|
|
|
|
case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: |
|
|
|
|
maybe_read_first_message = |
|
|
|
|
Map(call_args.client_to_server_messages->Next(), |
|
|
|
|
[](NextResult<MessageHandle> msg) |
|
|
|
|
-> absl::StatusOr<NextResult<MessageHandle>> { |
|
|
|
|
return std::move(msg); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
matcher = server->unregistered_request_matcher_.get(); |
|
|
|
|
} |
|
|
|
|
return TrySeq( |
|
|
|
|
std::move(maybe_read_first_message), |
|
|
|
|
[cleanup_ref = std::move(cleanup_ref), matcher, |
|
|
|
|
chand](NextResult<MessageHandle> payload) mutable { |
|
|
|
|
return Map( |
|
|
|
|
[cleanup_ref = std::move(cleanup_ref), |
|
|
|
|
mr = matcher->MatchRequest(chand->cq_idx())]() mutable { |
|
|
|
|
return mr(); |
|
|
|
|
}, |
|
|
|
|
[payload = std::move(payload)]( |
|
|
|
|
absl::StatusOr<RequestMatcherInterface::MatchResult> mr) mutable |
|
|
|
|
-> absl::StatusOr<std::pair<RequestMatcherInterface::MatchResult, |
|
|
|
|
NextResult<MessageHandle>>> { |
|
|
|
|
if (!mr.ok()) return mr.status(); |
|
|
|
|
return std::make_pair(std::move(*mr), std::move(payload)); |
|
|
|
|
}); |
|
|
|
|
}, |
|
|
|
|
[call_args = |
|
|
|
|
std::move(call_args)](std::pair<RequestMatcherInterface::MatchResult, |
|
|
|
|
NextResult<MessageHandle>> |
|
|
|
|
r) mutable { |
|
|
|
|
auto& mr = r.first; |
|
|
|
|
auto& payload = r.second; |
|
|
|
|
auto* rc = mr.TakeCall(); |
|
|
|
|
auto* cq_for_new_request = mr.cq(); |
|
|
|
|
auto* server_call_context = |
|
|
|
|
GetContext<CallContext>()->server_call_context(); |
|
|
|
|
rc->Complete(std::move(payload), *call_args.client_initial_metadata); |
|
|
|
|
server_call_context->PublishInitialMetadata( |
|
|
|
|
std::move(call_args.client_initial_metadata), rc->initial_metadata); |
|
|
|
|
return server_call_context->MakeTopOfServerCallPromise( |
|
|
|
|
std::move(call_args), rc->cq_bound_to_call, |
|
|
|
|
[rc, cq_for_new_request](grpc_call* call) { |
|
|
|
|
*rc->call = call; |
|
|
|
|
grpc_cq_end_op(cq_for_new_request, rc->tag, absl::OkStatus(), |
|
|
|
|
Server::DoneRequestEvent, rc, &rc->completion, |
|
|
|
|
true); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
calld->Start(elem); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Server::ChannelData::FinishDestroy(void* arg, |
|
|
|
|