From f0dcac330ffd635924a7a384a6d441b7fd0c8595 Mon Sep 17 00:00:00 2001 From: Esun Kim Date: Thu, 14 Dec 2023 15:48:53 -0800 Subject: [PATCH] [Example] Added the deadline example (#35217) Added the deadline example Closes #35217 PiperOrigin-RevId: 591074224 --- examples/cpp/deadline/BUILD | 39 ++++++++ examples/cpp/deadline/CMakeLists.txt | 70 +++++++++++++++ examples/cpp/deadline/README.md | 35 ++++++++ examples/cpp/deadline/client.cc | 106 ++++++++++++++++++++++ examples/cpp/deadline/server.cc | 130 +++++++++++++++++++++++++++ 5 files changed, 380 insertions(+) create mode 100644 examples/cpp/deadline/BUILD create mode 100644 examples/cpp/deadline/CMakeLists.txt create mode 100644 examples/cpp/deadline/README.md create mode 100644 examples/cpp/deadline/client.cc create mode 100644 examples/cpp/deadline/server.cc diff --git a/examples/cpp/deadline/BUILD b/examples/cpp/deadline/BUILD new file mode 100644 index 00000000000..03ef7700eed --- /dev/null +++ b/examples/cpp/deadline/BUILD @@ -0,0 +1,39 @@ +# Copyright 2023 the gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +licenses(["notice"]) + +cc_binary( + name = "client", + srcs = ["client.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "//:grpc++", + "//examples/protos:helloworld_cc_grpc", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + ], +) + +cc_binary( + name = "server", + srcs = ["server.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "//:grpc++", + "//examples/protos:helloworld_cc_grpc", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + ], +) diff --git a/examples/cpp/deadline/CMakeLists.txt b/examples/cpp/deadline/CMakeLists.txt new file mode 100644 index 00000000000..1cf3462e31c --- /dev/null +++ b/examples/cpp/deadline/CMakeLists.txt @@ -0,0 +1,70 @@ +# Copyright 2023 the gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Assumes protobuf and gRPC have been installed using cmake. +# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build +# that automatically builds all the dependencies before building this example. + +cmake_minimum_required(VERSION 3.8) + +project(Deadline C CXX) + +include(../cmake/common.cmake) + +# Proto files +get_filename_component(hw_proto "../../protos/helloworld.proto" ABSOLUTE) +get_filename_component(hw_proto_path "${hw_proto}" PATH) + +# Generated sources +set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc") +set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.h") +set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc") +set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h") +add_custom_command( + OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${hw_proto_path}" + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${hw_proto}" + DEPENDS "${hw_proto}") + +# Include generated *.pb.h files +include_directories("${CMAKE_CURRENT_BINARY_DIR}") + +# hw_grpc_proto +add_library(hw_grpc_proto + ${hw_grpc_srcs} + ${hw_grpc_hdrs} + ${hw_proto_srcs} + ${hw_proto_hdrs}) +target_link_libraries(hw_grpc_proto + ${_REFLECTION} + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF}) + +# Targets greeter_(client|server) +foreach(_target + client server) + add_executable(${_target} "${_target}.cc") + target_link_libraries(${_target} + hw_grpc_proto + absl::flags + absl::flags_parse + absl::strings + ${_REFLECTION} + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF}) +endforeach() diff --git a/examples/cpp/deadline/README.md b/examples/cpp/deadline/README.md new file mode 100644 index 00000000000..cfa8ee9b720 --- /dev/null +++ b/examples/cpp/deadline/README.md @@ -0,0 +1,35 @@ +# Deadline Example + +## Overview + +This example shows you how to use deadline when calling calls. + +### Try it! + +Once you have working gRPC, you can build this example using either bazel or cmake. + +Run the server, which will listen on port 50051: + +```sh +$ ./server +``` + +Run the client (in a different terminal): + +```sh +$ ./client +``` + +To simulate the test scenario, the test server implements following functionalities: +- Response Delay: The server intentionally delays its response for `delay` request messages to induce timeout conditions. +- Deadline Propagation: Upon receiving a request with the `[propagate me]` prefix, the server forwards it back to itselt. + This simulates the propagation of deadlines within the system. + +If things go smoothly, you will see the client output: + +``` +[Successful request] wanted = 0, got = 0 +[Exceeds deadline] wanted = 4, got = 4 +[Successful request with propagated deadline] wanted = 0, got = 0 +[Exceeds propagated deadline] wanted = 4, got = 4 +``` diff --git a/examples/cpp/deadline/client.cc b/examples/cpp/deadline/client.cc new file mode 100644 index 00000000000..398be745733 --- /dev/null +++ b/examples/cpp/deadline/client.cc @@ -0,0 +1,106 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/strings/str_cat.h" + +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#endif + +ABSL_FLAG(std::string, target, "localhost:50051", "Server address"); + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; +using grpc::StatusCode; +using helloworld::Greeter; +using helloworld::HelloReply; +using helloworld::HelloRequest; + +void unaryCall(std::shared_ptr channel, std::string label, + std::string message, grpc::StatusCode expected_code) { + std::unique_ptr stub = Greeter::NewStub(channel); + + // Data we are sending to the server. + HelloRequest request; + request.set_name(message); + + // Container for the data we expect from the server. + HelloReply reply; + + // Context for the client. It could be used to convey extra information to + // the server and/or tweak certain RPC behaviors. + ClientContext context; + + // Set 1 second timeout + context.set_deadline(std::chrono::system_clock::now() + + std::chrono::seconds(1)); + + // The actual RPC. + std::mutex mu; + std::condition_variable cv; + bool done = false; + Status status; + stub->async()->SayHello(&context, &request, &reply, + [&mu, &cv, &done, &status](Status s) { + status = std::move(s); + std::lock_guard lock(mu); + done = true; + cv.notify_one(); + }); + + std::unique_lock lock(mu); + while (!done) { + cv.wait(lock); + } + + // Act upon its status. + std::cout << "[" << label << "] wanted = " << expected_code + << ", got = " << status.error_code() << std::endl; +} + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + // Instantiate the client. It requires a channel, out of which the actual RPCs + // are created. This channel models a connection to an endpoint specified by + // the argument "--target=" which is the only expected argument. + std::string target_str = absl::GetFlag(FLAGS_target); + // We indicate that the channel isn't authenticated (use of + // InsecureChannelCredentials()). + std::shared_ptr channel = + grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()); + // Making test calls + unaryCall(channel, "Successful request", "world", grpc::StatusCode::OK); + unaryCall(channel, "Exceeds deadline", "delay", + grpc::StatusCode::DEADLINE_EXCEEDED); + unaryCall(channel, "Successful request with propagated deadline", + "[propagate me]world", grpc::StatusCode::OK); + unaryCall(channel, "Exceeds propagated deadline", + "[propagate me][propagate me]world", + grpc::StatusCode::DEADLINE_EXCEEDED); + return 0; +} diff --git a/examples/cpp/deadline/server.cc b/examples/cpp/deadline/server.cc new file mode 100644 index 00000000000..eae5c0e1d08 --- /dev/null +++ b/examples/cpp/deadline/server.cc @@ -0,0 +1,130 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/strings/match.h" +#include "absl/strings/str_format.h" + +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#endif + +ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); + +using grpc::CallbackServerContext; +using grpc::Channel; +using grpc::ClientContext; + +using grpc::Server; +using grpc::ServerBidiReactor; +using grpc::ServerBuilder; +using grpc::ServerUnaryReactor; +using grpc::Status; +using helloworld::Greeter; +using helloworld::HelloReply; +using helloworld::HelloRequest; + +// Logic behind the server's behavior. +class GreeterServiceImpl final : public Greeter::CallbackService { + public: + GreeterServiceImpl(const std::string& self_address) { + self_channel_ = + grpc::CreateChannel(self_address, grpc::InsecureChannelCredentials()); + } + + private: + ServerUnaryReactor* SayHello(CallbackServerContext* context, + const HelloRequest* request, + HelloReply* reply) override { + if (absl::StartsWith(request->name(), "[propagate me]")) { + std::unique_ptr stub = Greeter::NewStub(self_channel_); + std::this_thread::sleep_for(std::chrono::milliseconds(800)); + // Forwarding this call to the self as a different call + HelloRequest new_request; + new_request.set_name(request->name().substr(14)); + std::unique_ptr new_context = + ClientContext::FromCallbackServerContext(*context); + std::mutex mu; + std::condition_variable cv; + bool done = false; + Status status; + stub->async()->SayHello(new_context.get(), &new_request, reply, + [&mu, &cv, &done, &status](Status s) { + status = std::move(s); + std::lock_guard lock(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock lock(mu); + while (!done) { + cv.wait(lock); + } + ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(status); + return reactor; + } + + if (request->name() == "delay") { + // Intentionally delay for 1.5 seconds so that + // the client will see deadline_exceeded. + std::this_thread::sleep_for(std::chrono::milliseconds(1500)); + } + + reply->set_message(request->name()); + + ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(Status::OK); + return reactor; + } + + std::shared_ptr self_channel_; +}; + +void RunServer(uint16_t port) { + std::string server_address = absl::StrFormat("0.0.0.0:%d", port); + GreeterServiceImpl service(server_address); + + ServerBuilder builder; + // Listen on the given address without any authentication mechanism. + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + // Register "service" as the instance through which we'll communicate with + // clients. In this case it corresponds to an *synchronous* service. + builder.RegisterService(&service); + // Finally assemble the server. + std::unique_ptr server(builder.BuildAndStart()); + std::cout << "Server listening on " << server_address << std::endl; + + // Wait for the server to shutdown. Note that some other thread must be + // responsible for shutting down the server for this call to ever return. + server->Wait(); +} + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + RunServer(absl::GetFlag(FLAGS_port)); + return 0; +}