mirror of https://github.com/grpc/grpc.git
[Example] Added gRPC C++ cancellation example (#34239)
parent
2c574ed771
commit
d72ce236d8
6 changed files with 372 additions and 0 deletions
@ -0,0 +1,39 @@ |
||||
# Copyright 2023 the gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
licenses(["notice"]) |
||||
|
||||
cc_binary( |
||||
name = "client", |
||||
srcs = ["client.cc"], |
||||
defines = ["BAZEL_BUILD"], |
||||
deps = [ |
||||
"//:grpc++", |
||||
"//examples/protos:helloworld_cc_grpc", |
||||
"@com_google_absl//absl/flags:flag", |
||||
"@com_google_absl//absl/flags:parse", |
||||
], |
||||
) |
||||
|
||||
cc_binary( |
||||
name = "server", |
||||
srcs = ["server.cc"], |
||||
defines = ["BAZEL_BUILD"], |
||||
deps = [ |
||||
"//:grpc++", |
||||
"//examples/protos:helloworld_cc_grpc", |
||||
"@com_google_absl//absl/flags:flag", |
||||
"@com_google_absl//absl/flags:parse", |
||||
], |
||||
) |
@ -0,0 +1,71 @@ |
||||
# Copyright 2023 the gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
# |
||||
# cmake build file for C++ keyvaluestore example. |
||||
# Assumes protobuf and gRPC have been installed using cmake. |
||||
# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build |
||||
# that automatically builds all the dependencies before building keyvaluestore. |
||||
|
||||
cmake_minimum_required(VERSION 3.8) |
||||
|
||||
project(Cancellation C CXX) |
||||
|
||||
include(../cmake/common.cmake) |
||||
|
||||
# Proto files |
||||
get_filename_component(hw_proto "../../protos/helloworld.proto" ABSOLUTE) |
||||
get_filename_component(hw_proto_path "${hw_proto}" PATH) |
||||
|
||||
# Generated sources |
||||
set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc") |
||||
set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.h") |
||||
set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc") |
||||
set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h") |
||||
add_custom_command( |
||||
OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}" |
||||
COMMAND ${_PROTOBUF_PROTOC} |
||||
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" |
||||
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}" |
||||
-I "${hw_proto_path}" |
||||
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" |
||||
"${hw_proto}" |
||||
DEPENDS "${hw_proto}") |
||||
|
||||
# Include generated *.pb.h files |
||||
include_directories("${CMAKE_CURRENT_BINARY_DIR}") |
||||
|
||||
# hw_grpc_proto |
||||
add_library(hw_grpc_proto |
||||
${hw_grpc_srcs} |
||||
${hw_grpc_hdrs} |
||||
${hw_proto_srcs} |
||||
${hw_proto_hdrs}) |
||||
target_link_libraries(hw_grpc_proto |
||||
${_REFLECTION} |
||||
${_GRPC_GRPCPP} |
||||
${_PROTOBUF_LIBPROTOBUF}) |
||||
|
||||
# Targets greeter_(client|server) |
||||
foreach(_target |
||||
client server) |
||||
add_executable(${_target} "${_target}.cc") |
||||
target_link_libraries(${_target} |
||||
hw_grpc_proto |
||||
absl::flags |
||||
absl::flags_parse |
||||
absl::strings |
||||
${_REFLECTION} |
||||
${_GRPC_GRPCPP} |
||||
${_PROTOBUF_LIBPROTOBUF}) |
||||
endforeach() |
@ -0,0 +1,37 @@ |
||||
# Cancellation Example |
||||
|
||||
## Overview |
||||
|
||||
This example shows you how to cancel from the client and how to get informed on the server and the client. |
||||
|
||||
### Try it! |
||||
|
||||
Once you have working gRPC, you can build this example using either bazel or cmake. |
||||
|
||||
Run the server, which will listen on port 50051: |
||||
|
||||
```sh |
||||
$ ./server |
||||
``` |
||||
|
||||
Run the client (in a different terminal): |
||||
|
||||
```sh |
||||
$ ./client |
||||
``` |
||||
|
||||
If things go smoothly, you will see the client output: |
||||
|
||||
``` |
||||
Begin : Begin Ack |
||||
Count 1 : Count 1 Ack |
||||
Count 2 : Count 2 Ack |
||||
Count 3 : Count 3 Ack |
||||
Count 4 : Count 4 Ack |
||||
Count 5 : Count 5 Ack |
||||
Count 6 : Count 6 Ack |
||||
Count 7 : Count 7 Ack |
||||
Count 8 : Count 8 Ack |
||||
Count 9 : Count 9 Ack |
||||
RPC Cancelled! |
||||
``` |
@ -0,0 +1,119 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <condition_variable> |
||||
#include <iostream> |
||||
#include <memory> |
||||
#include <mutex> |
||||
#include <string> |
||||
#include <vector> |
||||
|
||||
#include "absl/flags/flag.h" |
||||
#include "absl/flags/parse.h" |
||||
#include "absl/strings/str_cat.h" |
||||
|
||||
#include <grpcpp/grpcpp.h> |
||||
|
||||
#ifdef BAZEL_BUILD |
||||
#include "examples/protos/helloworld.grpc.pb.h" |
||||
#else |
||||
#include "helloworld.grpc.pb.h" |
||||
#endif |
||||
|
||||
ABSL_FLAG(std::string, target, "localhost:50051", "Server address"); |
||||
|
||||
using grpc::Channel; |
||||
using grpc::ClientContext; |
||||
using grpc::Status; |
||||
using grpc::StatusCode; |
||||
using helloworld::Greeter; |
||||
using helloworld::HelloReply; |
||||
using helloworld::HelloRequest; |
||||
|
||||
// Requests each key in the vector and displays the key and its corresponding
|
||||
// value as a pair.
|
||||
class KeyValueStoreClient |
||||
: public grpc::ClientBidiReactor<HelloRequest, HelloReply> { |
||||
public: |
||||
KeyValueStoreClient(std::shared_ptr<Channel> channel) |
||||
: stub_(Greeter::NewStub(channel)) { |
||||
stub_->async()->SayHelloBidiStream(&context_, this); |
||||
request_.set_name("Begin"); |
||||
StartWrite(&request_); |
||||
StartCall(); |
||||
} |
||||
|
||||
void OnReadDone(bool ok) override { |
||||
if (ok) { |
||||
std::cout << request_.name() << " : " << response_.message() << std::endl; |
||||
if (++counter_ < 10) { |
||||
request_.set_name(absl::StrCat("Count ", counter_)); |
||||
StartWrite(&request_); |
||||
} else { |
||||
// Cancel after sending 10 messages
|
||||
context_.TryCancel(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
void OnWriteDone(bool ok) override { |
||||
if (ok) { |
||||
StartRead(&response_); |
||||
} |
||||
} |
||||
|
||||
void OnDone(const grpc::Status& status) override { |
||||
if (!status.ok()) { |
||||
if (status.error_code() == StatusCode::CANCELLED) { |
||||
// Eventually client will know here that call is cancelled.
|
||||
std::cout << "RPC Cancelled!" << std::endl; |
||||
} else { |
||||
std::cout << "RPC Failed: " << status.error_code() << ": " |
||||
<< status.error_message() << std::endl; |
||||
} |
||||
} |
||||
std::unique_lock<std::mutex> l(mu_); |
||||
done_ = true; |
||||
cv_.notify_all(); |
||||
} |
||||
|
||||
void Await() { |
||||
std::unique_lock<std::mutex> l(mu_); |
||||
while (!done_) { |
||||
cv_.wait(l); |
||||
} |
||||
} |
||||
|
||||
private: |
||||
std::unique_ptr<Greeter::Stub> stub_; |
||||
size_t counter_ = 0; |
||||
ClientContext context_; |
||||
bool done_ = false; |
||||
HelloRequest request_; |
||||
HelloReply response_; |
||||
std::mutex mu_; |
||||
std::condition_variable cv_; |
||||
}; |
||||
|
||||
int main(int argc, char** argv) { |
||||
absl::ParseCommandLine(argc, argv); |
||||
// Instantiate the client. It requires a channel, out of which the actual RPCs
|
||||
// are created. This channel models a connection to an endpoint specified by
|
||||
// the argument "--target=" which is the only expected argument.
|
||||
std::string target_str = absl::GetFlag(FLAGS_target); |
||||
KeyValueStoreClient client( |
||||
grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials())); |
||||
client.Await(); |
||||
return 0; |
||||
} |
@ -0,0 +1,104 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <iostream> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <vector> |
||||
|
||||
#include "absl/flags/flag.h" |
||||
#include "absl/flags/parse.h" |
||||
#include "absl/strings/str_format.h" |
||||
|
||||
#include <grpcpp/grpcpp.h> |
||||
|
||||
#ifdef BAZEL_BUILD |
||||
#include "examples/protos/helloworld.grpc.pb.h" |
||||
#else |
||||
#include "helloworld.grpc.pb.h" |
||||
#endif |
||||
|
||||
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); |
||||
|
||||
using grpc::CallbackServerContext; |
||||
using grpc::Server; |
||||
using grpc::ServerBidiReactor; |
||||
using grpc::ServerBuilder; |
||||
using grpc::Status; |
||||
using helloworld::Greeter; |
||||
using helloworld::HelloReply; |
||||
using helloworld::HelloRequest; |
||||
|
||||
// Logic behind the server's behavior.
|
||||
class KeyValueStoreServiceImpl final : public Greeter::CallbackService { |
||||
ServerBidiReactor<HelloRequest, HelloReply>* SayHelloBidiStream( |
||||
CallbackServerContext* context) override { |
||||
class Reactor : public ServerBidiReactor<HelloRequest, HelloReply> { |
||||
public: |
||||
explicit Reactor() { StartRead(&request_); } |
||||
|
||||
void OnReadDone(bool ok) override { |
||||
if (!ok) { |
||||
// Client cancelled it
|
||||
std::cout << "OnReadDone Cancelled!" << std::endl; |
||||
return Finish(grpc::Status::CANCELLED); |
||||
} |
||||
response_.set_message(absl::StrCat(request_.name(), " Ack")); |
||||
StartWrite(&response_); |
||||
} |
||||
|
||||
void OnWriteDone(bool ok) override { |
||||
if (!ok) { |
||||
// Client cancelled it
|
||||
std::cout << "OnWriteDone Cancelled!" << std::endl; |
||||
return Finish(grpc::Status::CANCELLED); |
||||
} |
||||
StartRead(&request_); |
||||
} |
||||
|
||||
void OnDone() override { delete this; } |
||||
|
||||
private: |
||||
HelloRequest request_; |
||||
HelloReply response_; |
||||
}; |
||||
|
||||
return new Reactor(); |
||||
} |
||||
}; |
||||
|
||||
void RunServer(uint16_t port) { |
||||
std::string server_address = absl::StrFormat("0.0.0.0:%d", port); |
||||
KeyValueStoreServiceImpl service; |
||||
|
||||
ServerBuilder builder; |
||||
// Listen on the given address without any authentication mechanism.
|
||||
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); |
||||
// Register "service" as the instance through which we'll communicate with
|
||||
// clients. In this case it corresponds to an *synchronous* service.
|
||||
builder.RegisterService(&service); |
||||
// Finally assemble the server.
|
||||
std::unique_ptr<Server> server(builder.BuildAndStart()); |
||||
std::cout << "Server listening on " << server_address << std::endl; |
||||
|
||||
// Wait for the server to shutdown. Note that some other thread must be
|
||||
// responsible for shutting down the server for this call to ever return.
|
||||
server->Wait(); |
||||
} |
||||
|
||||
int main(int argc, char** argv) { |
||||
absl::ParseCommandLine(argc, argv); |
||||
RunServer(absl::GetFlag(FLAGS_port)); |
||||
return 0; |
||||
} |
Loading…
Reference in new issue