From 0010cdae47270a22fd4442835261c796ee565900 Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Sun, 1 Feb 2015 20:44:33 -0800 Subject: [PATCH] Add Pull method to subscriber --- examples/tips/subscriber.cc | 23 +++++++++++++++++++++++ examples/tips/subscriber.h | 2 ++ examples/tips/subscriber_test.cc | 21 +++++++++++++++++++++ 3 files changed, 46 insertions(+) diff --git a/examples/tips/subscriber.cc b/examples/tips/subscriber.cc index a482ad6263d..2e2370ee2da 100644 --- a/examples/tips/subscriber.cc +++ b/examples/tips/subscriber.cc @@ -81,6 +81,29 @@ Status Subscriber::GetSubscription(const grpc::string& name, return s; } +Status Subscriber::Pull(const grpc::string& name, + grpc::string* data) { + tech::pubsub::PullRequest request; + tech::pubsub::PullResponse response; + ClientContext context; + + request.set_subscription(name); + Status s = stub_->Pull(&context, request, &response); + if (s.IsOk()) { + tech::pubsub::PubsubEvent event = response.pubsub_event(); + if (event.has_message()) { + *data = event.message().data(); + } + tech::pubsub::AcknowledgeRequest ack; + proto2::Empty empty; + ClientContext ack_context; + ack.set_subscription(name); + ack.add_ack_id(response.ack_id()); + stub_->Acknowledge(&ack_context, ack, &empty); + } + return s; +} + } // namespace tips } // namespace examples } // namespace grpc diff --git a/examples/tips/subscriber.h b/examples/tips/subscriber.h index e0491ff919a..38345c0c5a9 100644 --- a/examples/tips/subscriber.h +++ b/examples/tips/subscriber.h @@ -53,6 +53,8 @@ class Subscriber { Status GetSubscription(const grpc::string& name, grpc::string* topic); + Status Pull(const grpc::string& name, grpc::string* data); + private: std::unique_ptr stub_; }; diff --git a/examples/tips/subscriber_test.cc b/examples/tips/subscriber_test.cc index 4894814252e..4ff93643ae4 100644 --- a/examples/tips/subscriber_test.cc +++ b/examples/tips/subscriber_test.cc @@ -53,6 +53,7 @@ namespace { const char kTopic[] = "test topic"; const char kSubscriptionName[] = "subscription name"; +const char kData[] = "Message data"; class SubscriberServiceImpl : public tech::pubsub::SubscriberService::Service { public: @@ -72,6 +73,21 @@ class SubscriberServiceImpl : public tech::pubsub::SubscriberService::Service { return Status::OK; } + Status Pull(ServerContext* context, + const tech::pubsub::PullRequest* request, + tech::pubsub::PullResponse* response) override { + EXPECT_EQ(request->subscription(), kSubscriptionName); + response->set_ack_id("1"); + response->mutable_pubsub_event()->mutable_message()->set_data(kData); + return Status::OK; + } + + Status Acknowledge(ServerContext* context, + const tech::pubsub::AcknowledgeRequest* request, + proto2::Empty* response) override { + return Status::OK; + } + }; class SubscriberTest : public ::testing::Test { @@ -108,10 +124,15 @@ TEST_F(SubscriberTest, TestSubscriber) { EXPECT_TRUE(subscriber_->CreateSubscription(kTopic, kSubscriptionName).IsOk()); + grpc::string topic; EXPECT_TRUE(subscriber_->GetSubscription(kSubscriptionName, &topic).IsOk()); EXPECT_EQ(topic, kTopic); + + grpc::string data; + EXPECT_TRUE(subscriber_->Pull(kSubscriptionName, + &data).IsOk()); } } // namespace