Impelment full logic of publish and subcribe.

pull/341/head
Chen Wang 10 years ago
parent 0010cdae47
commit b532ef8973
  1. 2
      examples/tips/empty.proto
  2. 2
      examples/tips/label.proto
  3. 49
      examples/tips/main.cc
  4. 9
      examples/tips/publisher.cc
  5. 10
      examples/tips/publisher.h
  6. 2
      examples/tips/pubsub.proto
  7. 19
      examples/tips/subscriber.cc
  8. 12
      examples/tips/subscriber.h
  9. 10
      examples/tips/subscriber_test.cc

@ -1,3 +1,5 @@
// This file will be moved to a new location.
syntax = "proto2";
package proto2;

@ -1,3 +1,5 @@
// This file will be moved to a new location.
// Labels provide a way to associate user-defined metadata with various
// objects. Labels may be used to organize objects into non-hierarchical
// groups; think metadata tags attached to mp3s.

@ -53,6 +53,7 @@
DEFINE_int32(server_port, 443, "Server port.");
DEFINE_string(server_host,
"pubsub-staging.googleapis.com", "Server host to connect to");
DEFINE_string(project_id, "", "GCE project id such as stoked-keyword-656");
DEFINE_string(service_account_key_file, "",
"Path to service account json key file.");
DEFINE_string(oauth_scope,
@ -61,9 +62,9 @@ DEFINE_string(oauth_scope,
namespace {
const char kTopic[] = "/topics/stoked-keyword-656/testtopics";
const char kSubscriptionName[] = "stoked-keyword-656/testsubscription";
const char kMessageData[] = "Message Data";
const char kTopic[] = "testtopics";
const char kSubscriptionName[] = "testsubscription";
const char kMessageData[] = "Test Data";
} // namespace
@ -83,10 +84,7 @@ int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
gpr_log(GPR_INFO, "Start TIPS client");
const int host_port_buf_size = 1024;
char host_port[host_port_buf_size];
snprintf(host_port, host_port_buf_size, "%s:%d", FLAGS_server_host.c_str(),
FLAGS_server_port);
std::ostringstream ss;
std::unique_ptr<grpc::Credentials> creds;
if (FLAGS_service_account_key_file != "") {
@ -97,9 +95,10 @@ int main(int argc, char** argv) {
creds = grpc::CredentialsFactory::ComputeEngineCredentials();
}
ss << FLAGS_server_host << ":" << FLAGS_server_port;
std::shared_ptr<grpc::ChannelInterface> channel(
grpc::CreateTestChannel(
host_port,
ss.str(),
FLAGS_server_host,
true, // enable SSL
true, // use prod roots
@ -108,8 +107,21 @@ int main(int argc, char** argv) {
grpc::examples::tips::Publisher publisher(channel);
grpc::examples::tips::Subscriber subscriber(channel);
grpc::string topic = kTopic;
GPR_ASSERT(FLAGS_project_id != "");
ss.str("");
ss << "/topics/" << FLAGS_project_id << "/" << kTopic;
grpc::string topic = ss.str();
ss.str("");
ss << FLAGS_project_id << "/" << kSubscriptionName;
grpc::string subscription_name = ss.str();
// Clean up test topic and subcription.
grpc::string subscription_topic;
if (subscriber.GetSubscription(
subscription_name, &subscription_topic).IsOk()) {
subscriber.DeleteSubscription(subscription_name);
}
if (publisher.GetTopic(topic).IsOk()) publisher.DeleteTopic(topic);
grpc::Status s = publisher.CreateTopic(topic);
@ -122,17 +134,26 @@ int main(int argc, char** argv) {
s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk());
s = publisher.Publish(topic, kMessageData);
gpr_log(GPR_INFO, "Publish returns code %d, %s",
s = subscriber.CreateSubscription(topic, subscription_name);
gpr_log(GPR_INFO, "create subscrption returns code %d, %s",
s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk());
s = subscriber.CreateSubscription(kTopic, kSubscriptionName);
gpr_log(GPR_INFO, "create subscrption returns code %d, %s",
s = publisher.Publish(topic, kMessageData);
gpr_log(GPR_INFO, "Publish %s returns code %d, %s",
kMessageData, s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk());
grpc::string data;
s = subscriber.Pull(subscription_name, &data);
gpr_log(GPR_INFO, "Pull %s", data.c_str());
s = subscriber.DeleteSubscription(subscription_name);
gpr_log(GPR_INFO, "Delete subscription returns code %d, %s",
s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk());
s = publisher.DeleteTopic(kTopic);
s = publisher.DeleteTopic(topic);
gpr_log(GPR_INFO, "Delete topic returns code %d, %s",
s.code(), s.details().c_str());
GPR_ASSERT(s.IsOk());

@ -56,7 +56,7 @@ void Publisher::Shutdown() {
stub_.reset();
}
Status Publisher::CreateTopic(grpc::string topic) {
Status Publisher::CreateTopic(const string& topic) {
Topic request;
Topic response;
request.set_name(topic);
@ -73,7 +73,7 @@ Status Publisher::ListTopics() {
return stub_->ListTopics(&context, request, &response);
}
Status Publisher::GetTopic(grpc::string topic) {
Status Publisher::GetTopic(const string& topic) {
GetTopicRequest request;
Topic response;
ClientContext context;
@ -83,7 +83,7 @@ Status Publisher::GetTopic(grpc::string topic) {
return stub_->GetTopic(&context, request, &response);
}
Status Publisher::DeleteTopic(grpc::string topic) {
Status Publisher::DeleteTopic(const string& topic) {
DeleteTopicRequest request;
proto2::Empty response;
ClientContext context;
@ -93,8 +93,7 @@ Status Publisher::DeleteTopic(grpc::string topic) {
return stub_->DeleteTopic(&context, request, &response);
}
Status Publisher::Publish(const grpc::string& topic,
const grpc::string& data) {
Status Publisher::Publish(const string& topic, const string& data) {
PublishRequest request;
proto2::Empty response;
ClientContext context;

@ -45,15 +45,15 @@ namespace tips {
class Publisher {
public:
Publisher(std::shared_ptr<grpc::ChannelInterface> channel);
Publisher(std::shared_ptr<ChannelInterface> channel);
void Shutdown();
Status CreateTopic(grpc::string topic);
Status GetTopic(grpc::string topic);
Status DeleteTopic(grpc::string topic);
Status CreateTopic(const string& topic);
Status GetTopic(const string& topic);
Status DeleteTopic(const string& topic);
Status ListTopics();
Status Publish(const grpc::string& topic, const grpc::string& data);
Status Publish(const string& topic, const string& data);
private:
std::unique_ptr<tech::pubsub::PublisherService::Stub> stub_;

@ -1,3 +1,5 @@
// This file will be moved to new location.
// Specification of the Pubsub API.
syntax = "proto2";

@ -56,8 +56,7 @@ void Subscriber::Shutdown() {
stub_.reset();
}
Status Subscriber::CreateSubscription(const grpc::string& topic,
const grpc::string& name) {
Status Subscriber::CreateSubscription(const string& topic, const string& name) {
tech::pubsub::Subscription request;
tech::pubsub::Subscription response;
ClientContext context;
@ -68,8 +67,7 @@ Status Subscriber::CreateSubscription(const grpc::string& topic,
return stub_->CreateSubscription(&context, request, &response);
}
Status Subscriber::GetSubscription(const grpc::string& name,
grpc::string* topic) {
Status Subscriber::GetSubscription(const string& name, string* topic) {
tech::pubsub::GetSubscriptionRequest request;
tech::pubsub::Subscription response;
ClientContext context;
@ -81,8 +79,17 @@ Status Subscriber::GetSubscription(const grpc::string& name,
return s;
}
Status Subscriber::Pull(const grpc::string& name,
grpc::string* data) {
Status Subscriber::DeleteSubscription(const string& name) {
tech::pubsub::DeleteSubscriptionRequest request;
proto2::Empty response;
ClientContext context;
request.set_subscription(name);
return stub_->DeleteSubscription(&context, request, &response);
}
Status Subscriber::Pull(const string& name, string* data) {
tech::pubsub::PullRequest request;
tech::pubsub::PullResponse response;
ClientContext context;

@ -45,15 +45,17 @@ namespace tips {
class Subscriber {
public:
Subscriber(std::shared_ptr<grpc::ChannelInterface> channel);
Subscriber(std::shared_ptr<ChannelInterface> channel);
void Shutdown();
Status CreateSubscription(const grpc::string& topic,
const grpc::string& name);
Status CreateSubscription(const string& topic,
const string& name);
Status GetSubscription(const grpc::string& name, grpc::string* topic);
Status GetSubscription(const string& name, string* topic);
Status Pull(const grpc::string& name, grpc::string* data);
Status DeleteSubscription(const string& name);
Status Pull(const string& name, string* data);
private:
std::unique_ptr<tech::pubsub::SubscriberService::Stub> stub_;

@ -73,6 +73,14 @@ class SubscriberServiceImpl : public tech::pubsub::SubscriberService::Service {
return Status::OK;
}
Status DeleteSubscription(
ServerContext* context,
const tech::pubsub::DeleteSubscriptionRequest* request,
proto2::Empty* response) override {
EXPECT_EQ(request->subscription(), kSubscriptionName);
return Status::OK;
}
Status Pull(ServerContext* context,
const tech::pubsub::PullRequest* request,
tech::pubsub::PullResponse* response) override {
@ -133,6 +141,8 @@ TEST_F(SubscriberTest, TestSubscriber) {
grpc::string data;
EXPECT_TRUE(subscriber_->Pull(kSubscriptionName,
&data).IsOk());
EXPECT_TRUE(subscriber_->DeleteSubscription(kSubscriptionName).IsOk());
}
} // namespace

Loading…
Cancel
Save