From 6e94552a306e9cfcff834e39bc833bcb8055e6fe Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 10 Jan 2019 17:00:49 -0800 Subject: [PATCH] Add a caching interceptor to the keyvaluestore example --- examples/BUILD | 3 +- .../cpp/keyvaluestore/caching_interceptor.h | 128 ++++++++++++++++++ examples/cpp/keyvaluestore/client.cc | 19 ++- 3 files changed, 146 insertions(+), 4 deletions(-) create mode 100644 examples/cpp/keyvaluestore/caching_interceptor.h diff --git a/examples/BUILD b/examples/BUILD index 4fee663bd9e..0a1ca94a649 100644 --- a/examples/BUILD +++ b/examples/BUILD @@ -101,7 +101,8 @@ cc_binary( cc_binary( name = "keyvaluestore_client", - srcs = ["cpp/keyvaluestore/client.cc"], + srcs = ["cpp/keyvaluestore/caching_interceptor.h", + "cpp/keyvaluestore/client.cc"], defines = ["BAZEL_BUILD"], deps = [":keyvaluestore", "//:grpc++"], ) diff --git a/examples/cpp/keyvaluestore/caching_interceptor.h b/examples/cpp/keyvaluestore/caching_interceptor.h new file mode 100644 index 00000000000..393212b83bb --- /dev/null +++ b/examples/cpp/keyvaluestore/caching_interceptor.h @@ -0,0 +1,128 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/keyvaluestore.grpc.pb.h" +#else +#include "keyvaluestore.grpc.pb.h" +#endif + +// This is a naive implementation of a cache. A new cache is for each call. For +// each new key request, the key is first searched in the map and if found. Only +// if the key is not found in the cache do we make a request. +class CachingInterceptor : public grpc::experimental::Interceptor { + public: + CachingInterceptor(grpc::experimental::ClientRpcInfo* info) {} + + void Intercept( + ::grpc::experimental::InterceptorBatchMethods* methods) override { + bool hijack = false; + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints:: + PRE_SEND_INITIAL_METADATA)) { + // Hijack all calls + hijack = true; + // Create a stream on which this interceptor can make requests + stub_ = keyvaluestore::KeyValueStore::NewStub( + methods->GetInterceptedChannel()); + stream_ = stub_->GetValues(&context_); + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { + // We know that clients perform a Read and a Write in a loop, so we don't + // need to maintain a list of the responses. + std::string requested_key; + const keyvaluestore::Request* req_msg = + static_cast(methods->GetSendMessage()); + if (req_msg != nullptr) { + requested_key = req_msg->key(); + } else { + // The non-serialized form would not be available in certain scenarios, + // so add a fallback + keyvaluestore::Request req_msg; + auto* buffer = methods->GetSerializedSendMessage(); + auto copied_buffer = *buffer; + GPR_ASSERT( + grpc::SerializationTraits::Deserialize( + &copied_buffer, &req_msg) + .ok()); + requested_key = req_msg.key(); + } + + // Check if the key is present in the map + auto search = cached_map_.find(requested_key); + if (search != cached_map_.end()) { + std::cout << "Key " << requested_key << "found in map"; + response_ = search->second; + } else { + std::cout << "Key " << requested_key << "not found in cache"; + // Key was not found in the cache, so make a request + keyvaluestore::Request req; + req.set_key(requested_key); + stream_->Write(req); + keyvaluestore::Response resp; + stream_->Read(&resp); + response_ = resp.value(); + // Insert the pair in the cache for future requests + cached_map_.insert({requested_key, response_}); + } + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) { + stream_->WritesDone(); + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)) { + keyvaluestore::Response* resp = + static_cast(methods->GetRecvMessage()); + resp->set_value(response_); + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_RECV_STATUS)) { + auto* status = methods->GetRecvStatus(); + *status = grpc::Status::OK; + } + if (hijack) { + methods->Hijack(); + } else { + methods->Proceed(); + } + } + + private: + grpc::ClientContext context_; + std::unique_ptr stub_; + std::unique_ptr< + grpc::ClientReaderWriter> + stream_; + std::map cached_map_; + std::string response_; +}; + +class CachingInterceptorFactory + : public grpc::experimental::ClientInterceptorFactoryInterface { + public: + grpc::experimental::Interceptor* CreateClientInterceptor( + grpc::experimental::ClientRpcInfo* info) override { + return new CachingInterceptor(info); + } +}; \ No newline at end of file diff --git a/examples/cpp/keyvaluestore/client.cc b/examples/cpp/keyvaluestore/client.cc index 17e407c273b..57c451cadf3 100644 --- a/examples/cpp/keyvaluestore/client.cc +++ b/examples/cpp/keyvaluestore/client.cc @@ -23,6 +23,8 @@ #include +#include "caching_interceptor.h" + #ifdef BAZEL_BUILD #include "examples/protos/keyvaluestore.grpc.pb.h" #else @@ -77,9 +79,20 @@ int main(int argc, char** argv) { // are created. This channel models a connection to an endpoint (in this case, // localhost at port 50051). We indicate that the channel isn't authenticated // (use of InsecureChannelCredentials()). - KeyValueStoreClient client(grpc::CreateChannel( - "localhost:50051", grpc::InsecureChannelCredentials())); - std::vector keys = {"key1", "key2", "key3", "key4", "key5"}; + // In this example, we are using a cache which has been added in as an + // interceptor. + grpc::ChannelArguments args; + std::vector< + std::unique_ptr> + interceptor_creators; + interceptor_creators.push_back(std::unique_ptr( + new CachingInterceptorFactory())); + auto channel = grpc::experimental::CreateCustomChannelWithInterceptors( + "localhost:50051", grpc::InsecureChannelCredentials(), args, + std::move(interceptor_creators)); + KeyValueStoreClient client(channel); + std::vector keys = {"key1", "key2", "key3", "key4", + "key5", "key1", "key2", "key4"}; client.GetValues(keys); return 0;