diff --git a/examples/cpp/flow_control/BUILD b/examples/cpp/flow_control/BUILD new file mode 100644 index 00000000000..9cd66865c3c --- /dev/null +++ b/examples/cpp/flow_control/BUILD @@ -0,0 +1,41 @@ +# Copyright 2024 the gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +licenses(["notice"]) + +cc_binary( + name = "server_flow_control_client", + srcs = ["server_flow_control_client.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "//:grpc++", + "//:grpc++_reflection", + "//examples/protos:helloworld_cc_grpc", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + ], +) + +cc_binary( + name = "server_flow_control_server", + srcs = ["server_flow_control_server.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "//:grpc++", + "//:grpc++_reflection", + "//examples/protos:helloworld_cc_grpc", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + ], +) diff --git a/examples/cpp/flow_control/CMakeLists.txt b/examples/cpp/flow_control/CMakeLists.txt new file mode 100644 index 00000000000..c546af0a274 --- /dev/null +++ b/examples/cpp/flow_control/CMakeLists.txt @@ -0,0 +1,73 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# cmake build file for C++ flow_control example. +# Assumes protobuf and gRPC have been installed using cmake. +# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build +# that automatically builds all the dependencies before building example. + +cmake_minimum_required(VERSION 3.8) + +project(HelloWorld C CXX) + +include(../cmake/common.cmake) + +# Proto file +get_filename_component(hw_proto "../../protos/helloworld.proto" ABSOLUTE) +get_filename_component(hw_proto_path "${hw_proto}" PATH) + +# Generated sources +set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc") +set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.h") +set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc") +set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h") +add_custom_command( + OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${hw_proto_path}" + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${hw_proto}" + DEPENDS "${hw_proto}") + +# Include generated *.pb.h files +include_directories("${CMAKE_CURRENT_BINARY_DIR}") + +# hw_grpc_proto +add_library(hw_grpc_proto + ${hw_grpc_srcs} + ${hw_grpc_hdrs} + ${hw_proto_srcs} + ${hw_proto_hdrs}) +target_link_libraries(hw_grpc_proto + absl::check + ${_REFLECTION} + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF}) + +# Targets greeter_[async_](client|server) +foreach(_target + server_flow_control_client server_flow_control_server) + add_executable(${_target} "${_target}.cc") + target_link_libraries(${_target} + hw_grpc_proto + absl::check + absl::flags + absl::flags_parse + absl::log + ${_REFLECTION} + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF}) +endforeach() diff --git a/examples/cpp/flow_control/README.md b/examples/cpp/flow_control/README.md new file mode 100644 index 00000000000..4e8e1136c6b --- /dev/null +++ b/examples/cpp/flow_control/README.md @@ -0,0 +1,29 @@ +gRPC Flow Control Example +===================== + +# Overview + +Flow control is relevant for streaming RPC calls. + +The underlying layer will make the write wait when there is no space to write +the next message. This causes the request stream to go into a not ready state +and the method invocation waits. + +# Server flow control + +In server case, gRPC will pause the server implementation that is sending the +messages too fast. Server implementation is in +[server_flow_control_server.cc](server_flow_control_server.cc). It will write +a specified number of responses of a specified size as fast as possible. +As client-side buffer is filled, the write operation will block until the buffer +is freed. + +A client implementation in [server_flow_control_client.cc](server_flow_control_client.cc) +will delay for 1s before starting a next read to simulate client that does not +have resources for handling the replies. + +# Related information + +Also see [gRPC Flow Control Users Guide][user guide] + + [user guide]: https://grpc.io/docs/guides/flow-control \ No newline at end of file diff --git a/examples/cpp/flow_control/server_flow_control_client.cc b/examples/cpp/flow_control/server_flow_control_client.cc new file mode 100644 index 00000000000..2c9b5938a48 --- /dev/null +++ b/examples/cpp/flow_control/server_flow_control_client.cc @@ -0,0 +1,104 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" + +#include +#include +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#endif + +ABSL_FLAG(std::string, target, "localhost:50051", "Server address"); +ABSL_FLAG(size_t, quota, 20, + "Resource quota (in megabytes) that defines how much memory gRPC has " + "available for buffers"); + +namespace { + +class Reader final : public grpc::ClientReadReactor { + public: + void Start() { + StartRead(&res_); + StartCall(); + } + + grpc::Status WaitForDone() { + absl::MutexLock lock(&mu_); + mu_.Await(absl::Condition( + +[](Reader* reader) { return reader->result_.has_value(); }, this)); + return *result_; + } + + void OnReadDone(bool ok) override { + if (!ok) { + std::cout << "Done reading\n"; + return; + } + std::cout << "Read " << res_.message().length() << " bytes.\n"; + res_.set_message(""); + // A delay to slow down the client so it can't read responses quick enough + sleep(1); + StartRead(&res_); + } + + void OnDone(const grpc::Status& status) override { + absl::MutexLock lock(&mu_); + result_ = status; + } + + private: + absl::Mutex mu_; + absl::optional result_; + helloworld::HelloReply res_; +}; + +} // namespace + +int main(int argc, char* argv[]) { + absl::ParseCommandLine(argc, argv); + grpc::ChannelArguments channel_arguments; + grpc::ResourceQuota quota; + quota.Resize(absl::GetFlag(FLAGS_quota) * 1024 * 1024); + channel_arguments.SetResourceQuota(quota); + auto channel = grpc::CreateCustomChannel(absl::GetFlag(FLAGS_target), + grpc::InsecureChannelCredentials(), + channel_arguments); + auto greeter = helloworld::Greeter::NewStub(channel); + grpc::ClientContext ctx; + helloworld::HelloRequest req; + req.set_name("World"); + Reader reader; + greeter->async()->SayHelloStreamReply(&ctx, &req, &reader); + reader.Start(); + auto status = reader.WaitForDone(); + if (status.ok()) { + std::cout << "Success\n"; + } else { + std::cerr << "Failed with error: " << status.error_message() << "\n"; + } + return 0; +} diff --git a/examples/cpp/flow_control/server_flow_control_server.cc b/examples/cpp/flow_control/server_flow_control_server.cc new file mode 100644 index 00000000000..75b71eb1e1a --- /dev/null +++ b/examples/cpp/flow_control/server_flow_control_server.cc @@ -0,0 +1,131 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/strings/str_cat.h" + +#include +#include +#include +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#endif + +ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); +ABSL_FLAG(size_t, message_size, 3 * 1024 * 1024, + "Size of the messages to send"); +ABSL_FLAG(uint32_t, to_send, 10, + "Messages to send in response to a single request"); + +using grpc::CallbackServerContext; +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerUnaryReactor; +using grpc::Status; + +namespace { + +// +// Will write the replies as fast as it can, starting a new write as soon as +// previous one is done. +// +class HelloReactor final + : public grpc::ServerWriteReactor { + public: + HelloReactor(size_t message_size, size_t to_send) + : messages_to_send_(to_send) { + res_.set_message(std::string(message_size, '#')); + Write(); + } + + void Write() { + absl::MutexLock lock(&mu_); + StartWrite(&res_); + --messages_to_send_; + write_start_time_ = absl::Now(); + } + + void OnWriteDone(bool ok) override { + bool more = false; + { + absl::MutexLock lock(&mu_); + std::cout << "Write #" << messages_to_send_ << " done (Ok: " << ok + << "): " << absl::Now() - *write_start_time_ << "\n"; + write_start_time_ = absl::nullopt; + more = ok && messages_to_send_ > 0; + } + if (more) { + Write(); + } else { + Finish(grpc::Status::OK); + std::cout << "Done sending messages\n"; + } + } + + void OnDone() override { delete this; } + + private: + helloworld::HelloReply res_; + size_t messages_to_send_; + absl::optional write_start_time_; + absl::Mutex mu_; +}; + +class GreeterService final : public helloworld::Greeter::CallbackService { + public: + GreeterService(size_t message_size, size_t to_send) + : message_size_(message_size), to_send_(to_send) {} + + grpc::ServerWriteReactor* SayHelloStreamReply( + grpc::CallbackServerContext* /*context*/, + const helloworld::HelloRequest* request) override { + return new HelloReactor(message_size_, to_send_); + } + + private: + size_t message_size_; + size_t to_send_; +}; + +} // namespace + +int main(int argc, char* argv[]) { + absl::ParseCommandLine(argc, argv); + std::string server_address = + absl::StrCat("0.0.0.0:", absl::GetFlag(FLAGS_port)); + grpc::EnableDefaultHealthCheckService(true); + grpc::reflection::InitProtoReflectionServerBuilderPlugin(); + GreeterService service(absl::GetFlag(FLAGS_message_size), + absl::GetFlag(FLAGS_to_send)); + ServerBuilder builder; + builder.RegisterService(&service); + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + std::unique_ptr server(builder.BuildAndStart()); + std::cout << "Server listening on " << server_address << std::endl; + server->Wait(); + return 0; +}