Implement both Publisher and Subscriber.

pull/341/head
Chen Wang 10 years ago
parent 6de65b01cb
commit 04f1aa809a
  1. 73
      Makefile
  2. 26
      build.json
  3. 49
      examples/tips/main.cc
  4. 30
      examples/tips/publisher.cc
  5. 14
      examples/tips/publisher.h
  6. 144
      examples/tips/publisher_test.cc
  7. 86
      examples/tips/subscriber.cc
  8. 64
      examples/tips/subscriber.h
  9. 51
      examples/tips/subscriber_test.cc
  10. 6
      tools/run_tests/tests.json

File diff suppressed because one or more lines are too long

@ -426,7 +426,8 @@
"examples/tips/label.proto",
"examples/tips/empty.proto",
"examples/tips/pubsub.proto",
"examples/tips/client.cc"
"examples/tips/publisher.cc",
"examples/tips/subscriber.cc"
],
"deps": [
"grpc++",
@ -1524,7 +1525,7 @@
"run": false,
"language": "c++",
"src": [
"examples/tips/client_main.cc"
"examples/tips/main.cc"
],
"deps": [
"tips_client_lib",
@ -1537,11 +1538,28 @@
]
},
{
"name": "tips_client_test",
"name": "tips_publisher_test",
"build": "test",
"language": "c++",
"src": [
"examples/tips/client_test.cc"
"examples/tips/publisher_test.cc"
],
"deps": [
"tips_client_lib",
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "tips_subscriber_test",
"build": "test",
"language": "c++",
"src": [
"examples/tips/subscriber_test.cc"
],
"deps": [
"tips_client_lib",

@ -46,7 +46,8 @@
#include <grpc++/credentials.h>
#include <grpc++/status.h>
#include "examples/tips/client.h"
#include "examples/tips/publisher.h"
#include "examples/tips/subscriber.h"
#include "test/cpp/util/create_test_channel.h"
DEFINE_int32(server_port, 443, "Server port.");
@ -54,7 +55,17 @@ DEFINE_string(server_host,
"pubsub-staging.googleapis.com", "Server host to connect to");
DEFINE_string(service_account_key_file, "",
"Path to service account json key file.");
DEFINE_string(oauth_scope, "", "Scope for OAuth tokens.");
DEFINE_string(oauth_scope,
"https://www.googleapis.com/auth/cloud-platform",
"Scope for OAuth tokens.");
namespace {
const char kTopic[] = "/topics/stoked-keyword-656/testtopics";
const char kSubscriptionName[] = "stoked-keyword-656/testsubscription";
const char kMessageData[] = "Message Data";
} // namespace
grpc::string GetServiceAccountJsonKey() {
static grpc::string json_key;
@ -94,20 +105,40 @@ int main(int argc, char** argv) {
true, // use prod roots
creds));
grpc::examples::tips::Client client(channel);
grpc::examples::tips::Publisher publisher(channel);
grpc::examples::tips::Subscriber subscriber(channel);
grpc::string topic = kTopic;
if (publisher.GetTopic(topic).IsOk()) publisher.DeleteTopic(topic);
grpc::Status s = publisher.CreateTopic(topic);
gpr_log(GPR_INFO, "Create topic returns code %d, %s",
s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk());
s = publisher.GetTopic(topic);
gpr_log(GPR_INFO, "Get topic returns code %d, %s",
s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk());
grpc::Status s = client.CreateTopic("/topics/stoked-keyword-656/testtopics");
gpr_log(GPR_INFO, "return code %d, %s", s.code(), s.details().c_str());
s = publisher.Publish(topic, kMessageData);
gpr_log(GPR_INFO, "Publish returns code %d, %s",
s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk());
s = client.GetTopic("/topics/stoked-keyword-656/testtopics");
gpr_log(GPR_INFO, "return code %d, %s", s.code(), s.details().c_str());
s = subscriber.CreateSubscription(kTopic, kSubscriptionName);
gpr_log(GPR_INFO, "create subscrption returns code %d, %s",
s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk());
s = client.DeleteTopic("/topics/stoked-keyword-656/testtopics");
gpr_log(GPR_INFO, "return code %d, %s", s.code(), s.details().c_str());
s = publisher.DeleteTopic(kTopic);
gpr_log(GPR_INFO, "Delete topic returns code %d, %s",
s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk());
subscriber.Shutdown();
publisher.Shutdown();
channel.reset();
grpc_shutdown();
return 0;

@ -33,7 +33,7 @@
#include <grpc++/client_context.h>
#include "examples/tips/client.h"
#include "examples/tips/publisher.h"
using tech::pubsub::Topic;
using tech::pubsub::DeleteTopicRequest;
@ -41,16 +41,22 @@ using tech::pubsub::GetTopicRequest;
using tech::pubsub::PublisherService;
using tech::pubsub::ListTopicsRequest;
using tech::pubsub::ListTopicsResponse;
using tech::pubsub::PublishRequest;
using tech::pubsub::PubsubMessage;
namespace grpc {
namespace examples {
namespace tips {
Client::Client(std::shared_ptr<ChannelInterface> channel)
Publisher::Publisher(std::shared_ptr<ChannelInterface> channel)
: stub_(PublisherService::NewStub(channel)) {
}
Status Client::CreateTopic(grpc::string topic) {
void Publisher::Shutdown() {
stub_.reset();
}
Status Publisher::CreateTopic(grpc::string topic) {
Topic request;
Topic response;
request.set_name(topic);
@ -59,7 +65,7 @@ Status Client::CreateTopic(grpc::string topic) {
return stub_->CreateTopic(&context, request, &response);
}
Status Client::ListTopics() {
Status Publisher::ListTopics() {
ListTopicsRequest request;
ListTopicsResponse response;
ClientContext context;
@ -67,7 +73,7 @@ Status Client::ListTopics() {
return stub_->ListTopics(&context, request, &response);
}
Status Client::GetTopic(grpc::string topic) {
Status Publisher::GetTopic(grpc::string topic) {
GetTopicRequest request;
Topic response;
ClientContext context;
@ -77,7 +83,7 @@ Status Client::GetTopic(grpc::string topic) {
return stub_->GetTopic(&context, request, &response);
}
Status Client::DeleteTopic(grpc::string topic) {
Status Publisher::DeleteTopic(grpc::string topic) {
DeleteTopicRequest request;
proto2::Empty response;
ClientContext context;
@ -87,6 +93,18 @@ Status Client::DeleteTopic(grpc::string topic) {
return stub_->DeleteTopic(&context, request, &response);
}
Status Publisher::Publish(const grpc::string& topic,
const grpc::string& data) {
PublishRequest request;
proto2::Empty response;
ClientContext context;
request.mutable_message()->set_data(data);
request.set_topic(topic);
return stub_->Publish(&context, request, &response);
}
} // namespace tips
} // namespace examples
} // namespace grpc

@ -31,8 +31,8 @@
*
*/
#ifndef __GRPCPP_EXAMPLES_TIPS_CLIENT_H_
#define __GRPCPP_EXAMPLES_TIPS_CLIENT_H_
#ifndef __GRPCPP_EXAMPLES_TIPS_PUBLISHER_H_
#define __GRPCPP_EXAMPLES_TIPS_PUBLISHER_H_
#include <grpc++/channel_interface.h>
#include <grpc++/status.h>
@ -43,14 +43,18 @@ namespace grpc {
namespace examples {
namespace tips {
class Client {
class Publisher {
public:
Client(std::shared_ptr<grpc::ChannelInterface> channel);
Publisher(std::shared_ptr<grpc::ChannelInterface> channel);
void Shutdown();
Status CreateTopic(grpc::string topic);
Status GetTopic(grpc::string topic);
Status DeleteTopic(grpc::string topic);
Status ListTopics();
Status Publish(const grpc::string& topic, const grpc::string& data);
private:
std::unique_ptr<tech::pubsub::PublisherService::Stub> stub_;
};
@ -59,4 +63,4 @@ class Client {
} // namespace examples
} // namespace grpc
#endif // __GRPCPP_EXAMPLES_TIPS_CLIENT_H_
#endif // __GRPCPP_EXAMPLES_TIPS_PUBLISHER_H_

@ -0,0 +1,144 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/status.h>
#include <gtest/gtest.h>
#include "examples/tips/publisher.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
using grpc::ChannelInterface;
namespace grpc {
namespace testing {
namespace {
const char kTopic[] = "test topic";
const char kMessageData[] = "test message data";
class PublisherServiceImpl : public tech::pubsub::PublisherService::Service {
public:
Status CreateTopic(::grpc::ServerContext* context,
const ::tech::pubsub::Topic* request,
::tech::pubsub::Topic* response) override {
EXPECT_EQ(request->name(), kTopic);
return Status::OK;
}
Status Publish(ServerContext* context,
const ::tech::pubsub::PublishRequest* request,
::proto2::Empty* response) override {
EXPECT_EQ(request->message().data(), kMessageData);
return Status::OK;
}
Status GetTopic(ServerContext* context,
const ::tech::pubsub::GetTopicRequest* request,
::tech::pubsub::Topic* response) override {
EXPECT_EQ(request->topic(), kTopic);
return Status::OK;
}
Status ListTopics(ServerContext* context,
const ::tech::pubsub::ListTopicsRequest* request,
::tech::pubsub::ListTopicsResponse* response) override {
return Status::OK;
}
Status DeleteTopic(ServerContext* context,
const ::tech::pubsub::DeleteTopicRequest* request,
::proto2::Empty* response) override {
EXPECT_EQ(request->topic(), kTopic);
return Status::OK;
}
};
class PublisherTest : public ::testing::Test {
protected:
// Setup a server and a client for PublisherService.
void SetUp() override {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
ServerBuilder builder;
builder.AddPort(server_address_.str());
builder.RegisterService(service_.service());
server_ = builder.BuildAndStart();
channel_ = CreateChannel(server_address_.str(), ChannelArguments());
publisher_.reset(new grpc::examples::tips::Publisher(channel_));
}
void TearDown() override {
server_->Shutdown();
publisher_->Shutdown();
}
std::ostringstream server_address_;
std::unique_ptr<Server> server_;
PublisherServiceImpl service_;
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<grpc::examples::tips::Publisher> publisher_;
};
TEST_F(PublisherTest, TestPublisher) {
EXPECT_TRUE(publisher_->CreateTopic(kTopic).IsOk());
EXPECT_TRUE(publisher_->Publish(kTopic, kMessageData).IsOk());
EXPECT_TRUE(publisher_->GetTopic(kTopic).IsOk());
EXPECT_TRUE(publisher_->ListTopics().IsOk());
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
grpc_init();
::testing::InitGoogleTest(&argc, argv);
gpr_log(GPR_INFO, "Start test ...");
int result = RUN_ALL_TESTS();
grpc_shutdown();
return result;
}

@ -0,0 +1,86 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc++/client_context.h>
#include "examples/tips/subscriber.h"
using tech::pubsub::Topic;
using tech::pubsub::DeleteTopicRequest;
using tech::pubsub::GetTopicRequest;
using tech::pubsub::SubscriberService;
using tech::pubsub::ListTopicsRequest;
using tech::pubsub::ListTopicsResponse;
using tech::pubsub::PublishRequest;
using tech::pubsub::PubsubMessage;
namespace grpc {
namespace examples {
namespace tips {
Subscriber::Subscriber(std::shared_ptr<ChannelInterface> channel)
: stub_(SubscriberService::NewStub(channel)) {
}
void Subscriber::Shutdown() {
stub_.reset();
}
Status Subscriber::CreateSubscription(const grpc::string& topic,
const grpc::string& name) {
tech::pubsub::Subscription request;
tech::pubsub::Subscription response;
ClientContext context;
request.set_topic(topic);
request.set_name(name);
return stub_->CreateSubscription(&context, request, &response);
}
Status Subscriber::GetSubscription(const grpc::string& name,
grpc::string* topic) {
tech::pubsub::GetSubscriptionRequest request;
tech::pubsub::Subscription response;
ClientContext context;
request.set_subscription(name);
Status s = stub_->GetSubscription(&context, request, &response);
*topic = response.topic();
return s;
}
} // namespace tips
} // namespace examples
} // namespace grpc

@ -0,0 +1,64 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPCPP_EXAMPLES_TIPS_SUBSCRIBER_H_
#define __GRPCPP_EXAMPLES_TIPS_SUBSCRIBER_H_
#include <grpc++/channel_interface.h>
#include <grpc++/status.h>
#include "examples/tips/pubsub.pb.h"
namespace grpc {
namespace examples {
namespace tips {
class Subscriber {
public:
Subscriber(std::shared_ptr<grpc::ChannelInterface> channel);
void Shutdown();
Status CreateSubscription(const grpc::string& topic,
const grpc::string& name);
Status GetSubscription(const grpc::string& name, grpc::string* topic);
private:
std::unique_ptr<tech::pubsub::SubscriberService::Stub> stub_;
};
} // namespace tips
} // namespace examples
} // namespace grpc
#endif // __GRPCPP_EXAMPLES_TIPS_SUBSCRIBER_H_

@ -41,7 +41,7 @@
#include <grpc++/status.h>
#include <gtest/gtest.h>
#include "examples/tips/client.h"
#include "examples/tips/subscriber.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
@ -52,43 +52,66 @@ namespace testing {
namespace {
const char kTopic[] = "test topic";
const char kSubscriptionName[] = "subscription name";
class PublishServiceImpl : public tech::pubsub::PublisherService::Service {
class SubscriberServiceImpl : public tech::pubsub::SubscriberService::Service {
public:
Status CreateTopic(::grpc::ServerContext* context,
const ::tech::pubsub::Topic* request,
::tech::pubsub::Topic* response) override {
EXPECT_EQ(request->name(), kTopic);
Status CreateSubscription(ServerContext* context,
const tech::pubsub::Subscription* request,
tech::pubsub::Subscription* response) override {
EXPECT_EQ(request->topic(), kTopic);
EXPECT_EQ(request->name(), kSubscriptionName);
return Status::OK;
}
Status GetSubscription(ServerContext* context,
const tech::pubsub::GetSubscriptionRequest* request,
tech::pubsub::Subscription* response) override {
EXPECT_EQ(request->subscription(), kSubscriptionName);
response->set_topic(kTopic);
return Status::OK;
}
};
class End2endTest : public ::testing::Test {
class SubscriberTest : public ::testing::Test {
protected:
// Setup a server and a client for SubscriberService.
void SetUp() override {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
ServerBuilder builder;
builder.AddPort(server_address_.str());
builder.RegisterService(service_.service());
server_ = builder.BuildAndStart();
channel_ = CreateChannel(server_address_.str(), ChannelArguments());
subscriber_.reset(new grpc::examples::tips::Subscriber(channel_));
}
void TearDown() override { server_->Shutdown(); }
void TearDown() override {
server_->Shutdown();
subscriber_->Shutdown();
}
std::unique_ptr<Server> server_;
std::ostringstream server_address_;
PublishServiceImpl service_;
std::unique_ptr<Server> server_;
SubscriberServiceImpl service_;
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<grpc::examples::tips::Subscriber> subscriber_;
};
TEST_F(End2endTest, CreateTopic) {
grpc::examples::tips::Client client(channel_);
client.CreateTopic(kTopic);
TEST_F(SubscriberTest, TestSubscriber) {
EXPECT_TRUE(subscriber_->CreateSubscription(kTopic,
kSubscriptionName).IsOk());
grpc::string topic;
EXPECT_TRUE(subscriber_->GetSubscription(kSubscriptionName,
&topic).IsOk());
EXPECT_EQ(topic, kTopic);
}
} // namespace

@ -263,7 +263,11 @@
},
{
"language": "c++",
"name": "tips_client_test"
"name": "tips_publisher_test"
},
{
"language": "c++",
"name": "tips_subscriber_test"
},
{
"language": "c++",

Loading…
Cancel
Save