[samples] Server flow control example (#37591)

Closes #37591

PiperOrigin-RevId: 670722178
pull/37604/head
Eugene Ostroukhov 6 months ago committed by Copybara-Service
parent ed3e08f136
commit 40870a9f26
  1. 41
      examples/cpp/flow_control/BUILD
  2. 73
      examples/cpp/flow_control/CMakeLists.txt
  3. 29
      examples/cpp/flow_control/README.md
  4. 104
      examples/cpp/flow_control/server_flow_control_client.cc
  5. 131
      examples/cpp/flow_control/server_flow_control_server.cc

@ -0,0 +1,41 @@
# Copyright 2024 the gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
licenses(["notice"])
cc_binary(
name = "server_flow_control_client",
srcs = ["server_flow_control_client.cc"],
defines = ["BAZEL_BUILD"],
deps = [
"//:grpc++",
"//:grpc++_reflection",
"//examples/protos:helloworld_cc_grpc",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
],
)
cc_binary(
name = "server_flow_control_server",
srcs = ["server_flow_control_server.cc"],
defines = ["BAZEL_BUILD"],
deps = [
"//:grpc++",
"//:grpc++_reflection",
"//examples/protos:helloworld_cc_grpc",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
],
)

@ -0,0 +1,73 @@
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# cmake build file for C++ flow_control example.
# Assumes protobuf and gRPC have been installed using cmake.
# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build
# that automatically builds all the dependencies before building example.
cmake_minimum_required(VERSION 3.8)
project(HelloWorld C CXX)
include(../cmake/common.cmake)
# Proto file
get_filename_component(hw_proto "../../protos/helloworld.proto" ABSOLUTE)
get_filename_component(hw_proto_path "${hw_proto}" PATH)
# Generated sources
set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc")
set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.h")
set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc")
set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h")
add_custom_command(
OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${hw_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${hw_proto}"
DEPENDS "${hw_proto}")
# Include generated *.pb.h files
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# hw_grpc_proto
add_library(hw_grpc_proto
${hw_grpc_srcs}
${hw_grpc_hdrs}
${hw_proto_srcs}
${hw_proto_hdrs})
target_link_libraries(hw_grpc_proto
absl::check
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
# Targets greeter_[async_](client|server)
foreach(_target
server_flow_control_client server_flow_control_server)
add_executable(${_target} "${_target}.cc")
target_link_libraries(${_target}
hw_grpc_proto
absl::check
absl::flags
absl::flags_parse
absl::log
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
endforeach()

@ -0,0 +1,29 @@
gRPC Flow Control Example
=====================
# Overview
Flow control is relevant for streaming RPC calls.
The underlying layer will make the write wait when there is no space to write
the next message. This causes the request stream to go into a not ready state
and the method invocation waits.
# Server flow control
In server case, gRPC will pause the server implementation that is sending the
messages too fast. Server implementation is in
[server_flow_control_server.cc](server_flow_control_server.cc). It will write
a specified number of responses of a specified size as fast as possible.
As client-side buffer is filled, the write operation will block until the buffer
is freed.
A client implementation in [server_flow_control_client.cc](server_flow_control_client.cc)
will delay for 1s before starting a next read to simulate client that does not
have resources for handling the replies.
# Related information
Also see [gRPC Flow Control Users Guide][user guide]
[user guide]: https://grpc.io/docs/guides/flow-control

@ -0,0 +1,104 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <iostream>
#include <string>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include <grpc/grpc.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
ABSL_FLAG(std::string, target, "localhost:50051", "Server address");
ABSL_FLAG(size_t, quota, 20,
"Resource quota (in megabytes) that defines how much memory gRPC has "
"available for buffers");
namespace {
class Reader final : public grpc::ClientReadReactor<helloworld::HelloReply> {
public:
void Start() {
StartRead(&res_);
StartCall();
}
grpc::Status WaitForDone() {
absl::MutexLock lock(&mu_);
mu_.Await(absl::Condition(
+[](Reader* reader) { return reader->result_.has_value(); }, this));
return *result_;
}
void OnReadDone(bool ok) override {
if (!ok) {
std::cout << "Done reading\n";
return;
}
std::cout << "Read " << res_.message().length() << " bytes.\n";
res_.set_message("");
// A delay to slow down the client so it can't read responses quick enough
sleep(1);
StartRead(&res_);
}
void OnDone(const grpc::Status& status) override {
absl::MutexLock lock(&mu_);
result_ = status;
}
private:
absl::Mutex mu_;
absl::optional<grpc::Status> result_;
helloworld::HelloReply res_;
};
} // namespace
int main(int argc, char* argv[]) {
absl::ParseCommandLine(argc, argv);
grpc::ChannelArguments channel_arguments;
grpc::ResourceQuota quota;
quota.Resize(absl::GetFlag(FLAGS_quota) * 1024 * 1024);
channel_arguments.SetResourceQuota(quota);
auto channel = grpc::CreateCustomChannel(absl::GetFlag(FLAGS_target),
grpc::InsecureChannelCredentials(),
channel_arguments);
auto greeter = helloworld::Greeter::NewStub(channel);
grpc::ClientContext ctx;
helloworld::HelloRequest req;
req.set_name("World");
Reader reader;
greeter->async()->SayHelloStreamReply(&ctx, &req, &reader);
reader.Start();
auto status = reader.WaitForDone();
if (status.ok()) {
std::cout << "Success\n";
} else {
std::cerr << "Failed with error: " << status.error_message() << "\n";
}
return 0;
}

@ -0,0 +1,131 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/str_cat.h"
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/support/status.h>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service");
ABSL_FLAG(size_t, message_size, 3 * 1024 * 1024,
"Size of the messages to send");
ABSL_FLAG(uint32_t, to_send, 10,
"Messages to send in response to a single request");
using grpc::CallbackServerContext;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerUnaryReactor;
using grpc::Status;
namespace {
//
// Will write the replies as fast as it can, starting a new write as soon as
// previous one is done.
//
class HelloReactor final
: public grpc::ServerWriteReactor<helloworld::HelloReply> {
public:
HelloReactor(size_t message_size, size_t to_send)
: messages_to_send_(to_send) {
res_.set_message(std::string(message_size, '#'));
Write();
}
void Write() {
absl::MutexLock lock(&mu_);
StartWrite(&res_);
--messages_to_send_;
write_start_time_ = absl::Now();
}
void OnWriteDone(bool ok) override {
bool more = false;
{
absl::MutexLock lock(&mu_);
std::cout << "Write #" << messages_to_send_ << " done (Ok: " << ok
<< "): " << absl::Now() - *write_start_time_ << "\n";
write_start_time_ = absl::nullopt;
more = ok && messages_to_send_ > 0;
}
if (more) {
Write();
} else {
Finish(grpc::Status::OK);
std::cout << "Done sending messages\n";
}
}
void OnDone() override { delete this; }
private:
helloworld::HelloReply res_;
size_t messages_to_send_;
absl::optional<absl::Time> write_start_time_;
absl::Mutex mu_;
};
class GreeterService final : public helloworld::Greeter::CallbackService {
public:
GreeterService(size_t message_size, size_t to_send)
: message_size_(message_size), to_send_(to_send) {}
grpc::ServerWriteReactor<helloworld::HelloReply>* SayHelloStreamReply(
grpc::CallbackServerContext* /*context*/,
const helloworld::HelloRequest* request) override {
return new HelloReactor(message_size_, to_send_);
}
private:
size_t message_size_;
size_t to_send_;
};
} // namespace
int main(int argc, char* argv[]) {
absl::ParseCommandLine(argc, argv);
std::string server_address =
absl::StrCat("0.0.0.0:", absl::GetFlag(FLAGS_port));
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
GreeterService service(absl::GetFlag(FLAGS_message_size),
absl::GetFlag(FLAGS_to_send));
ServerBuilder builder;
builder.RegisterService(&service);
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
return 0;
}
Loading…
Cancel
Save