From d72ce236d8696ae3493d325edd5f33cda39a757f Mon Sep 17 00:00:00 2001 From: Esun Kim Date: Wed, 6 Sep 2023 09:07:33 -0700 Subject: [PATCH] [Example] Added gRPC C++ cancellation example (#34239) --- examples/cpp/cancellation/BUILD | 39 ++++++++ examples/cpp/cancellation/CMakeLists.txt | 71 ++++++++++++++ examples/cpp/cancellation/README.md | 37 +++++++ examples/cpp/cancellation/client.cc | 119 +++++++++++++++++++++++ examples/cpp/cancellation/server.cc | 104 ++++++++++++++++++++ examples/protos/helloworld.proto | 2 + 6 files changed, 372 insertions(+) create mode 100644 examples/cpp/cancellation/BUILD create mode 100644 examples/cpp/cancellation/CMakeLists.txt create mode 100644 examples/cpp/cancellation/README.md create mode 100644 examples/cpp/cancellation/client.cc create mode 100644 examples/cpp/cancellation/server.cc diff --git a/examples/cpp/cancellation/BUILD b/examples/cpp/cancellation/BUILD new file mode 100644 index 00000000000..03ef7700eed --- /dev/null +++ b/examples/cpp/cancellation/BUILD @@ -0,0 +1,39 @@ +# Copyright 2023 the gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +licenses(["notice"]) + +cc_binary( + name = "client", + srcs = ["client.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "//:grpc++", + "//examples/protos:helloworld_cc_grpc", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + ], +) + +cc_binary( + name = "server", + srcs = ["server.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "//:grpc++", + "//examples/protos:helloworld_cc_grpc", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + ], +) diff --git a/examples/cpp/cancellation/CMakeLists.txt b/examples/cpp/cancellation/CMakeLists.txt new file mode 100644 index 00000000000..c82111b5239 --- /dev/null +++ b/examples/cpp/cancellation/CMakeLists.txt @@ -0,0 +1,71 @@ +# Copyright 2023 the gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# cmake build file for C++ keyvaluestore example. +# Assumes protobuf and gRPC have been installed using cmake. +# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build +# that automatically builds all the dependencies before building keyvaluestore. + +cmake_minimum_required(VERSION 3.8) + +project(Cancellation C CXX) + +include(../cmake/common.cmake) + +# Proto files +get_filename_component(hw_proto "../../protos/helloworld.proto" ABSOLUTE) +get_filename_component(hw_proto_path "${hw_proto}" PATH) + +# Generated sources +set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc") +set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.h") +set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc") +set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h") +add_custom_command( + OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${hw_proto_path}" + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${hw_proto}" + DEPENDS "${hw_proto}") + +# Include generated *.pb.h files +include_directories("${CMAKE_CURRENT_BINARY_DIR}") + +# hw_grpc_proto +add_library(hw_grpc_proto + ${hw_grpc_srcs} + ${hw_grpc_hdrs} + ${hw_proto_srcs} + ${hw_proto_hdrs}) +target_link_libraries(hw_grpc_proto + ${_REFLECTION} + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF}) + +# Targets greeter_(client|server) +foreach(_target + client server) + add_executable(${_target} "${_target}.cc") + target_link_libraries(${_target} + hw_grpc_proto + absl::flags + absl::flags_parse + absl::strings + ${_REFLECTION} + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF}) +endforeach() diff --git a/examples/cpp/cancellation/README.md b/examples/cpp/cancellation/README.md new file mode 100644 index 00000000000..79c7961a611 --- /dev/null +++ b/examples/cpp/cancellation/README.md @@ -0,0 +1,37 @@ +# Cancellation Example + +## Overview + +This example shows you how to cancel from the client and how to get informed on the server and the client. + +### Try it! + +Once you have working gRPC, you can build this example using either bazel or cmake. + +Run the server, which will listen on port 50051: + +```sh +$ ./server +``` + +Run the client (in a different terminal): + +```sh +$ ./client +``` + +If things go smoothly, you will see the client output: + +``` +Begin : Begin Ack +Count 1 : Count 1 Ack +Count 2 : Count 2 Ack +Count 3 : Count 3 Ack +Count 4 : Count 4 Ack +Count 5 : Count 5 Ack +Count 6 : Count 6 Ack +Count 7 : Count 7 Ack +Count 8 : Count 8 Ack +Count 9 : Count 9 Ack +RPC Cancelled! +``` diff --git a/examples/cpp/cancellation/client.cc b/examples/cpp/cancellation/client.cc new file mode 100644 index 00000000000..97c9f1fac48 --- /dev/null +++ b/examples/cpp/cancellation/client.cc @@ -0,0 +1,119 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/strings/str_cat.h" + +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#endif + +ABSL_FLAG(std::string, target, "localhost:50051", "Server address"); + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; +using grpc::StatusCode; +using helloworld::Greeter; +using helloworld::HelloReply; +using helloworld::HelloRequest; + +// Requests each key in the vector and displays the key and its corresponding +// value as a pair. +class KeyValueStoreClient + : public grpc::ClientBidiReactor { + public: + KeyValueStoreClient(std::shared_ptr channel) + : stub_(Greeter::NewStub(channel)) { + stub_->async()->SayHelloBidiStream(&context_, this); + request_.set_name("Begin"); + StartWrite(&request_); + StartCall(); + } + + void OnReadDone(bool ok) override { + if (ok) { + std::cout << request_.name() << " : " << response_.message() << std::endl; + if (++counter_ < 10) { + request_.set_name(absl::StrCat("Count ", counter_)); + StartWrite(&request_); + } else { + // Cancel after sending 10 messages + context_.TryCancel(); + } + } + } + + void OnWriteDone(bool ok) override { + if (ok) { + StartRead(&response_); + } + } + + void OnDone(const grpc::Status& status) override { + if (!status.ok()) { + if (status.error_code() == StatusCode::CANCELLED) { + // Eventually client will know here that call is cancelled. + std::cout << "RPC Cancelled!" << std::endl; + } else { + std::cout << "RPC Failed: " << status.error_code() << ": " + << status.error_message() << std::endl; + } + } + std::unique_lock l(mu_); + done_ = true; + cv_.notify_all(); + } + + void Await() { + std::unique_lock l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + std::unique_ptr stub_; + size_t counter_ = 0; + ClientContext context_; + bool done_ = false; + HelloRequest request_; + HelloReply response_; + std::mutex mu_; + std::condition_variable cv_; +}; + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + // Instantiate the client. It requires a channel, out of which the actual RPCs + // are created. This channel models a connection to an endpoint specified by + // the argument "--target=" which is the only expected argument. + std::string target_str = absl::GetFlag(FLAGS_target); + KeyValueStoreClient client( + grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials())); + client.Await(); + return 0; +} diff --git a/examples/cpp/cancellation/server.cc b/examples/cpp/cancellation/server.cc new file mode 100644 index 00000000000..7c537976508 --- /dev/null +++ b/examples/cpp/cancellation/server.cc @@ -0,0 +1,104 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/strings/str_format.h" + +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#endif + +ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); + +using grpc::CallbackServerContext; +using grpc::Server; +using grpc::ServerBidiReactor; +using grpc::ServerBuilder; +using grpc::Status; +using helloworld::Greeter; +using helloworld::HelloReply; +using helloworld::HelloRequest; + +// Logic behind the server's behavior. +class KeyValueStoreServiceImpl final : public Greeter::CallbackService { + ServerBidiReactor* SayHelloBidiStream( + CallbackServerContext* context) override { + class Reactor : public ServerBidiReactor { + public: + explicit Reactor() { StartRead(&request_); } + + void OnReadDone(bool ok) override { + if (!ok) { + // Client cancelled it + std::cout << "OnReadDone Cancelled!" << std::endl; + return Finish(grpc::Status::CANCELLED); + } + response_.set_message(absl::StrCat(request_.name(), " Ack")); + StartWrite(&response_); + } + + void OnWriteDone(bool ok) override { + if (!ok) { + // Client cancelled it + std::cout << "OnWriteDone Cancelled!" << std::endl; + return Finish(grpc::Status::CANCELLED); + } + StartRead(&request_); + } + + void OnDone() override { delete this; } + + private: + HelloRequest request_; + HelloReply response_; + }; + + return new Reactor(); + } +}; + +void RunServer(uint16_t port) { + std::string server_address = absl::StrFormat("0.0.0.0:%d", port); + KeyValueStoreServiceImpl service; + + ServerBuilder builder; + // Listen on the given address without any authentication mechanism. + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + // Register "service" as the instance through which we'll communicate with + // clients. In this case it corresponds to an *synchronous* service. + builder.RegisterService(&service); + // Finally assemble the server. + std::unique_ptr server(builder.BuildAndStart()); + std::cout << "Server listening on " << server_address << std::endl; + + // Wait for the server to shutdown. Note that some other thread must be + // responsible for shutting down the server for this call to ever return. + server->Wait(); +} + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + RunServer(absl::GetFlag(FLAGS_port)); + return 0; +} diff --git a/examples/protos/helloworld.proto b/examples/protos/helloworld.proto index 7e50d0fc76d..9a0b59c0d3d 100644 --- a/examples/protos/helloworld.proto +++ b/examples/protos/helloworld.proto @@ -27,6 +27,8 @@ service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {} + + rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloReply) {} } // The request message containing the user's name.