|
|
|
@ -25,6 +25,7 @@ |
|
|
|
|
#include <algorithm> |
|
|
|
|
#include <atomic> |
|
|
|
|
#include <list> |
|
|
|
|
#include <memory> |
|
|
|
|
#include <new> |
|
|
|
|
#include <queue> |
|
|
|
|
#include <type_traits> |
|
|
|
@ -35,7 +36,6 @@ |
|
|
|
|
#include "absl/container/flat_hash_map.h" |
|
|
|
|
#include "absl/status/status.h" |
|
|
|
|
#include "absl/types/optional.h" |
|
|
|
|
#include "absl/types/variant.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/byte_buffer.h> |
|
|
|
|
#include <grpc/grpc.h> |
|
|
|
@ -55,12 +55,12 @@ |
|
|
|
|
#include "src/core/lib/gpr/useful.h" |
|
|
|
|
#include "src/core/lib/gprpp/crash.h" |
|
|
|
|
#include "src/core/lib/gprpp/debug_location.h" |
|
|
|
|
#include "src/core/lib/gprpp/match.h" |
|
|
|
|
#include "src/core/lib/gprpp/mpscq.h" |
|
|
|
|
#include "src/core/lib/gprpp/status_helper.h" |
|
|
|
|
#include "src/core/lib/iomgr/exec_ctx.h" |
|
|
|
|
#include "src/core/lib/iomgr/pollset_set.h" |
|
|
|
|
#include "src/core/lib/promise/activity.h" |
|
|
|
|
#include "src/core/lib/promise/cancel_callback.h" |
|
|
|
|
#include "src/core/lib/promise/context.h" |
|
|
|
|
#include "src/core/lib/promise/map.h" |
|
|
|
|
#include "src/core/lib/promise/pipe.h" |
|
|
|
@ -253,12 +253,12 @@ class Server::RequestMatcherInterface { |
|
|
|
|
// application to explicitly request RPCs and then matching those to incoming
|
|
|
|
|
// RPCs, along with a slow path by which incoming RPCs are put on a locked
|
|
|
|
|
// pending list if they aren't able to be matched to an application request.
|
|
|
|
|
class Server::RealRequestMatcher : public RequestMatcherInterface { |
|
|
|
|
class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface { |
|
|
|
|
public: |
|
|
|
|
explicit RealRequestMatcher(Server* server) |
|
|
|
|
explicit RealRequestMatcherFilterStack(Server* server) |
|
|
|
|
: server_(server), requests_per_cq_(server->cqs_.size()) {} |
|
|
|
|
|
|
|
|
|
~RealRequestMatcher() override { |
|
|
|
|
~RealRequestMatcherFilterStack() override { |
|
|
|
|
for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) { |
|
|
|
|
GPR_ASSERT(queue.Pop() == nullptr); |
|
|
|
|
} |
|
|
|
@ -266,15 +266,8 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { |
|
|
|
|
|
|
|
|
|
void ZombifyPending() override { |
|
|
|
|
while (!pending_.empty()) { |
|
|
|
|
Match( |
|
|
|
|
pending_.front(), |
|
|
|
|
[](CallData* calld) { |
|
|
|
|
calld->SetState(CallData::CallState::ZOMBIED); |
|
|
|
|
calld->KillZombie(); |
|
|
|
|
}, |
|
|
|
|
[](const std::shared_ptr<ActivityWaiter>& w) { |
|
|
|
|
w->Finish(absl::InternalError("Server closed")); |
|
|
|
|
}); |
|
|
|
|
pending_.front().calld->SetState(CallData::CallState::ZOMBIED); |
|
|
|
|
pending_.front().calld->KillZombie(); |
|
|
|
|
pending_.pop(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -300,39 +293,34 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { |
|
|
|
|
// matching calls
|
|
|
|
|
struct NextPendingCall { |
|
|
|
|
RequestedCall* rc = nullptr; |
|
|
|
|
PendingCall pending; |
|
|
|
|
CallData* pending; |
|
|
|
|
}; |
|
|
|
|
auto pop_next_pending = [this, request_queue_index] { |
|
|
|
|
while (true) { |
|
|
|
|
NextPendingCall pending_call; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&server_->mu_call_); |
|
|
|
|
while (!pending_.empty() && |
|
|
|
|
pending_.front().Age() > server_->max_time_in_pending_queue_) { |
|
|
|
|
pending_.pop(); |
|
|
|
|
} |
|
|
|
|
if (!pending_.empty()) { |
|
|
|
|
pending_call.rc = reinterpret_cast<RequestedCall*>( |
|
|
|
|
requests_per_cq_[request_queue_index].Pop()); |
|
|
|
|
if (pending_call.rc != nullptr) { |
|
|
|
|
pending_call.pending = std::move(pending_.front()); |
|
|
|
|
pending_call.pending = pending_.front().calld; |
|
|
|
|
pending_.pop(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return pending_call; |
|
|
|
|
}; |
|
|
|
|
while (true) { |
|
|
|
|
NextPendingCall next_pending = pop_next_pending(); |
|
|
|
|
if (next_pending.rc == nullptr) break; |
|
|
|
|
Match( |
|
|
|
|
next_pending.pending, |
|
|
|
|
[&](CallData* calld) { |
|
|
|
|
if (!calld->MaybeActivate()) { |
|
|
|
|
// Zombied Call
|
|
|
|
|
calld->KillZombie(); |
|
|
|
|
} else { |
|
|
|
|
calld->Publish(request_queue_index, next_pending.rc); |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|
[&](const std::shared_ptr<ActivityWaiter>& w) { |
|
|
|
|
w->Finish(server(), request_queue_index, next_pending.rc); |
|
|
|
|
}); |
|
|
|
|
if (pending_call.rc == nullptr) break; |
|
|
|
|
if (!pending_call.pending->MaybeActivate()) { |
|
|
|
|
// Zombied Call
|
|
|
|
|
pending_call.pending->KillZombie(); |
|
|
|
|
requests_per_cq_[request_queue_index].Push( |
|
|
|
|
&pending_call.rc->mpscq_node); |
|
|
|
|
} else { |
|
|
|
|
pending_call.pending->Publish(request_queue_index, pending_call.rc); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -369,7 +357,7 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { |
|
|
|
|
} |
|
|
|
|
if (rc == nullptr) { |
|
|
|
|
calld->SetState(CallData::CallState::PENDING); |
|
|
|
|
pending_.push(calld); |
|
|
|
|
pending_.push(PendingCall{calld}); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -377,6 +365,91 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { |
|
|
|
|
calld->Publish(cq_idx, rc); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(size_t) override { |
|
|
|
|
Crash("not implemented for filter stack request matcher"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Server* server() const final { return server_; } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
Server* const server_; |
|
|
|
|
struct PendingCall { |
|
|
|
|
CallData* calld; |
|
|
|
|
Timestamp created = Timestamp::Now(); |
|
|
|
|
Duration Age() { return Timestamp::Now() - created; } |
|
|
|
|
}; |
|
|
|
|
std::queue<PendingCall> pending_; |
|
|
|
|
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class Server::RealRequestMatcherPromises : public RequestMatcherInterface { |
|
|
|
|
public: |
|
|
|
|
explicit RealRequestMatcherPromises(Server* server) |
|
|
|
|
: server_(server), requests_per_cq_(server->cqs_.size()) {} |
|
|
|
|
|
|
|
|
|
~RealRequestMatcherPromises() override { |
|
|
|
|
for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) { |
|
|
|
|
GPR_ASSERT(queue.Pop() == nullptr); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ZombifyPending() override { |
|
|
|
|
while (!pending_.empty()) { |
|
|
|
|
pending_.front()->Finish(absl::InternalError("Server closed")); |
|
|
|
|
pending_.pop(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void KillRequests(grpc_error_handle error) override { |
|
|
|
|
for (size_t i = 0; i < requests_per_cq_.size(); i++) { |
|
|
|
|
RequestedCall* rc; |
|
|
|
|
while ((rc = reinterpret_cast<RequestedCall*>( |
|
|
|
|
requests_per_cq_[i].Pop())) != nullptr) { |
|
|
|
|
server_->FailCall(i, rc, error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
size_t request_queue_count() const override { |
|
|
|
|
return requests_per_cq_.size(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RequestCallWithPossiblePublish(size_t request_queue_index, |
|
|
|
|
RequestedCall* call) override { |
|
|
|
|
if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) { |
|
|
|
|
// this was the first queued request: we need to lock and start
|
|
|
|
|
// matching calls
|
|
|
|
|
struct NextPendingCall { |
|
|
|
|
RequestedCall* rc = nullptr; |
|
|
|
|
PendingCall pending; |
|
|
|
|
}; |
|
|
|
|
while (true) { |
|
|
|
|
NextPendingCall pending_call; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&server_->mu_call_); |
|
|
|
|
if (!pending_.empty()) { |
|
|
|
|
pending_call.rc = reinterpret_cast<RequestedCall*>( |
|
|
|
|
requests_per_cq_[request_queue_index].Pop()); |
|
|
|
|
if (pending_call.rc != nullptr) { |
|
|
|
|
pending_call.pending = std::move(pending_.front()); |
|
|
|
|
pending_.pop(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (pending_call.rc == nullptr) break; |
|
|
|
|
if (!pending_call.pending->Finish(server(), request_queue_index, |
|
|
|
|
pending_call.rc)) { |
|
|
|
|
requests_per_cq_[request_queue_index].Push( |
|
|
|
|
&pending_call.rc->mpscq_node); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void MatchOrQueue(size_t, CallData*) override { |
|
|
|
|
Crash("not implemented for promises"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest( |
|
|
|
|
size_t start_request_queue_index) override { |
|
|
|
|
for (size_t i = 0; i < requests_per_cq_.size(); i++) { |
|
|
|
@ -396,25 +469,36 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { |
|
|
|
|
size_t cq_idx = 0; |
|
|
|
|
size_t loop_count; |
|
|
|
|
{ |
|
|
|
|
std::vector<std::shared_ptr<ActivityWaiter>> removed_pending; |
|
|
|
|
MutexLock lock(&server_->mu_call_); |
|
|
|
|
while (!pending_.empty() && |
|
|
|
|
pending_.front()->Age() > server_->max_time_in_pending_queue_) { |
|
|
|
|
removed_pending.push_back(std::move(pending_.front())); |
|
|
|
|
pending_.pop(); |
|
|
|
|
} |
|
|
|
|
for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) { |
|
|
|
|
cq_idx = |
|
|
|
|
(start_request_queue_index + loop_count) % requests_per_cq_.size(); |
|
|
|
|
rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop()); |
|
|
|
|
if (rc != nullptr) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
if (rc != nullptr) break; |
|
|
|
|
} |
|
|
|
|
if (rc == nullptr) { |
|
|
|
|
if (server_->pending_backlog_protector_.Reject(pending_.size(), |
|
|
|
|
server_->bitgen_)) { |
|
|
|
|
return Immediate(absl::ResourceExhaustedError( |
|
|
|
|
"Too many pending requests for this server")); |
|
|
|
|
} |
|
|
|
|
auto w = std::make_shared<ActivityWaiter>( |
|
|
|
|
Activity::current()->MakeOwningWaker()); |
|
|
|
|
pending_.push(w); |
|
|
|
|
return [w]() -> Poll<absl::StatusOr<MatchResult>> { |
|
|
|
|
std::unique_ptr<absl::StatusOr<MatchResult>> r( |
|
|
|
|
w->result.exchange(nullptr, std::memory_order_acq_rel)); |
|
|
|
|
if (r == nullptr) return Pending{}; |
|
|
|
|
return std::move(*r); |
|
|
|
|
}; |
|
|
|
|
return OnCancel( |
|
|
|
|
[w]() -> Poll<absl::StatusOr<MatchResult>> { |
|
|
|
|
std::unique_ptr<absl::StatusOr<MatchResult>> r( |
|
|
|
|
w->result.exchange(nullptr, std::memory_order_acq_rel)); |
|
|
|
|
if (r == nullptr) return Pending{}; |
|
|
|
|
return std::move(*r); |
|
|
|
|
}, |
|
|
|
|
[w]() { w->Expire(); }); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return Immediate(MatchResult(server(), cq_idx, rc)); |
|
|
|
@ -425,23 +509,40 @@ class Server::RealRequestMatcher : public RequestMatcherInterface { |
|
|
|
|
private: |
|
|
|
|
Server* const server_; |
|
|
|
|
struct ActivityWaiter { |
|
|
|
|
using ResultType = absl::StatusOr<MatchResult>; |
|
|
|
|
explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {} |
|
|
|
|
~ActivityWaiter() { delete result.load(std::memory_order_acquire); } |
|
|
|
|
void Finish(absl::Status status) { |
|
|
|
|
result.store(new absl::StatusOr<MatchResult>(std::move(status)), |
|
|
|
|
std::memory_order_release); |
|
|
|
|
waker.Wakeup(); |
|
|
|
|
delete result.exchange(new ResultType(std::move(status)), |
|
|
|
|
std::memory_order_acq_rel); |
|
|
|
|
waker.WakeupAsync(); |
|
|
|
|
} |
|
|
|
|
void Finish(Server* server, size_t cq_idx, RequestedCall* requested_call) { |
|
|
|
|
result.store(new absl::StatusOr<MatchResult>( |
|
|
|
|
MatchResult(server, cq_idx, requested_call)), |
|
|
|
|
std::memory_order_release); |
|
|
|
|
waker.Wakeup(); |
|
|
|
|
// Returns true if requested_call consumed, false otherwise.
|
|
|
|
|
GRPC_MUST_USE_RESULT bool Finish(Server* server, size_t cq_idx, |
|
|
|
|
RequestedCall* requested_call) { |
|
|
|
|
ResultType* expected = nullptr; |
|
|
|
|
ResultType* new_value = |
|
|
|
|
new ResultType(MatchResult(server, cq_idx, requested_call)); |
|
|
|
|
if (!result.compare_exchange_strong(expected, new_value, |
|
|
|
|
std::memory_order_acq_rel, |
|
|
|
|
std::memory_order_acquire)) { |
|
|
|
|
GPR_ASSERT(new_value->value().TakeCall() == requested_call); |
|
|
|
|
delete new_value; |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
waker.WakeupAsync(); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
void Expire() { |
|
|
|
|
delete result.exchange(new ResultType(absl::CancelledError()), |
|
|
|
|
std::memory_order_acq_rel); |
|
|
|
|
} |
|
|
|
|
Duration Age() { return Timestamp::Now() - created; } |
|
|
|
|
Waker waker; |
|
|
|
|
std::atomic<absl::StatusOr<MatchResult>*> result{nullptr}; |
|
|
|
|
std::atomic<ResultType*> result{nullptr}; |
|
|
|
|
const Timestamp created = Timestamp::Now(); |
|
|
|
|
}; |
|
|
|
|
using PendingCall = absl::variant<CallData*, std::shared_ptr<ActivityWaiter>>; |
|
|
|
|
using PendingCall = std::shared_ptr<ActivityWaiter>; |
|
|
|
|
std::queue<PendingCall> pending_; |
|
|
|
|
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_; |
|
|
|
|
}; |
|
|
|
@ -716,6 +817,15 @@ void Server::AddListener(OrphanablePtr<ListenerInterface> listener) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Server::Start() { |
|
|
|
|
auto make_real_request_matcher = |
|
|
|
|
[this]() -> std::unique_ptr<RequestMatcherInterface> { |
|
|
|
|
if (IsPromiseBasedServerCallEnabled()) { |
|
|
|
|
return std::make_unique<RealRequestMatcherPromises>(this); |
|
|
|
|
} else { |
|
|
|
|
return std::make_unique<RealRequestMatcherFilterStack>(this); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
started_ = true; |
|
|
|
|
for (grpc_completion_queue* cq : cqs_) { |
|
|
|
|
if (grpc_cq_can_listen(cq)) { |
|
|
|
@ -723,11 +833,11 @@ void Server::Start() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (unregistered_request_matcher_ == nullptr) { |
|
|
|
|
unregistered_request_matcher_ = std::make_unique<RealRequestMatcher>(this); |
|
|
|
|
unregistered_request_matcher_ = make_real_request_matcher(); |
|
|
|
|
} |
|
|
|
|
for (std::unique_ptr<RegisteredMethod>& rm : registered_methods_) { |
|
|
|
|
if (rm->matcher == nullptr) { |
|
|
|
|
rm->matcher = std::make_unique<RealRequestMatcher>(this); |
|
|
|
|
rm->matcher = make_real_request_matcher(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|