[example] Clean up the interceptors example (#33169)

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/33179/head
Yash Tibrewal 2 years ago committed by GitHub
parent 6e1f46fda7
commit 57cd079310
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 0
      examples/cpp/interceptors/BUILD
  2. 0
      examples/cpp/interceptors/CMakeLists.txt
  3. 25
      examples/cpp/interceptors/README.md
  4. 4
      examples/cpp/interceptors/caching_interceptor.h
  5. 78
      examples/cpp/interceptors/client.cc
  6. 76
      examples/cpp/interceptors/server.cc

@ -0,0 +1,25 @@
# gRPC C++ Interceptors Example
The C++ Interceptors example shows how interceptors might be used with a simple key-value store. Note that the C++ Interception API is still experimental and subject to change.
## Key Value Store
The key-value store service is defined in [keyvaluestore.proto](https://github.com/grpc/grpc/blob/master/examples/protos/keyvaluestore.proto).It has a simple bidi streaming RPC where the request messages contain a key and the response messages contain a value.
The example shows a very naive CachingInterceptor added on the client channel that caches the key-value pairs that it sees. If the client looks up a key present in the cache, the interceptor responds with its saved value and the server doesn't see the request for that key.
On the server-side, a very simple logging interceptor is added that simply logs to stdout whenever a new RPC is received.
## Running the example
To run the server -
```
$ tools/bazel run examples/cpp/interceptors:server
```
To run the client (on a different terminal) -
```
$ tools/bazel run examples/cpp/interceptors:client
```

@ -72,10 +72,10 @@ class CachingInterceptor : public grpc::experimental::Interceptor {
// Check if the key is present in the map
auto search = cached_map_.find(requested_key);
if (search != cached_map_.end()) {
std::cout << "Key " << requested_key << "found in map";
std::cout << requested_key << " found in map" << std::endl;
response_ = search->second;
} else {
std::cout << "Key " << requested_key << "not found in cache";
std::cout << requested_key << " not found in cache" << std::endl;
// Key was not found in the cache, so make a request
keyvaluestore::Request req;
req.set_key(requested_key);

@ -16,8 +16,10 @@
*
*/
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
@ -38,40 +40,66 @@ using keyvaluestore::KeyValueStore;
using keyvaluestore::Request;
using keyvaluestore::Response;
class KeyValueStoreClient {
// Requests each key in the vector and displays the key and its corresponding
// value as a pair.
class KeyValueStoreClient : public grpc::ClientBidiReactor<Request, Response> {
public:
KeyValueStoreClient(std::shared_ptr<Channel> channel)
: stub_(KeyValueStore::NewStub(channel)) {}
KeyValueStoreClient(std::shared_ptr<Channel> channel,
std::vector<std::string> keys)
: stub_(KeyValueStore::NewStub(channel)), keys_(std::move(keys)) {
stub_->async()->GetValues(&context_, this);
assert(!keys_.empty());
request_.set_key(keys_[0]);
StartWrite(&request_);
StartCall();
}
// Requests each key in the vector and displays the key and its corresponding
// value as a pair
void GetValues(const std::vector<std::string>& keys) {
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
auto stream = stub_->GetValues(&context);
for (const auto& key : keys) {
// Key we are sending to the server.
Request request;
request.set_key(key);
stream->Write(request);
void OnReadDone(bool ok) override {
if (ok) {
std::cout << request_.key() << " : " << response_.value() << std::endl;
if (++counter_ < keys_.size()) {
request_.set_key(keys_[counter_]);
StartWrite(&request_);
} else {
StartWritesDone();
}
}
}
// Get the value for the sent key
Response response;
stream->Read(&response);
std::cout << key << " : " << response.value() << "\n";
void OnWriteDone(bool ok) override {
if (ok) {
StartRead(&response_);
}
stream->WritesDone();
Status status = stream->Finish();
}
void OnDone(const grpc::Status& status) override {
if (!status.ok()) {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
std::cout << "RPC failed";
}
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_all();
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
}
private:
std::unique_ptr<KeyValueStore::Stub> stub_;
std::vector<std::string> keys_;
size_t counter_ = 0;
ClientContext context_;
bool done_ = false;
Request request_;
Response response_;
std::mutex mu_;
std::condition_variable cv_;
};
int main(int argc, char** argv) {
@ -85,15 +113,13 @@ int main(int argc, char** argv) {
std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
interceptor_creators;
interceptor_creators.push_back(std::unique_ptr<CachingInterceptorFactory>(
new CachingInterceptorFactory()));
interceptor_creators.push_back(std::make_unique<CachingInterceptorFactory>());
auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
"localhost:50051", grpc::InsecureChannelCredentials(), args,
std::move(interceptor_creators));
KeyValueStoreClient client(channel);
std::vector<std::string> keys = {"key1", "key2", "key3", "key4",
"key5", "key1", "key2", "key4"};
client.GetValues(keys);
KeyValueStoreClient client(channel, keys);
client.Await();
return 0;
}

@ -22,6 +22,7 @@
#include <vector>
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/server_interceptor.h>
#ifdef BAZEL_BUILD
#include "examples/protos/keyvaluestore.grpc.pb.h"
@ -29,15 +30,40 @@
#include "keyvaluestore.grpc.pb.h"
#endif
using grpc::CallbackServerContext;
using grpc::Server;
using grpc::ServerBidiReactor;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerReaderWriter;
using grpc::Status;
using grpc::experimental::InterceptionHookPoints;
using grpc::experimental::Interceptor;
using grpc::experimental::InterceptorBatchMethods;
using grpc::experimental::ServerInterceptorFactoryInterface;
using grpc::experimental::ServerRpcInfo;
using keyvaluestore::KeyValueStore;
using keyvaluestore::Request;
using keyvaluestore::Response;
// This is a simple interceptor that logs whenever it gets a request, which on
// the server side happens when initial metadata is received.
class LoggingInterceptor : public Interceptor {
public:
void Intercept(InterceptorBatchMethods* methods) override {
if (methods->QueryInterceptionHookPoint(
InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) {
std::cout << "Got a new streaming RPC" << std::endl;
}
methods->Proceed();
}
};
class LoggingInterceptorFactory : public ServerInterceptorFactoryInterface {
public:
Interceptor* CreateServerInterceptor(ServerRpcInfo* info) override {
return new LoggingInterceptor();
}
};
struct kv_pair {
const char* key;
const char* value;
@ -57,17 +83,37 @@ const char* get_value_from_map(const char* key) {
return "";
}
// Logic and data behind the server's behavior.
class KeyValueStoreServiceImpl final : public KeyValueStore::Service {
Status GetValues(ServerContext* context,
ServerReaderWriter<Response, Request>* stream) override {
Request request;
while (stream->Read(&request)) {
Response response;
response.set_value(get_value_from_map(request.key().c_str()));
stream->Write(response);
}
return Status::OK;
// Logic behind the server's behavior.
class KeyValueStoreServiceImpl final : public KeyValueStore::CallbackService {
ServerBidiReactor<Request, Response>* GetValues(
CallbackServerContext* context) override {
class Reactor : public ServerBidiReactor<Request, Response> {
public:
explicit Reactor() { StartRead(&request_); }
void OnReadDone(bool ok) override {
if (!ok) {
return Finish(grpc::Status::CANCELLED);
}
response_.set_value(get_value_from_map(request_.key().c_str()));
StartWrite(&response_);
}
void OnWriteDone(bool ok) override {
if (!ok) {
return Finish(grpc::Status::CANCELLED);
}
StartRead(&request_);
}
void OnDone() override { delete this; }
private:
Request request_;
Response response_;
};
return new Reactor();
}
};
@ -81,6 +127,10 @@ void RunServer() {
// Register "service" as the instance through which we'll communicate with
// clients. In this case, it corresponds to an *synchronous* service.
builder.RegisterService(&service);
std::vector<std::unique_ptr<ServerInterceptorFactoryInterface>> creators;
creators.push_back(std::unique_ptr<ServerInterceptorFactoryInterface>(
new LoggingInterceptorFactory()));
builder.experimental().SetInterceptorCreators(std::move(creators));
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
Loading…
Cancel
Save