diff --git a/examples/cpp/flow_control/BUILD b/examples/cpp/flow_control/BUILD index 9cd66865c3c..981b7530838 100644 --- a/examples/cpp/flow_control/BUILD +++ b/examples/cpp/flow_control/BUILD @@ -14,6 +14,33 @@ licenses(["notice"]) +cc_binary( + name = "client_flow_control_client", + srcs = ["client_flow_control_client.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "//:grpc++", + "//:grpc++_reflection", + "//examples/protos:helloworld_cc_grpc", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + ], +) + +cc_binary( + name = "client_flow_control_server", + srcs = ["client_flow_control_server.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "//:grpc++", + "//:grpc++_reflection", + "//examples/protos:helloworld_cc_grpc", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + "@com_google_absl//absl/strings:str_format", + ], +) + cc_binary( name = "server_flow_control_client", srcs = ["server_flow_control_client.cc"], diff --git a/examples/cpp/flow_control/CMakeLists.txt b/examples/cpp/flow_control/CMakeLists.txt index c546af0a274..0d6d4353177 100644 --- a/examples/cpp/flow_control/CMakeLists.txt +++ b/examples/cpp/flow_control/CMakeLists.txt @@ -59,6 +59,7 @@ target_link_libraries(hw_grpc_proto # Targets greeter_[async_](client|server) foreach(_target + client_flow_control_client client_flow_control_server server_flow_control_client server_flow_control_server) add_executable(${_target} "${_target}.cc") target_link_libraries(${_target} diff --git a/examples/cpp/flow_control/client_flow_control_client.cc b/examples/cpp/flow_control/client_flow_control_client.cc new file mode 100644 index 00000000000..933489d7d04 --- /dev/null +++ b/examples/cpp/flow_control/client_flow_control_client.cc @@ -0,0 +1,126 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" + +#include +#include +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#endif + +ABSL_FLAG(std::string, target, "localhost:50051", "Server address"); + +using grpc::CallbackServerContext; +using grpc::Channel; +using grpc::ClientContext; +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerUnaryReactor; +using grpc::Status; +using helloworld::Greeter; +using helloworld::HelloReply; +using helloworld::HelloRequest; + +namespace { + +// Sends requests as quickly as possible and times how long it takes to perform +// the write operation. +class GreeterClientReactor final + : public grpc::ClientBidiReactor { + public: + explicit GreeterClientReactor(int reqs, size_t req_size) : reqs_(reqs) { + req_.set_name(std::string(req_size, '*')); + } + + void Start() { + absl::MutexLock lock(&mu_); + StartCall(); + Write(); + } + + ~GreeterClientReactor() override { + absl::MutexLock lock(&mu_); + mu_.Await(absl::Condition(+[](bool* done) { return *done; }, &done_)); + } + + void OnWriteDone(bool ok) override { + absl::MutexLock lock(&mu_); + std::cout << "Writing took " << absl::Now() - *time_ << std::endl; + time_ = absl::nullopt; + if (ok) { + Write(); + } + } + + void OnDone(const grpc::Status& status) override { + if (status.ok()) { + std::cout << "Done\n"; + } else { + std::cout << "Done with error: [" << status.error_code() << "] " + << status.error_message() << "\n"; + } + absl::MutexLock lock(&mu_); + done_ = true; + } + + private: + void Write() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { + if (reqs_ == 0) { + StartWritesDone(); + return; + } + --reqs_; + StartWrite(&req_); + time_ = absl::Now(); + } + + absl::Mutex mu_; + bool done_ ABSL_GUARDED_BY(&mu_) = false; + HelloRequest req_; + size_t reqs_; + absl::optional time_ ABSL_GUARDED_BY(mu_); +}; + +} // namespace + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + grpc::ChannelArguments channel_arguments; + auto channel = grpc::CreateCustomChannel(absl::GetFlag(FLAGS_target), + grpc::InsecureChannelCredentials(), + channel_arguments); + auto stub = Greeter::NewStub(channel); + // Send 10 requests with 3Mb payload. This will eventually fill the buffer + // and make + GreeterClientReactor reactor(10, 3 * 1024 * 1024); + grpc::ClientContext context; + stub->async()->SayHelloBidiStream(&context, &reactor); + reactor.Start(); + return 0; +} diff --git a/examples/cpp/flow_control/client_flow_control_server.cc b/examples/cpp/flow_control/client_flow_control_server.cc new file mode 100644 index 00000000000..31de9275d7c --- /dev/null +++ b/examples/cpp/flow_control/client_flow_control_server.cc @@ -0,0 +1,111 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/strings/str_format.h" + +#include +#include +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#endif + +ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); +ABSL_FLAG(size_t, quota, 20, "Resource quota, in megabytes"); + +namespace { + +// +// Server reactor that is slow to read incoming messages, causing the buffers +// to fill. +// +class SlowReadingBidiReactor final + : public grpc::ServerBidiReactor { + public: + SlowReadingBidiReactor() { StartRead(&req_); } + + void OnReadDone(bool ok) override { + std::cout << "Recieved request with " << req_.name().length() + << " bytes name\n"; + if (!ok) { + Finish(grpc::Status::OK); + return; + } + sleep(1); + StartRead(&req_); + } + + void OnDone() override { + std::cout << "Done\n"; + delete this; + } + + private: + absl::Mutex mu_; + helloworld::HelloRequest req_; +}; + +// Logic and data behind the server's behavior. +class GreeterServiceImpl final : public helloworld::Greeter::CallbackService { + grpc::ServerBidiReactor* + SayHelloBidiStream(grpc::CallbackServerContext* /* context */) override { + return new SlowReadingBidiReactor(); + } +}; + +} // namespace + +void RunServer(uint16_t port) { + std::string server_address = absl::StrFormat("0.0.0.0:%d", port); + GreeterServiceImpl service; + + grpc::EnableDefaultHealthCheckService(true); + grpc::reflection::InitProtoReflectionServerBuilderPlugin(); + grpc::ServerBuilder builder; + // Listen on the given address without any authentication mechanism. + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + // Register "service" as the instance through which we'll communicate with + // clients. In this case it corresponds to an *synchronous* service. + builder.RegisterService(&service); + grpc::ResourceQuota quota; + quota.Resize(absl::GetFlag(FLAGS_quota) * 1024 * 1024); + // Finally assemble the server. + auto server = builder.BuildAndStart(); + std::cout << "Server listening on " << server_address << std::endl; + + // Wait for the server to shutdown. Note that some other thread must be + // responsible for shutting down the server for this call to ever return. + server->Wait(); +} + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + RunServer(absl::GetFlag(FLAGS_port)); + return 0; +}