diff --git a/examples/cpp/keyvaluestore/BUILD b/examples/cpp/interceptors/BUILD similarity index 100% rename from examples/cpp/keyvaluestore/BUILD rename to examples/cpp/interceptors/BUILD diff --git a/examples/cpp/keyvaluestore/CMakeLists.txt b/examples/cpp/interceptors/CMakeLists.txt similarity index 100% rename from examples/cpp/keyvaluestore/CMakeLists.txt rename to examples/cpp/interceptors/CMakeLists.txt diff --git a/examples/cpp/interceptors/README.md b/examples/cpp/interceptors/README.md new file mode 100644 index 00000000000..26e374463be --- /dev/null +++ b/examples/cpp/interceptors/README.md @@ -0,0 +1,25 @@ +# gRPC C++ Interceptors Example + +The C++ Interceptors example shows how interceptors might be used with a simple key-value store. Note that the C++ Interception API is still experimental and subject to change. + +## Key Value Store + +The key-value store service is defined in [keyvaluestore.proto](https://github.com/grpc/grpc/blob/master/examples/protos/keyvaluestore.proto).It has a simple bidi streaming RPC where the request messages contain a key and the response messages contain a value. + +The example shows a very naive CachingInterceptor added on the client channel that caches the key-value pairs that it sees. If the client looks up a key present in the cache, the interceptor responds with its saved value and the server doesn't see the request for that key. + +On the server-side, a very simple logging interceptor is added that simply logs to stdout whenever a new RPC is received. + +## Running the example + +To run the server - + +``` +$ tools/bazel run examples/cpp/interceptors:server +``` + +To run the client (on a different terminal) - + +``` +$ tools/bazel run examples/cpp/interceptors:client +``` diff --git a/examples/cpp/keyvaluestore/caching_interceptor.h b/examples/cpp/interceptors/caching_interceptor.h similarity index 97% rename from examples/cpp/keyvaluestore/caching_interceptor.h rename to examples/cpp/interceptors/caching_interceptor.h index 5a31afe0f01..a91a48a8078 100644 --- a/examples/cpp/keyvaluestore/caching_interceptor.h +++ b/examples/cpp/interceptors/caching_interceptor.h @@ -72,10 +72,10 @@ class CachingInterceptor : public grpc::experimental::Interceptor { // Check if the key is present in the map auto search = cached_map_.find(requested_key); if (search != cached_map_.end()) { - std::cout << "Key " << requested_key << "found in map"; + std::cout << requested_key << " found in map" << std::endl; response_ = search->second; } else { - std::cout << "Key " << requested_key << "not found in cache"; + std::cout << requested_key << " not found in cache" << std::endl; // Key was not found in the cache, so make a request keyvaluestore::Request req; req.set_key(requested_key); diff --git a/examples/cpp/keyvaluestore/client.cc b/examples/cpp/interceptors/client.cc similarity index 58% rename from examples/cpp/keyvaluestore/client.cc rename to examples/cpp/interceptors/client.cc index 75c09a0af0f..9c63c86c172 100644 --- a/examples/cpp/keyvaluestore/client.cc +++ b/examples/cpp/interceptors/client.cc @@ -16,8 +16,10 @@ * */ +#include #include #include +#include #include #include @@ -38,40 +40,66 @@ using keyvaluestore::KeyValueStore; using keyvaluestore::Request; using keyvaluestore::Response; -class KeyValueStoreClient { +// Requests each key in the vector and displays the key and its corresponding +// value as a pair. +class KeyValueStoreClient : public grpc::ClientBidiReactor { public: - KeyValueStoreClient(std::shared_ptr channel) - : stub_(KeyValueStore::NewStub(channel)) {} + KeyValueStoreClient(std::shared_ptr channel, + std::vector keys) + : stub_(KeyValueStore::NewStub(channel)), keys_(std::move(keys)) { + stub_->async()->GetValues(&context_, this); + assert(!keys_.empty()); + request_.set_key(keys_[0]); + StartWrite(&request_); + StartCall(); + } - // Requests each key in the vector and displays the key and its corresponding - // value as a pair - void GetValues(const std::vector& keys) { - // Context for the client. It could be used to convey extra information to - // the server and/or tweak certain RPC behaviors. - ClientContext context; - auto stream = stub_->GetValues(&context); - for (const auto& key : keys) { - // Key we are sending to the server. - Request request; - request.set_key(key); - stream->Write(request); + void OnReadDone(bool ok) override { + if (ok) { + std::cout << request_.key() << " : " << response_.value() << std::endl; + if (++counter_ < keys_.size()) { + request_.set_key(keys_[counter_]); + StartWrite(&request_); + } else { + StartWritesDone(); + } + } + } - // Get the value for the sent key - Response response; - stream->Read(&response); - std::cout << key << " : " << response.value() << "\n"; + void OnWriteDone(bool ok) override { + if (ok) { + StartRead(&response_); } - stream->WritesDone(); - Status status = stream->Finish(); + } + + void OnDone(const grpc::Status& status) override { if (!status.ok()) { std::cout << status.error_code() << ": " << status.error_message() << std::endl; std::cout << "RPC failed"; } + std::unique_lock l(mu_); + done_ = true; + cv_.notify_all(); + } + + void Await() { + std::unique_lock l(mu_); + while (!done_) { + cv_.wait(l); + } } private: std::unique_ptr stub_; + std::vector keys_; + size_t counter_ = 0; + ClientContext context_; + bool done_ = false; + Request request_; + Response response_; + std::mutex mu_; + std::condition_variable cv_; }; int main(int argc, char** argv) { @@ -85,15 +113,13 @@ int main(int argc, char** argv) { std::vector< std::unique_ptr> interceptor_creators; - interceptor_creators.push_back(std::unique_ptr( - new CachingInterceptorFactory())); + interceptor_creators.push_back(std::make_unique()); auto channel = grpc::experimental::CreateCustomChannelWithInterceptors( "localhost:50051", grpc::InsecureChannelCredentials(), args, std::move(interceptor_creators)); - KeyValueStoreClient client(channel); std::vector keys = {"key1", "key2", "key3", "key4", "key5", "key1", "key2", "key4"}; - client.GetValues(keys); - + KeyValueStoreClient client(channel, keys); + client.Await(); return 0; } diff --git a/examples/cpp/keyvaluestore/server.cc b/examples/cpp/interceptors/server.cc similarity index 51% rename from examples/cpp/keyvaluestore/server.cc rename to examples/cpp/interceptors/server.cc index e75da9c62d1..e865bbe02fe 100644 --- a/examples/cpp/keyvaluestore/server.cc +++ b/examples/cpp/interceptors/server.cc @@ -22,6 +22,7 @@ #include #include +#include #ifdef BAZEL_BUILD #include "examples/protos/keyvaluestore.grpc.pb.h" @@ -29,15 +30,40 @@ #include "keyvaluestore.grpc.pb.h" #endif +using grpc::CallbackServerContext; using grpc::Server; +using grpc::ServerBidiReactor; using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::ServerReaderWriter; using grpc::Status; +using grpc::experimental::InterceptionHookPoints; +using grpc::experimental::Interceptor; +using grpc::experimental::InterceptorBatchMethods; +using grpc::experimental::ServerInterceptorFactoryInterface; +using grpc::experimental::ServerRpcInfo; using keyvaluestore::KeyValueStore; using keyvaluestore::Request; using keyvaluestore::Response; +// This is a simple interceptor that logs whenever it gets a request, which on +// the server side happens when initial metadata is received. +class LoggingInterceptor : public Interceptor { + public: + void Intercept(InterceptorBatchMethods* methods) override { + if (methods->QueryInterceptionHookPoint( + InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) { + std::cout << "Got a new streaming RPC" << std::endl; + } + methods->Proceed(); + } +}; + +class LoggingInterceptorFactory : public ServerInterceptorFactoryInterface { + public: + Interceptor* CreateServerInterceptor(ServerRpcInfo* info) override { + return new LoggingInterceptor(); + } +}; + struct kv_pair { const char* key; const char* value; @@ -57,17 +83,37 @@ const char* get_value_from_map(const char* key) { return ""; } -// Logic and data behind the server's behavior. -class KeyValueStoreServiceImpl final : public KeyValueStore::Service { - Status GetValues(ServerContext* context, - ServerReaderWriter* stream) override { - Request request; - while (stream->Read(&request)) { - Response response; - response.set_value(get_value_from_map(request.key().c_str())); - stream->Write(response); - } - return Status::OK; +// Logic behind the server's behavior. +class KeyValueStoreServiceImpl final : public KeyValueStore::CallbackService { + ServerBidiReactor* GetValues( + CallbackServerContext* context) override { + class Reactor : public ServerBidiReactor { + public: + explicit Reactor() { StartRead(&request_); } + + void OnReadDone(bool ok) override { + if (!ok) { + return Finish(grpc::Status::CANCELLED); + } + response_.set_value(get_value_from_map(request_.key().c_str())); + StartWrite(&response_); + } + + void OnWriteDone(bool ok) override { + if (!ok) { + return Finish(grpc::Status::CANCELLED); + } + StartRead(&request_); + } + + void OnDone() override { delete this; } + + private: + Request request_; + Response response_; + }; + + return new Reactor(); } }; @@ -81,6 +127,10 @@ void RunServer() { // Register "service" as the instance through which we'll communicate with // clients. In this case, it corresponds to an *synchronous* service. builder.RegisterService(&service); + std::vector> creators; + creators.push_back(std::unique_ptr( + new LoggingInterceptorFactory())); + builder.experimental().SetInterceptorCreators(std::move(creators)); // Finally assemble the server. std::unique_ptr server(builder.BuildAndStart()); std::cout << "Server listening on " << server_address << std::endl;