pull/36509/head
Craig Tiller 10 months ago
parent 87fc5d95cb
commit ee3d8cb453
  1. src/core/ext/transport/chaotic_good/server_transport.cc (7 lines changed)
  2. src/core/lib/experiments/rollouts.yaml (2 lines changed)
  3. src/core/lib/surface/call.cc (2 lines changed)
  4. src/core/lib/surface/server.cc (64 lines changed)
  5. src/core/plugin_registry/grpc_plugin_registry.cc (1 line changed)
  6. test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong_chaotic_good.cc (1 line changed)

@@ -355,12 +355,13 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport(
     PromiseEndpoint data_endpoint,
     std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,
     HPackParser hpack_parser, HPackCompressor hpack_encoder)
-    : outgoing_frames_(4),
-      call_arena_allocator_(MakeRefCounted<CallArenaAllocator>(
+    : call_arena_allocator_(MakeRefCounted<CallArenaAllocator>(
           args.GetObject<ResourceQuota>()
               ->memory_quota()
               ->CreateMemoryAllocator("chaotic-good"),
-          1024)) {
+          1024)),
+      event_engine_(event_engine),
+      outgoing_frames_(4) {
   auto transport = MakeRefCounted<ChaoticGoodTransport>(
       std::move(control_endpoint), std::move(data_endpoint),
       std::move(hpack_parser), std::move(hpack_encoder));
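
Aside on the reordering above: C++ runs member initializers in declaration order, not in the order they appear in the initializer list, so this hunk presumably brings the list back in line with the members' declaration order once `event_engine_` is added. A minimal sketch of the bug class that `-Wreorder` (and this kind of cleanup) guards against; the struct is hypothetical, not the gRPC type:

```cpp
// Members are initialized in declaration order, regardless of how the
// initializer list is written. allocator_ is declared first, so it is
// initialized first and reads frames_ before frames_(4) has run.
struct Sketch {
  Sketch() : frames_(4), allocator_(frames_ * 2) {}  // triggers -Wreorder
  int allocator_;
  int frames_;
};
```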

@@ -96,8 +96,6 @@
   ios: broken
   windows: broken
   posix: false
-- name: promise_based_server_call
-  default: false
 - name: rstpit
   default: false
 - name: schedule_cancellation_over_write

@@ -3162,6 +3162,8 @@ class ServerCall final : public Call, public DualRefCounted<ServerCall> {
         client_initial_metadata_stored_(std::move(client_initial_metadata)),
         cq_(cq),
         server_(server) {
+    call_handler_.legacy_context()[GRPC_CONTEXT_CALL].value =
+        static_cast<Call*>(this);
     global_stats().IncrementServerCallsCreated();
   }
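
For readers unfamiliar with the legacy context array being written here: it is a fixed array of tagged `void*` slots indexed by an enum (`GRPC_CONTEXT_CALL` above), which filter-stack-era code reads back to recover the `Call*`. A self-contained sketch of that store/lookup pattern, with hypothetical names rather than the real gRPC definitions:

```cpp
#include <cassert>

enum ContextIndex { kContextCall = 0, kContextCount };
struct ContextElement { void* value = nullptr; };
struct Call {};

int main() {
  ContextElement legacy_context[kContextCount];
  Call call;
  // Store side, as in the ServerCall constructor above:
  legacy_context[kContextCall].value = static_cast<Call*>(&call);
  // Lookup side, as legacy filter code would do:
  Call* c = static_cast<Call*>(legacy_context[kContextCall].value);
  assert(c == &call);
}
```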

@@ -235,7 +235,8 @@ struct Server::RequestedCall {
   template <typename OptionalPayload>
   void Complete(OptionalPayload payload, ClientMetadata& md) {
-    Timestamp deadline = GetContext<Call>()->deadline();
+    Timestamp deadline =
+        md.get(GrpcTimeoutMetadata()).value_or(Timestamp::InfFuture());
     switch (type) {
       case RequestedCall::Type::BATCH_CALL:
         GPR_ASSERT(!payload.has_value());
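
The change above derives the deadline from the client's timeout metadata rather than from an ambient `Call` context, defaulting to "no deadline" when the header is absent. A rough standalone equivalent of the `value_or` pattern, with `std::chrono` standing in for gRPC's `Timestamp`:

```cpp
#include <chrono>
#include <optional>

using Timestamp = std::chrono::steady_clock::time_point;

// If the client sent a timeout, the deadline is now + timeout;
// otherwise fall back to "infinitely far in the future".
Timestamp DeadlineFromTimeout(std::optional<std::chrono::milliseconds> timeout) {
  if (!timeout.has_value()) return Timestamp::max();  // InfFuture analogue
  return std::chrono::steady_clock::now() + *timeout;
}
```
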
@@ -426,8 +427,59 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
     calld->Publish(cq_idx, rc);
   }
 
-  ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(size_t) override {
-    Crash("not implemented for filter stack request matcher");
+  ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
+      size_t start_request_queue_index) override {
+    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
+      size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
+      RequestedCall* rc =
+          reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].TryPop());
+      if (rc != nullptr) {
+        return Immediate(MatchResult(server(), cq_idx, rc));
+      }
+    }
+    // No cq to take the request found; queue it on the slow list.
+    // We need to ensure that all the queues are empty. We do this under
+    // the server mu_call_ lock to ensure that if something is added to
+    // an empty request queue, it will block until the call is actually
+    // added to the pending list.
+    RequestedCall* rc = nullptr;
+    size_t cq_idx = 0;
+    size_t loop_count;
+    {
+      std::vector<std::shared_ptr<ActivityWaiter>> removed_pending;
+      MutexLock lock(&server_->mu_call_);
+      while (!pending_promises_.empty() &&
+             pending_promises_.front()->Age() >
+                 server_->max_time_in_pending_queue_) {
+        removed_pending.push_back(std::move(pending_promises_.front()));
+        pending_promises_.pop();
+      }
+      for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
+        cq_idx =
+            (start_request_queue_index + loop_count) % requests_per_cq_.size();
+        rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop());
+        if (rc != nullptr) break;
+      }
+      if (rc == nullptr) {
+        if (server_->pending_backlog_protector_.Reject(pending_promises_.size(),
+                                                       server_->bitgen_)) {
+          return Immediate(absl::ResourceExhaustedError(
+              "Too many pending requests for this server"));
+        }
+        auto w = std::make_shared<ActivityWaiter>(
+            GetContext<Activity>()->MakeOwningWaker());
+        pending_promises_.push(w);
+        return OnCancel(
+            [w]() -> Poll<absl::StatusOr<MatchResult>> {
+              std::unique_ptr<absl::StatusOr<MatchResult>> r(
+                  w->result.exchange(nullptr, std::memory_order_acq_rel));
+              if (r == nullptr) return Pending{};
+              return std::move(*r);
+            },
+            [w]() { w->Expire(); });
+      }
+    }
+    return Immediate(MatchResult(server(), cq_idx, rc));
   }
 
   Server* server() const final { return server_; }
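
The shape of the new `MatchRequest`: a lock-free fast path over each completion queue's request queue, then a `mu_call_`-guarded slow path that parks the activity as an `ActivityWaiter`. The poll lambda's `result.exchange(nullptr, ...)` is the interesting bit: publication and consumption of the match result are a single atomic pointer handoff, so delivery, cancellation, and expiry cannot double-consume the result. A self-contained sketch of that handoff, with a hypothetical `Waiter` and `int` standing in for `MatchResult`:

```cpp
#include <atomic>
#include <memory>
#include <optional>

struct Waiter {
  std::atomic<int*> result{nullptr};

  // Producer side: publish a result exactly once; fails if the slot
  // was already resolved (e.g. the waiter expired first).
  bool Deliver(int value) {
    auto r = std::make_unique<int>(value);
    int* expected = nullptr;
    if (result.compare_exchange_strong(expected, r.get(),
                                       std::memory_order_acq_rel)) {
      r.release();  // ownership handed to the polling side
      return true;
    }
    return false;  // lost the race; r frees itself
  }

  // Polling side: take the result if one has arrived, else stay pending.
  std::optional<int> Poll() {
    std::unique_ptr<int> r(result.exchange(nullptr, std::memory_order_acq_rel));
    if (r == nullptr) return std::nullopt;  // Pending{} analogue
    return *r;
  }
};
```
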
@@ -886,11 +938,15 @@ grpc_error_handle Server::SetupTransport(
     channelz_node_->AddChildSocket(socket_node);
   }
   if (transport->server_transport() != nullptr) {
+    // Take ownership
+    // TODO(ctiller): post-v3-transition make this method take an
+    // OrphanablePtr<ServerTransport> directly.
+    OrphanablePtr<ServerTransport> t(transport->server_transport());
     auto destination = MakeCallDestination(args.SetObject(transport));
     if (!destination.ok()) {
       return absl_status_to_grpc_error(destination.status());
     }
-    transport->server_transport()->SetCallDestination(std::move(*destination));
+    t->SetCallDestination(std::move(*destination));
   } else {
     GPR_ASSERT(transport->filter_stack_transport() != nullptr);
     absl::StatusOr<OrphanablePtr<Channel>> channel = LegacyChannel::Create(
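
Taking `OrphanablePtr<ServerTransport> t` up front, rather than only calling through the raw pointer at the end, means the early `return` on a failed `MakeCallDestination` now disposes of the transport instead of leaking it. A minimal sketch of the pattern with toy types; gRPC's real `OrphanablePtr` is, roughly, a `unique_ptr` whose deleter calls `Orphan()`:

```cpp
#include <cstdio>
#include <memory>

struct ServerTransport {
  void Orphan() { std::puts("transport released"); delete this; }
};
struct OrphanDelete {
  void operator()(ServerTransport* t) const { t->Orphan(); }
};
using OrphanablePtr = std::unique_ptr<ServerTransport, OrphanDelete>;

bool Setup(ServerTransport* raw, bool destination_ok) {
  OrphanablePtr t(raw);               // ownership taken before the fallible step
  if (!destination_ok) return false;  // early return: t orphaned, no leak
  // ... t->SetCallDestination(...) on the success path ...
  return true;
}

int main() { Setup(new ServerTransport, /*destination_ok=*/false); }
```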

@@ -83,6 +83,7 @@ void RegisterBuiltins(CoreConfiguration::Builder* builder) {
       .Terminal();
   builder->channel_init()
       ->RegisterFilter(GRPC_SERVER_CHANNEL, &Server::kServerTopFilter)
+      .SkipV3()
       .BeforeAll();
 }
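
With `ServerCall` wiring up the legacy context itself (see the call.cc hunk above), the server top filter presumably no longer belongs in v3 channel stacks, hence `.SkipV3()`. The registration API reads as a fluent builder where each chained call refines where and when the filter applies; a toy sketch of that shape, with hypothetical names rather than the CoreConfiguration API:

```cpp
struct FilterRegistration {
  bool skip_v3 = false;
  bool before_all = false;
  // Each chained predicate narrows the stacks this filter is added to.
  FilterRegistration& SkipV3() { skip_v3 = true; return *this; }
  FilterRegistration& BeforeAll() { before_all = true; return *this; }
};

int main() {
  FilterRegistration server_top;
  server_top.SkipV3().BeforeAll();  // mirrors the chain in the diff
}
```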

@@ -111,7 +111,6 @@ int main(int argc, char** argv) {
   grpc_core::ForceEnableExperiment("event_engine_client", true);
   grpc_core::ForceEnableExperiment("event_engine_listener", true);
   grpc_core::ForceEnableExperiment("promise_based_client_call", true);
-  grpc_core::ForceEnableExperiment("promise_based_server_call", true);
   grpc_core::ForceEnableExperiment("chaotic_good", true);
   grpc::testing::TestEnvironment env(&argc, argv);
   LibraryInitializer libInit;
