mirror of https://github.com/grpc/grpc.git
commit
f6b69e3048
1472 changed files with 9722 additions and 7362 deletions
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,126 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2024 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <cstddef> |
||||
#include <ostream> |
||||
#include <string> |
||||
|
||||
#include "absl/flags/flag.h" |
||||
#include "absl/flags/parse.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpcpp/ext/proto_server_reflection_plugin.h> |
||||
#include <grpcpp/grpcpp.h> |
||||
|
||||
#ifdef BAZEL_BUILD |
||||
#include "examples/protos/helloworld.grpc.pb.h" |
||||
#else |
||||
#include "helloworld.grpc.pb.h" |
||||
#endif |
||||
|
||||
ABSL_FLAG(std::string, target, "localhost:50051", "Server address"); |
||||
|
||||
using grpc::CallbackServerContext; |
||||
using grpc::Channel; |
||||
using grpc::ClientContext; |
||||
using grpc::Server; |
||||
using grpc::ServerBuilder; |
||||
using grpc::ServerUnaryReactor; |
||||
using grpc::Status; |
||||
using helloworld::Greeter; |
||||
using helloworld::HelloReply; |
||||
using helloworld::HelloRequest; |
||||
|
||||
namespace { |
||||
|
||||
// Sends requests as quickly as possible and times how long it takes to perform
|
||||
// the write operation.
|
||||
class GreeterClientReactor final |
||||
: public grpc::ClientBidiReactor<helloworld::HelloRequest, |
||||
helloworld::HelloReply> { |
||||
public: |
||||
explicit GreeterClientReactor(int reqs, size_t req_size) : reqs_(reqs) { |
||||
req_.set_name(std::string(req_size, '*')); |
||||
} |
||||
|
||||
void Start() { |
||||
absl::MutexLock lock(&mu_); |
||||
StartCall(); |
||||
Write(); |
||||
} |
||||
|
||||
~GreeterClientReactor() override { |
||||
absl::MutexLock lock(&mu_); |
||||
mu_.Await(absl::Condition(+[](bool* done) { return *done; }, &done_)); |
||||
} |
||||
|
||||
void OnWriteDone(bool ok) override { |
||||
absl::MutexLock lock(&mu_); |
||||
std::cout << "Writing took " << absl::Now() - *time_ << std::endl; |
||||
time_ = absl::nullopt; |
||||
if (ok) { |
||||
Write(); |
||||
} |
||||
} |
||||
|
||||
void OnDone(const grpc::Status& status) override { |
||||
if (status.ok()) { |
||||
std::cout << "Done\n"; |
||||
} else { |
||||
std::cout << "Done with error: [" << status.error_code() << "] " |
||||
<< status.error_message() << "\n"; |
||||
} |
||||
absl::MutexLock lock(&mu_); |
||||
done_ = true; |
||||
} |
||||
|
||||
private: |
||||
void Write() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { |
||||
if (reqs_ == 0) { |
||||
StartWritesDone(); |
||||
return; |
||||
} |
||||
--reqs_; |
||||
StartWrite(&req_); |
||||
time_ = absl::Now(); |
||||
} |
||||
|
||||
absl::Mutex mu_; |
||||
bool done_ ABSL_GUARDED_BY(&mu_) = false; |
||||
HelloRequest req_; |
||||
size_t reqs_; |
||||
absl::optional<absl::Time> time_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
} // namespace
|
||||
|
||||
int main(int argc, char** argv) { |
||||
absl::ParseCommandLine(argc, argv); |
||||
grpc::ChannelArguments channel_arguments; |
||||
auto channel = grpc::CreateCustomChannel(absl::GetFlag(FLAGS_target), |
||||
grpc::InsecureChannelCredentials(), |
||||
channel_arguments); |
||||
auto stub = Greeter::NewStub(channel); |
||||
// Send 10 requests with 3Mb payload. This will eventually fill the buffer
|
||||
// and make
|
||||
GreeterClientReactor reactor(10, 3 * 1024 * 1024); |
||||
grpc::ClientContext context; |
||||
stub->async()->SayHelloBidiStream(&context, &reactor); |
||||
reactor.Start(); |
||||
return 0; |
||||
} |
@ -0,0 +1,111 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2021 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <cstddef> |
||||
#include <cstdint> |
||||
#include <iostream> |
||||
#include <string> |
||||
|
||||
#include "absl/flags/flag.h" |
||||
#include "absl/flags/parse.h" |
||||
#include "absl/strings/str_format.h" |
||||
|
||||
#include <grpcpp/ext/proto_server_reflection_plugin.h> |
||||
#include <grpcpp/grpcpp.h> |
||||
#include <grpcpp/health_check_service_interface.h> |
||||
|
||||
#ifdef BAZEL_BUILD |
||||
#include "examples/protos/helloworld.grpc.pb.h" |
||||
#else |
||||
#include "helloworld.grpc.pb.h" |
||||
#endif |
||||
|
||||
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); |
||||
ABSL_FLAG(size_t, quota, 20, "Resource quota, in megabytes"); |
||||
|
||||
namespace { |
||||
|
||||
//
|
||||
// Server reactor that is slow to read incoming messages, causing the buffers
|
||||
// to fill.
|
||||
//
|
||||
class SlowReadingBidiReactor final |
||||
: public grpc::ServerBidiReactor<helloworld::HelloRequest, |
||||
helloworld::HelloReply> { |
||||
public: |
||||
SlowReadingBidiReactor() { StartRead(&req_); } |
||||
|
||||
void OnReadDone(bool ok) override { |
||||
std::cout << "Recieved request with " << req_.name().length() |
||||
<< " bytes name\n"; |
||||
if (!ok) { |
||||
Finish(grpc::Status::OK); |
||||
return; |
||||
} |
||||
sleep(1); |
||||
StartRead(&req_); |
||||
} |
||||
|
||||
void OnDone() override { |
||||
std::cout << "Done\n"; |
||||
delete this; |
||||
} |
||||
|
||||
private: |
||||
absl::Mutex mu_; |
||||
helloworld::HelloRequest req_; |
||||
}; |
||||
|
||||
// Logic and data behind the server's behavior.
|
||||
class GreeterServiceImpl final : public helloworld::Greeter::CallbackService { |
||||
grpc::ServerBidiReactor<helloworld::HelloRequest, helloworld::HelloReply>* |
||||
SayHelloBidiStream(grpc::CallbackServerContext* /* context */) override { |
||||
return new SlowReadingBidiReactor(); |
||||
} |
||||
}; |
||||
|
||||
} // namespace
|
||||
|
||||
void RunServer(uint16_t port) { |
||||
std::string server_address = absl::StrFormat("0.0.0.0:%d", port); |
||||
GreeterServiceImpl service; |
||||
|
||||
grpc::EnableDefaultHealthCheckService(true); |
||||
grpc::reflection::InitProtoReflectionServerBuilderPlugin(); |
||||
grpc::ServerBuilder builder; |
||||
// Listen on the given address without any authentication mechanism.
|
||||
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); |
||||
// Register "service" as the instance through which we'll communicate with
|
||||
// clients. In this case it corresponds to an *synchronous* service.
|
||||
builder.RegisterService(&service); |
||||
grpc::ResourceQuota quota; |
||||
quota.Resize(absl::GetFlag(FLAGS_quota) * 1024 * 1024); |
||||
// Finally assemble the server.
|
||||
auto server = builder.BuildAndStart(); |
||||
std::cout << "Server listening on " << server_address << std::endl; |
||||
|
||||
// Wait for the server to shutdown. Note that some other thread must be
|
||||
// responsible for shutting down the server for this call to ever return.
|
||||
server->Wait(); |
||||
} |
||||
|
||||
int main(int argc, char** argv) { |
||||
absl::ParseCommandLine(argc, argv); |
||||
RunServer(absl::GetFlag(FLAGS_port)); |
||||
return 0; |
||||
} |
@ -0,0 +1,168 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/call/request_buffer.h" |
||||
|
||||
#include <cstdint> |
||||
|
||||
#include "absl/types/optional.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
ValueOrFailure<size_t> RequestBuffer::PushClientInitialMetadata( |
||||
ClientMetadataHandle md) { |
||||
MutexLock lock(&mu_); |
||||
if (absl::get_if<Cancelled>(&state_)) return Failure{}; |
||||
auto& buffering = absl::get<Buffering>(state_); |
||||
CHECK_EQ(buffering.initial_metadata.get(), nullptr); |
||||
buffering.initial_metadata = std::move(md); |
||||
buffering.buffered += buffering.initial_metadata->TransportSize(); |
||||
WakeupAsyncAllPullers(); |
||||
return buffering.buffered; |
||||
} |
||||
|
||||
Poll<ValueOrFailure<size_t>> RequestBuffer::PollPushMessage( |
||||
MessageHandle& message) { |
||||
MutexLock lock(&mu_); |
||||
if (absl::get_if<Cancelled>(&state_)) return Failure{}; |
||||
size_t buffered = 0; |
||||
if (auto* buffering = absl::get_if<Buffering>(&state_)) { |
||||
if (winner_ != nullptr) return PendingPush(); |
||||
buffering->buffered += message->payload()->Length(); |
||||
buffered = buffering->buffered; |
||||
buffering->messages.push_back(std::move(message)); |
||||
} else { |
||||
auto& streaming = absl::get<Streaming>(state_); |
||||
CHECK_EQ(streaming.end_of_stream, false); |
||||
if (streaming.message != nullptr) { |
||||
return PendingPush(); |
||||
} |
||||
streaming.message = std::move(message); |
||||
} |
||||
WakeupAsyncAllPullers(); |
||||
return buffered; |
||||
} |
||||
|
||||
StatusFlag RequestBuffer::FinishSends() { |
||||
MutexLock lock(&mu_); |
||||
if (absl::get_if<Cancelled>(&state_)) return Failure{}; |
||||
if (auto* buffering = absl::get_if<Buffering>(&state_)) { |
||||
Buffered buffered(std::move(buffering->initial_metadata), |
||||
std::move(buffering->messages)); |
||||
state_.emplace<Buffered>(std::move(buffered)); |
||||
} else { |
||||
auto& streaming = absl::get<Streaming>(state_); |
||||
CHECK_EQ(streaming.end_of_stream, false); |
||||
streaming.end_of_stream = true; |
||||
} |
||||
WakeupAsyncAllPullers(); |
||||
return Success{}; |
||||
} |
||||
|
||||
void RequestBuffer::Cancel(absl::Status error) { |
||||
MutexLock lock(&mu_); |
||||
if (absl::holds_alternative<Cancelled>(state_)) return; |
||||
state_.emplace<Cancelled>(std::move(error)); |
||||
WakeupAsyncAllPullers(); |
||||
} |
||||
|
||||
void RequestBuffer::Commit(Reader* winner) { |
||||
MutexLock lock(&mu_); |
||||
CHECK_EQ(winner_, nullptr); |
||||
winner_ = winner; |
||||
if (auto* buffering = absl::get_if<Buffering>(&state_)) { |
||||
if (buffering->initial_metadata != nullptr && |
||||
winner->message_index_ == buffering->messages.size() && |
||||
winner->pulled_client_initial_metadata_) { |
||||
state_.emplace<Streaming>(); |
||||
} |
||||
} else if (auto* buffered = absl::get_if<Buffered>(&state_)) { |
||||
CHECK_NE(buffered->initial_metadata.get(), nullptr); |
||||
if (winner->message_index_ == buffered->messages.size()) { |
||||
state_.emplace<Streaming>().end_of_stream = true; |
||||
} |
||||
} |
||||
WakeupAsyncAllPullersExcept(winner); |
||||
} |
||||
|
||||
void RequestBuffer::WakeupAsyncAllPullersExcept(Reader* except_reader) { |
||||
for (auto wakeup_reader : readers_) { |
||||
if (wakeup_reader == except_reader) continue; |
||||
wakeup_reader->pull_waker_.WakeupAsync(); |
||||
} |
||||
} |
||||
|
||||
Poll<ValueOrFailure<ClientMetadataHandle>> |
||||
RequestBuffer::Reader::PollPullClientInitialMetadata() { |
||||
MutexLock lock(&buffer_->mu_); |
||||
if (buffer_->winner_ != nullptr && buffer_->winner_ != this) { |
||||
error_ = absl::CancelledError("Another call was chosen"); |
||||
return Failure{}; |
||||
} |
||||
if (auto* buffering = absl::get_if<Buffering>(&buffer_->state_)) { |
||||
if (buffering->initial_metadata.get() == nullptr) { |
||||
return buffer_->PendingPull(this); |
||||
} |
||||
pulled_client_initial_metadata_ = true; |
||||
auto result = ClaimObject(buffering->initial_metadata); |
||||
buffer_->MaybeSwitchToStreaming(); |
||||
return result; |
||||
} |
||||
if (auto* buffered = absl::get_if<Buffered>(&buffer_->state_)) { |
||||
pulled_client_initial_metadata_ = true; |
||||
return ClaimObject(buffered->initial_metadata); |
||||
} |
||||
error_ = absl::get<Cancelled>(buffer_->state_).error; |
||||
return Failure{}; |
||||
} |
||||
|
||||
Poll<ValueOrFailure<absl::optional<MessageHandle>>> |
||||
RequestBuffer::Reader::PollPullMessage() { |
||||
ReleasableMutexLock lock(&buffer_->mu_); |
||||
if (buffer_->winner_ != nullptr && buffer_->winner_ != this) { |
||||
error_ = absl::CancelledError("Another call was chosen"); |
||||
return Failure{}; |
||||
} |
||||
if (auto* buffering = absl::get_if<Buffering>(&buffer_->state_)) { |
||||
if (message_index_ == buffering->messages.size()) { |
||||
return buffer_->PendingPull(this); |
||||
} |
||||
const auto idx = message_index_; |
||||
auto result = ClaimObject(buffering->messages[idx]); |
||||
++message_index_; |
||||
buffer_->MaybeSwitchToStreaming(); |
||||
return result; |
||||
} |
||||
if (auto* buffered = absl::get_if<Buffered>(&buffer_->state_)) { |
||||
if (message_index_ == buffered->messages.size()) return absl::nullopt; |
||||
const auto idx = message_index_; |
||||
++message_index_; |
||||
return ClaimObject(buffered->messages[idx]); |
||||
} |
||||
if (auto* streaming = absl::get_if<Streaming>(&buffer_->state_)) { |
||||
if (streaming->message == nullptr) { |
||||
if (streaming->end_of_stream) return absl::nullopt; |
||||
return buffer_->PendingPull(this); |
||||
} |
||||
auto msg = std::move(streaming->message); |
||||
auto waker = std::move(buffer_->push_waker_); |
||||
lock.Release(); |
||||
waker.Wakeup(); |
||||
return msg; |
||||
} |
||||
error_ = absl::get<Cancelled>(buffer_->state_).error; |
||||
return Failure{}; |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,182 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_CALL_REQUEST_BUFFER_H |
||||
#define GRPC_SRC_CORE_CALL_REQUEST_BUFFER_H |
||||
|
||||
#include <utility> |
||||
|
||||
#include "src/core/lib/transport/call_spine.h" |
||||
#include "src/core/lib/transport/message.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Outbound request buffer.
|
||||
// Collects client->server metadata and messages whilst in its initial buffering
|
||||
// mode. In buffering mode it can have zero or more Reader objects attached to
|
||||
// it.
|
||||
// The buffer can later be switched to committed mode, at which point it
|
||||
// will have exactly one Reader object attached to it.
|
||||
// Callers can choose to switch to committed mode based upon policy of their
|
||||
// choice.
|
||||
class RequestBuffer { |
||||
public: |
||||
// One reader of the request buffer.
|
||||
class Reader { |
||||
public: |
||||
explicit Reader(RequestBuffer* buffer) ABSL_LOCKS_EXCLUDED(buffer->mu_) |
||||
: buffer_(buffer) { |
||||
buffer->AddReader(this); |
||||
} |
||||
~Reader() ABSL_LOCKS_EXCLUDED(buffer_->mu_) { buffer_->RemoveReader(this); } |
||||
|
||||
Reader(const Reader&) = delete; |
||||
Reader& operator=(const Reader&) = delete; |
||||
|
||||
// Pull client initial metadata. Returns a promise that resolves to
|
||||
// ValueOrFailure<ClientMetadataHandle>.
|
||||
GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() { |
||||
return [this]() { return PollPullClientInitialMetadata(); }; |
||||
} |
||||
// Pull a message. Returns a promise that resolves to a
|
||||
// ValueOrFailure<absl::optional<MessageHandle>>.
|
||||
GRPC_MUST_USE_RESULT auto PullMessage() { |
||||
return [this]() { return PollPullMessage(); }; |
||||
} |
||||
|
||||
absl::Status TakeError() { return std::move(error_); } |
||||
|
||||
private: |
||||
friend class RequestBuffer; |
||||
|
||||
Poll<ValueOrFailure<ClientMetadataHandle>> PollPullClientInitialMetadata(); |
||||
Poll<ValueOrFailure<absl::optional<MessageHandle>>> PollPullMessage(); |
||||
|
||||
template <typename T> |
||||
T ClaimObject(T& object) ABSL_EXCLUSIVE_LOCKS_REQUIRED(buffer_->mu_) { |
||||
if (buffer_->winner_ == this) return std::move(object); |
||||
return CopyObject(object); |
||||
} |
||||
|
||||
ClientMetadataHandle CopyObject(const ClientMetadataHandle& md) { |
||||
return Arena::MakePooled<ClientMetadata>(md->Copy()); |
||||
} |
||||
|
||||
MessageHandle CopyObject(const MessageHandle& msg) { |
||||
return Arena::MakePooled<Message>(msg->payload()->Copy(), msg->flags()); |
||||
} |
||||
|
||||
RequestBuffer* const buffer_; |
||||
bool pulled_client_initial_metadata_ = false; |
||||
size_t message_index_ = 0; |
||||
absl::Status error_; |
||||
Waker pull_waker_; |
||||
}; |
||||
|
||||
// Push ClientInitialMetadata into the buffer.
|
||||
// This is instantaneous, and returns success with the amount of data
|
||||
// buffered, or failure.
|
||||
ValueOrFailure<size_t> PushClientInitialMetadata(ClientMetadataHandle md); |
||||
// Resolves to a ValueOrFailure<size_t> where the size_t is the amount of data
|
||||
// buffered (or 0 if we're in committed mode).
|
||||
GRPC_MUST_USE_RESULT auto PushMessage(MessageHandle message) { |
||||
return [this, message = std::move(message)]() mutable { |
||||
return PollPushMessage(message); |
||||
}; |
||||
} |
||||
// Push end of stream (client half-closure).
|
||||
StatusFlag FinishSends(); |
||||
// Cancel the request, propagate failure to all readers.
|
||||
void Cancel(absl::Status error = absl::CancelledError()); |
||||
|
||||
// Switch to committed mode - needs to be called exactly once with the winning
|
||||
// reader. All other readers will see failure.
|
||||
void Commit(Reader* winner); |
||||
|
||||
private: |
||||
// Buffering state: we're collecting metadata and messages.
|
||||
struct Buffering { |
||||
// Initial metadata, or nullptr if not yet received.
|
||||
ClientMetadataHandle initial_metadata; |
||||
// Buffered messages.
|
||||
absl::InlinedVector<MessageHandle, 1> messages; |
||||
// Amount of data buffered.
|
||||
size_t buffered = 0; |
||||
}; |
||||
// Buffered state: all messages have been collected (the client has finished
|
||||
// sending).
|
||||
struct Buffered { |
||||
Buffered(ClientMetadataHandle md, |
||||
absl::InlinedVector<MessageHandle, 1> msgs) |
||||
: initial_metadata(std::move(md)), messages(std::move(msgs)) {} |
||||
ClientMetadataHandle initial_metadata; |
||||
absl::InlinedVector<MessageHandle, 1> messages; |
||||
}; |
||||
// Streaming state: we're streaming messages to the server.
|
||||
// This implies winner_ is set.
|
||||
struct Streaming { |
||||
MessageHandle message; |
||||
bool end_of_stream = false; |
||||
}; |
||||
// Cancelled state: the request has been cancelled.
|
||||
struct Cancelled { |
||||
explicit Cancelled(absl::Status error) : error(std::move(error)) {} |
||||
absl::Status error; |
||||
}; |
||||
using State = absl::variant<Buffering, Buffered, Streaming, Cancelled>; |
||||
|
||||
Poll<ValueOrFailure<size_t>> PollPushMessage(MessageHandle& message); |
||||
Pending PendingPull(Reader* reader) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
||||
reader->pull_waker_ = Activity::current()->MakeOwningWaker(); |
||||
return Pending{}; |
||||
} |
||||
Pending PendingPush() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
||||
push_waker_ = Activity::current()->MakeOwningWaker(); |
||||
return Pending{}; |
||||
} |
||||
void MaybeSwitchToStreaming() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
||||
auto& buffering = absl::get<Buffering>(state_); |
||||
if (winner_ == nullptr) return; |
||||
if (winner_->message_index_ < buffering.messages.size()) return; |
||||
state_.emplace<Streaming>(); |
||||
push_waker_.Wakeup(); |
||||
} |
||||
|
||||
void WakeupAsyncAllPullers() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
||||
WakeupAsyncAllPullersExcept(nullptr); |
||||
} |
||||
void WakeupAsyncAllPullersExcept(Reader* except_reader) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); |
||||
|
||||
void AddReader(Reader* reader) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
||||
readers_.insert(reader); |
||||
} |
||||
|
||||
void RemoveReader(Reader* reader) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
||||
readers_.erase(reader); |
||||
} |
||||
|
||||
Mutex mu_; |
||||
Reader* winner_ ABSL_GUARDED_BY(mu_){nullptr}; |
||||
State state_ ABSL_GUARDED_BY(mu_){Buffering{}}; |
||||
// TODO(ctiller): change this to an intrusively linked list to avoid
|
||||
// allocations.
|
||||
absl::flat_hash_set<Reader*> readers_ ABSL_GUARDED_BY(mu_); |
||||
Waker push_waker_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_CALL_REQUEST_BUFFER_H
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue