Merge branch 'vjpai-qps-stream' into qps-stream

pull/1113/head
Vijay Pai 10 years ago
commit 96d3b8af32
  1. 44
      examples/pubsub/main.cc
  2. 7
      examples/pubsub/publisher.cc
  3. 31
      examples/pubsub/publisher_test.cc
  4. 7
      examples/pubsub/subscriber.cc
  5. 19
      examples/pubsub/subscriber_test.cc
  6. 22
      src/compiler/python_generator.cc
  7. 48
      src/compiler/ruby_generator.cc
  8. 10
      src/compiler/ruby_generator.h
  9. 12
      src/compiler/ruby_generator_helpers-inl.h
  10. 13
      src/compiler/ruby_generator_map-inl.h
  11. 43
      src/compiler/ruby_generator_string-inl.h
  12. 30
      src/compiler/ruby_plugin.cc
  13. 34
      src/core/iomgr/iocp_windows.c
  14. 1
      src/core/iomgr/iocp_windows.h
  15. 11
      src/core/iomgr/iomgr.c
  16. 7
      src/core/iomgr/socket_windows.c
  17. 6
      src/core/iomgr/socket_windows.h
  18. 4
      src/core/tsi/ssl_transport_security.c
  19. 2
      src/cpp/client/insecure_credentials.cc
  20. 2
      src/cpp/client/secure_credentials.cc
  21. 2
      src/cpp/common/call.cc
  22. 9
      src/cpp/server/secure_server_credentials.cc
  23. 4
      src/cpp/server/server.cc
  24. 18
      src/cpp/server/server_builder.cc
  25. 4
      src/cpp/server/thread_pool.cc
  26. 136
      src/node/examples/qps_test.js
  27. 8
      src/node/index.js
  28. 53
      src/node/src/client.js
  29. 23
      src/node/src/common.js
  30. 99
      src/node/src/server.js
  31. 53
      src/node/test/surface_test.js
  32. 531
      src/php/ext/grpc/call.c
  33. 17
      src/php/ext/grpc/call.h
  34. 23
      src/php/ext/grpc/channel.c
  35. 170
      src/php/ext/grpc/completion_queue.c
  36. 62
      src/php/ext/grpc/completion_queue.h
  37. 2
      src/php/ext/grpc/config.m4
  38. 150
      src/php/ext/grpc/event.c
  39. 51
      src/php/ext/grpc/event.h
  40. 36
      src/php/ext/grpc/php_grpc.c
  41. 76
      src/php/ext/grpc/server.c
  42. 1
      src/php/ext/grpc/server.h
  43. 31
      src/php/lib/Grpc/ActiveCall.php
  44. 62
      src/php/tests/unit_tests/CallTest.php
  45. 46
      src/php/tests/unit_tests/CompletionQueueTest.php
  46. 186
      src/php/tests/unit_tests/EndToEndTest.php
  47. 197
      src/php/tests/unit_tests/SecureEndToEndTest.php
  48. 53
      src/ruby/Rakefile
  49. 17
      src/ruby/lib/grpc/generic/service.rb
  50. 4
      src/ruby/spec/generic/active_call_spec.rb
  51. 8
      src/ruby/spec/generic/client_stub_spec.rb
  52. 42
      src/ruby/spec/generic/rpc_server_spec.rb
  53. 10
      test/core/tsi/transport_security_test.c
  54. 1
      test/cpp/end2end/async_end2end_test.cc
  55. 1
      test/cpp/end2end/end2end_test.cc
  56. 5
      test/cpp/end2end/generic_end2end_test.cc
  57. 11
      test/cpp/qps/client_async.cc
  58. 44
      test/cpp/qps/driver.cc
  59. 4
      test/cpp/qps/server_async.cc
  60. 4
      test/cpp/qps/stats.h
  61. 2
      tools/dockerfile/grpc_java_base/Dockerfile

@ -51,14 +51,14 @@
#include "examples/pubsub/subscriber.h"
DEFINE_int32(server_port, 443, "Server port.");
DEFINE_string(server_host,
"pubsub-staging.googleapis.com", "Server host to connect to");
DEFINE_string(server_host, "pubsub-staging.googleapis.com",
"Server host to connect to");
DEFINE_string(project_id, "", "GCE project id such as stoked-keyword-656");
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
namespace google { }
namespace gflags { }
namespace google {}
namespace gflags {}
using namespace google;
using namespace gflags;
@ -92,32 +92,32 @@ int main(int argc, char** argv) {
grpc::string topic = ss.str();
ss.str("");
ss << FLAGS_project_id << "/" << kSubscriptionName;
ss << FLAGS_project_id << "/" << kSubscriptionName;
grpc::string subscription_name = ss.str();
// Clean up test topic and subcription if they exist before.
grpc::string subscription_topic;
if (subscriber.GetSubscription(
subscription_name, &subscription_topic).IsOk()) {
if (subscriber.GetSubscription(subscription_name, &subscription_topic)
.IsOk()) {
subscriber.DeleteSubscription(subscription_name);
}
if (publisher.GetTopic(topic).IsOk()) publisher.DeleteTopic(topic);
grpc::Status s = publisher.CreateTopic(topic);
gpr_log(GPR_INFO, "Create topic returns code %d, %s",
s.code(), s.details().c_str());
gpr_log(GPR_INFO, "Create topic returns code %d, %s", s.code(),
s.details().c_str());
GPR_ASSERT(s.IsOk());
s = publisher.GetTopic(topic);
gpr_log(GPR_INFO, "Get topic returns code %d, %s",
s.code(), s.details().c_str());
gpr_log(GPR_INFO, "Get topic returns code %d, %s", s.code(),
s.details().c_str());
GPR_ASSERT(s.IsOk());
std::vector<grpc::string> topics;
s = publisher.ListTopics(FLAGS_project_id, &topics);
gpr_log(GPR_INFO, "List topic returns code %d, %s",
s.code(), s.details().c_str());
gpr_log(GPR_INFO, "List topic returns code %d, %s", s.code(),
s.details().c_str());
bool topic_found = false;
for (unsigned int i = 0; i < topics.size(); i++) {
if (topics[i] == topic) topic_found = true;
@ -127,27 +127,27 @@ int main(int argc, char** argv) {
GPR_ASSERT(topic_found);
s = subscriber.CreateSubscription(topic, subscription_name);
gpr_log(GPR_INFO, "create subscrption returns code %d, %s",
s.code(), s.details().c_str());
gpr_log(GPR_INFO, "create subscrption returns code %d, %s", s.code(),
s.details().c_str());
GPR_ASSERT(s.IsOk());
s = publisher.Publish(topic, kMessageData);
gpr_log(GPR_INFO, "Publish %s returns code %d, %s",
kMessageData, s.code(), s.details().c_str());
gpr_log(GPR_INFO, "Publish %s returns code %d, %s", kMessageData, s.code(),
s.details().c_str());
GPR_ASSERT(s.IsOk());
grpc::string data;
s = subscriber.Pull(subscription_name, &data);
gpr_log(GPR_INFO, "Pull %s", data.c_str());
s = subscriber.DeleteSubscription(subscription_name);
gpr_log(GPR_INFO, "Delete subscription returns code %d, %s",
s.code(), s.details().c_str());
s = subscriber.DeleteSubscription(subscription_name);
gpr_log(GPR_INFO, "Delete subscription returns code %d, %s", s.code(),
s.details().c_str());
GPR_ASSERT(s.IsOk());
s = publisher.DeleteTopic(topic);
gpr_log(GPR_INFO, "Delete topic returns code %d, %s",
s.code(), s.details().c_str());
gpr_log(GPR_INFO, "Delete topic returns code %d, %s", s.code(),
s.details().c_str());
GPR_ASSERT(s.IsOk());
subscriber.Shutdown();

@ -51,12 +51,9 @@ namespace examples {
namespace pubsub {
Publisher::Publisher(std::shared_ptr<ChannelInterface> channel)
: stub_(PublisherService::NewStub(channel)) {
}
: stub_(PublisherService::NewStub(channel)) {}
void Publisher::Shutdown() {
stub_.reset();
}
void Publisher::Shutdown() { stub_.reset(); }
Status Publisher::CreateTopic(const grpc::string& topic) {
Topic request;

@ -31,8 +31,6 @@
*
*/
#include <google/protobuf/stubs/common.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
@ -84,20 +82,19 @@ class PublisherServiceImpl : public tech::pubsub::PublisherService::Service {
Status ListTopics(
ServerContext* context, const ::tech::pubsub::ListTopicsRequest* request,
::tech::pubsub::ListTopicsResponse* response) GRPC_OVERRIDE {
std::ostringstream ss;
ss << "cloud.googleapis.com/project in (/projects/" << kProjectId << ")";
EXPECT_EQ(request->query(), ss.str());
response->add_topic()->set_name(kTopic);
return Status::OK;
}
Status DeleteTopic(ServerContext* context,
const ::tech::pubsub::DeleteTopicRequest* request,
::proto2::Empty* response) GRPC_OVERRIDE {
EXPECT_EQ(request->topic(), kTopic);
std::ostringstream ss;
ss << "cloud.googleapis.com/project in (/projects/" << kProjectId << ")";
EXPECT_EQ(request->query(), ss.str());
response->add_topic()->set_name(kTopic);
return Status::OK;
}
}
Status DeleteTopic(ServerContext* context,
const ::tech::pubsub::DeleteTopicRequest* request,
::proto2::Empty* response) GRPC_OVERRIDE {
EXPECT_EQ(request->topic(), kTopic);
return Status::OK;
}
};
class PublisherTest : public ::testing::Test {
@ -107,11 +104,13 @@ class PublisherTest : public ::testing::Test {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials());
builder.AddListeningPort(server_address_.str(),
grpc::InsecureServerCredentials());
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
channel_ = CreateChannel(server_address_.str(), grpc::InsecureCredentials(), ChannelArguments());
channel_ = CreateChannel(server_address_.str(), grpc::InsecureCredentials(),
ChannelArguments());
publisher_.reset(new grpc::examples::pubsub::Publisher(channel_));
}

@ -49,12 +49,9 @@ namespace examples {
namespace pubsub {
Subscriber::Subscriber(std::shared_ptr<ChannelInterface> channel)
: stub_(SubscriberService::NewStub(channel)) {
}
: stub_(SubscriberService::NewStub(channel)) {}
void Subscriber::Shutdown() {
stub_.reset();
}
void Subscriber::Shutdown() { stub_.reset(); }
Status Subscriber::CreateSubscription(const grpc::string& topic,
const grpc::string& name) {

@ -31,8 +31,6 @@
*
*/
#include <google/protobuf/stubs/common.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
@ -95,7 +93,6 @@ class SubscriberServiceImpl : public tech::pubsub::SubscriberService::Service {
proto2::Empty* response) GRPC_OVERRIDE {
return Status::OK;
}
};
class SubscriberTest : public ::testing::Test {
@ -105,11 +102,13 @@ class SubscriberTest : public ::testing::Test {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials());
builder.AddListeningPort(server_address_.str(),
grpc::InsecureServerCredentials());
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
channel_ = CreateChannel(server_address_.str(), grpc::InsecureCredentials(), ChannelArguments());
channel_ = CreateChannel(server_address_.str(), grpc::InsecureCredentials(),
ChannelArguments());
subscriber_.reset(new grpc::examples::pubsub::Subscriber(channel_));
}
@ -129,17 +128,15 @@ class SubscriberTest : public ::testing::Test {
};
TEST_F(SubscriberTest, TestSubscriber) {
EXPECT_TRUE(subscriber_->CreateSubscription(kTopic,
kSubscriptionName).IsOk());
EXPECT_TRUE(
subscriber_->CreateSubscription(kTopic, kSubscriptionName).IsOk());
grpc::string topic;
EXPECT_TRUE(subscriber_->GetSubscription(kSubscriptionName,
&topic).IsOk());
EXPECT_TRUE(subscriber_->GetSubscription(kSubscriptionName, &topic).IsOk());
EXPECT_EQ(topic, kTopic);
grpc::string data;
EXPECT_TRUE(subscriber_->Pull(kSubscriptionName,
&data).IsOk());
EXPECT_TRUE(subscriber_->Pull(kSubscriptionName, &data).IsOk());
EXPECT_TRUE(subscriber_->DeleteSubscription(kSubscriptionName).IsOk());
}

@ -309,17 +309,20 @@ bool PrintServerFactory(const grpc::string& package_qualified_service_name,
make_pair(method->name(), output_message_module_and_class));
}
out->Print("method_service_descriptions = {\n");
for (auto& name_and_description_constructor :
method_description_constructors) {
for (auto name_and_description_constructor =
method_description_constructors.begin();
name_and_description_constructor !=
method_description_constructors.end();
name_and_description_constructor++) {
IndentScope raii_descriptions_indent(out);
const grpc::string method_name = name_and_description_constructor.first;
const grpc::string method_name = name_and_description_constructor->first;
auto input_message_module_and_class =
input_message_modules_and_classes.find(method_name);
auto output_message_module_and_class =
output_message_modules_and_classes.find(method_name);
out->Print("\"$Method$\": utilities.$Constructor$(\n", "Method",
method_name, "Constructor",
name_and_description_constructor.second);
name_and_description_constructor->second);
{
IndentScope raii_description_arguments_indent(out);
out->Print("servicer.$Method$,\n", "Method", method_name);
@ -387,17 +390,20 @@ bool PrintStubFactory(const grpc::string& package_qualified_service_name,
make_pair(method->name(), output_message_module_and_class));
}
out->Print("method_invocation_descriptions = {\n");
for (auto& name_and_description_constructor :
method_description_constructors) {
for (auto name_and_description_constructor =
method_description_constructors.begin();
name_and_description_constructor !=
method_description_constructors.end();
name_and_description_constructor++) {
IndentScope raii_descriptions_indent(out);
const grpc::string method_name = name_and_description_constructor.first;
const grpc::string method_name = name_and_description_constructor->first;
auto input_message_module_and_class =
input_message_modules_and_classes.find(method_name);
auto output_message_module_and_class =
output_message_modules_and_classes.find(method_name);
out->Print("\"$Method$\": utilities.$Constructor$(\n", "Method",
method_name, "Constructor",
name_and_description_constructor.second);
name_and_description_constructor->second);
{
IndentScope raii_description_arguments_indent(out);
out->Print(

@ -32,24 +32,20 @@
*/
#include <cctype>
#include <string>
#include <map>
#include <vector>
#include "src/compiler/config.h"
#include "src/compiler/ruby_generator.h"
#include "src/compiler/ruby_generator_helpers-inl.h"
#include "src/compiler/ruby_generator_map-inl.h"
#include "src/compiler/ruby_generator_string-inl.h"
#include <google/protobuf/io/printer.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/descriptor.pb.h>
#include <google/protobuf/descriptor.h>
using google::protobuf::FileDescriptor;
using google::protobuf::ServiceDescriptor;
using google::protobuf::MethodDescriptor;
using google::protobuf::io::Printer;
using google::protobuf::io::StringOutputStream;
using grpc::protobuf::FileDescriptor;
using grpc::protobuf::ServiceDescriptor;
using grpc::protobuf::MethodDescriptor;
using grpc::protobuf::io::Printer;
using grpc::protobuf::io::StringOutputStream;
using std::map;
using std::vector;
@ -57,38 +53,38 @@ namespace grpc_ruby_generator {
namespace {
// Prints out the method using the ruby gRPC DSL.
void PrintMethod(const MethodDescriptor *method, const std::string &package,
void PrintMethod(const MethodDescriptor *method, const grpc::string &package,
Printer *out) {
std::string input_type = RubyTypeOf(method->input_type()->name(), package);
grpc::string input_type = RubyTypeOf(method->input_type()->name(), package);
if (method->client_streaming()) {
input_type = "stream(" + input_type + ")";
}
std::string output_type = RubyTypeOf(method->output_type()->name(), package);
grpc::string output_type = RubyTypeOf(method->output_type()->name(), package);
if (method->server_streaming()) {
output_type = "stream(" + output_type + ")";
}
std::map<std::string, std::string> method_vars =
std::map<grpc::string, grpc::string> method_vars =
ListToDict({"mth.name", method->name(), "input.type", input_type,
"output.type", output_type, });
out->Print(method_vars, "rpc :$mth.name$, $input.type$, $output.type$\n");
}
// Prints out the service using the ruby gRPC DSL.
void PrintService(const ServiceDescriptor *service, const std::string &package,
void PrintService(const ServiceDescriptor *service, const grpc::string &package,
Printer *out) {
if (service->method_count() == 0) {
return;
}
// Begin the service module
std::map<std::string, std::string> module_vars =
std::map<grpc::string, grpc::string> module_vars =
ListToDict({"module.name", CapitalizeFirst(service->name()), });
out->Print(module_vars, "module $module.name$\n");
out->Indent();
// TODO(temiola): add documentation
std::string doc = "TODO: add proto service documentation here";
std::map<std::string, std::string> template_vars =
grpc::string doc = "TODO: add proto service documentation here";
std::map<grpc::string, grpc::string> template_vars =
ListToDict({"Documentation", doc, });
out->Print("\n");
out->Print(template_vars, "# $Documentation$\n");
@ -101,7 +97,7 @@ void PrintService(const ServiceDescriptor *service, const std::string &package,
out->Print("\n");
out->Print("self.marshal_class_method = :encode\n");
out->Print("self.unmarshal_class_method = :decode\n");
std::map<std::string, std::string> pkg_vars =
std::map<grpc::string, grpc::string> pkg_vars =
ListToDict({"service.name", service->name(), "pkg.name", package, });
out->Print(pkg_vars, "self.service_name = '$pkg.name$.$service.name$'\n");
out->Print("\n");
@ -121,8 +117,8 @@ void PrintService(const ServiceDescriptor *service, const std::string &package,
} // namespace
std::string GetServices(const FileDescriptor *file) {
std::string output;
grpc::string GetServices(const FileDescriptor *file) {
grpc::string output;
StringOutputStream output_stream(&output);
Printer out(&output_stream, '$');
@ -133,7 +129,7 @@ std::string GetServices(const FileDescriptor *file) {
}
// Write out a file header.
std::map<std::string, std::string> header_comment_vars = ListToDict(
std::map<grpc::string, grpc::string> header_comment_vars = ListToDict(
{"file.name", file->name(), "file.package", file->package(), });
out.Print("# Generated by the protocol buffer compiler. DO NOT EDIT!\n");
out.Print(header_comment_vars,
@ -144,15 +140,15 @@ std::string GetServices(const FileDescriptor *file) {
// Write out require statemment to import the separately generated file
// that defines the messages used by the service. This is generated by the
// main ruby plugin.
std::map<std::string, std::string> dep_vars =
std::map<grpc::string, grpc::string> dep_vars =
ListToDict({"dep.name", MessagesRequireName(file), });
out.Print(dep_vars, "require '$dep.name$'\n");
// Write out services within the modules
out.Print("\n");
std::vector<std::string> modules = Split(file->package(), '.');
std::vector<grpc::string> modules = Split(file->package(), '.');
for (size_t i = 0; i < modules.size(); ++i) {
std::map<std::string, std::string> module_vars =
std::map<grpc::string, grpc::string> module_vars =
ListToDict({"module.name", CapitalizeFirst(modules[i]), });
out.Print(module_vars, "module $module.name$\n");
out.Indent();

@ -34,17 +34,11 @@
#ifndef GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_H
#define GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_H
#include <string>
namespace google {
namespace protobuf {
class FileDescriptor;
} // namespace protobuf
} // namespace google
#include "src/compiler/config.h"
namespace grpc_ruby_generator {
std::string GetServices(const google::protobuf::FileDescriptor *file);
grpc::string GetServices(const grpc::protobuf::FileDescriptor *file);
} // namespace grpc_ruby_generator

@ -34,15 +34,13 @@
#ifndef GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_HELPERS_INL_H
#define GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_HELPERS_INL_H
#include <string>
#include <google/protobuf/descriptor.h>
#include "src/compiler/config.h"
#include "src/compiler/ruby_generator_string-inl.h"
namespace grpc_ruby_generator {
inline bool ServicesFilename(const google::protobuf::FileDescriptor *file,
std::string *file_name_or_error) {
inline bool ServicesFilename(const grpc::protobuf::FileDescriptor *file,
grpc::string *file_name_or_error) {
// Get output file name.
static const unsigned proto_suffix_length = 6; // length of ".proto"
if (file->name().size() > proto_suffix_length &&
@ -57,8 +55,8 @@ inline bool ServicesFilename(const google::protobuf::FileDescriptor *file,
}
}
inline std::string MessagesRequireName(
const google::protobuf::FileDescriptor *file) {
inline grpc::string MessagesRequireName(
const grpc::protobuf::FileDescriptor *file) {
return Replace(file->name(), ".proto", "");
}

@ -34,11 +34,12 @@
#ifndef GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_MAP_INL_H
#define GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_MAP_INL_H
#include "src/compiler/config.h"
#include <iostream>
#include <initializer_list>
#include <map>
#include <ostream> // NOLINT
#include <string>
#include <vector>
using std::initializer_list;
@ -49,18 +50,18 @@ namespace grpc_ruby_generator {
// Converts an initializer list of the form { key0, value0, key1, value1, ... }
// into a map of key* to value*. Is merely a readability helper for later code.
inline std::map<std::string, std::string> ListToDict(
const initializer_list<std::string> &values) {
inline std::map<grpc::string, grpc::string> ListToDict(
const initializer_list<grpc::string> &values) {
if (values.size() % 2 != 0) {
std::cerr << "Not every 'key' has a value in `values`."
<< std::endl;
}
std::map<std::string, std::string> value_map;
std::map<grpc::string, grpc::string> value_map;
auto value_iter = values.begin();
for (unsigned i = 0; i < values.size() / 2; ++i) {
std::string key = *value_iter;
grpc::string key = *value_iter;
++value_iter;
std::string value = *value_iter;
grpc::string value = *value_iter;
value_map[key] = value;
++value_iter;
}

@ -34,8 +34,9 @@
#ifndef GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_STRING_INL_H
#define GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_STRING_INL_H
#include "src/compiler/config.h"
#include <algorithm>
#include <string>
#include <sstream>
#include <vector>
@ -45,10 +46,10 @@ using std::transform;
namespace grpc_ruby_generator {
// Split splits a string using char into elems.
inline std::vector<std::string> &Split(const std::string &s, char delim,
std::vector<std::string> *elems) {
inline std::vector<grpc::string> &Split(const grpc::string &s, char delim,
std::vector<grpc::string> *elems) {
std::stringstream ss(s);
std::string item;
grpc::string item;
while (getline(ss, item, delim)) {
elems->push_back(item);
}
@ -56,17 +57,17 @@ inline std::vector<std::string> &Split(const std::string &s, char delim,
}
// Split splits a string using char, returning the result in a vector.
inline std::vector<std::string> Split(const std::string &s, char delim) {
std::vector<std::string> elems;
inline std::vector<grpc::string> Split(const grpc::string &s, char delim) {
std::vector<grpc::string> elems;
Split(s, delim, &elems);
return elems;
}
// Replace replaces from with to in s.
inline std::string Replace(std::string s, const std::string &from,
const std::string &to) {
inline grpc::string Replace(grpc::string s, const grpc::string &from,
const grpc::string &to) {
size_t start_pos = s.find(from);
if (start_pos == std::string::npos) {
if (start_pos == grpc::string::npos) {
return s;
}
s.replace(start_pos, from.length(), to);
@ -74,10 +75,10 @@ inline std::string Replace(std::string s, const std::string &from,
}
// ReplaceAll replaces all instances of search with replace in s.
inline std::string ReplaceAll(std::string s, const std::string &search,
const std::string &replace) {
inline grpc::string ReplaceAll(grpc::string s, const grpc::string &search,
const grpc::string &replace) {
size_t pos = 0;
while ((pos = s.find(search, pos)) != std::string::npos) {
while ((pos = s.find(search, pos)) != grpc::string::npos) {
s.replace(pos, search.length(), replace);
pos += replace.length();
}
@ -85,10 +86,10 @@ inline std::string ReplaceAll(std::string s, const std::string &search,
}
// ReplacePrefix replaces from with to in s if search is a prefix of s.
inline bool ReplacePrefix(std::string *s, const std::string &from,
const std::string &to) {
inline bool ReplacePrefix(grpc::string *s, const grpc::string &from,
const grpc::string &to) {
size_t start_pos = s->find(from);
if (start_pos == std::string::npos || start_pos != 0) {
if (start_pos == grpc::string::npos || start_pos != 0) {
return false;
}
s->replace(start_pos, from.length(), to);
@ -96,7 +97,7 @@ inline bool ReplacePrefix(std::string *s, const std::string &from,
}
// CapitalizeFirst capitalizes the first char in a string.
inline std::string CapitalizeFirst(std::string s) {
inline grpc::string CapitalizeFirst(grpc::string s) {
if (s.empty()) {
return s;
}
@ -105,15 +106,15 @@ inline std::string CapitalizeFirst(std::string s) {
}
// RubyTypeOf updates a proto type to the required ruby equivalent.
inline std::string RubyTypeOf(const std::string &a_type,
const std::string &package) {
std::string res(a_type);
inline grpc::string RubyTypeOf(const grpc::string &a_type,
const grpc::string &package) {
grpc::string res(a_type);
ReplacePrefix(&res, package, ""); // remove the leading package if present
ReplacePrefix(&res, ".", ""); // remove the leading . (no package)
if (res.find('.') == std::string::npos) {
if (res.find('.') == grpc::string::npos) {
return res;
} else {
std::vector<std::string> prefixes_and_type = Split(res, '.');
std::vector<grpc::string> prefixes_and_type = Split(res, '.');
for (unsigned int i = 0; i < prefixes_and_type.size(); ++i) {
if (i != 0) {
res += "::"; // switch '.' to the ruby module delim

@ -32,43 +32,35 @@
*/
// Generates Ruby gRPC service interface out of Protobuf IDL.
//
// This is a Proto2 compiler plugin. See net/proto2/compiler/proto/plugin.proto
// and net/proto2/compiler/public/plugin.h for more information on plugins.
#include <memory>
#include <string>
#include "src/compiler/config.h"
#include "src/compiler/ruby_generator.h"
#include "src/compiler/ruby_generator_helpers-inl.h"
#include <google/protobuf/compiler/code_generator.h>
#include <google/protobuf/compiler/plugin.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/descriptor.h>
class RubyGrpcGenerator : public google::protobuf::compiler::CodeGenerator {
class RubyGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
public:
RubyGrpcGenerator() {}
~RubyGrpcGenerator() {}
bool Generate(const google::protobuf::FileDescriptor *file,
const std::string &parameter,
google::protobuf::compiler::GeneratorContext *context,
std::string *error) const {
std::string code = grpc_ruby_generator::GetServices(file);
bool Generate(const grpc::protobuf::FileDescriptor *file,
const grpc::string &parameter,
grpc::protobuf::compiler::GeneratorContext *context,
grpc::string *error) const {
grpc::string code = grpc_ruby_generator::GetServices(file);
if (code.size() == 0) {
return true; // don't generate a file if there are no services
}
// Get output file name.
std::string file_name;
grpc::string file_name;
if (!grpc_ruby_generator::ServicesFilename(file, &file_name)) {
return false;
}
std::unique_ptr<google::protobuf::io::ZeroCopyOutputStream> output(
std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output(
context->Open(file_name));
google::protobuf::io::CodedOutputStream coded_out(output.get());
grpc::protobuf::io::CodedOutputStream coded_out(output.get());
coded_out.WriteRaw(code.data(), code.size());
return true;
}
@ -76,5 +68,5 @@ class RubyGrpcGenerator : public google::protobuf::compiler::CodeGenerator {
int main(int argc, char *argv[]) {
RubyGrpcGenerator generator;
return google::protobuf::compiler::PluginMain(argc, argv, &generator);
return grpc::protobuf::compiler::PluginMain(argc, argv, &generator);
}

@ -52,10 +52,11 @@ static OVERLAPPED g_iocp_custom_overlap;
static gpr_event g_shutdown_iocp;
static gpr_event g_iocp_done;
static gpr_atm g_orphans = 0;
static HANDLE g_iocp;
static int do_iocp_work() {
static void do_iocp_work() {
BOOL success;
DWORD bytes = 0;
DWORD flags = 0;
@ -71,14 +72,14 @@ static int do_iocp_work() {
gpr_time_to_millis(wait_time));
if (!success && !overlapped) {
/* The deadline got attained. */
return 0;
return;
}
GPR_ASSERT(completion_key && overlapped);
if (overlapped == &g_iocp_custom_overlap) {
if (completion_key == (ULONG_PTR) &g_iocp_kick_token) {
/* We were awoken from a kick. */
gpr_log(GPR_DEBUG, "do_iocp_work - got a kick");
return 1;
return;
}
gpr_log(GPR_ERROR, "Unknown custom completion key.");
abort();
@ -97,8 +98,13 @@ static int do_iocp_work() {
}
success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
FALSE, &flags);
gpr_log(GPR_DEBUG, "bytes: %u, flags: %u - op %s", bytes, flags,
success ? "succeeded" : "failed");
gpr_log(GPR_DEBUG, "bytes: %u, flags: %u - op %s %s", bytes, flags,
success ? "succeeded" : "failed", socket->orphan ? "orphan" : "");
if (socket->orphan) {
grpc_winsocket_destroy(socket);
gpr_atm_full_fetch_add(&g_orphans, -1);
return;
}
info->bytes_transfered = bytes;
info->wsa_error = success ? 0 : WSAGetLastError();
GPR_ASSERT(overlapped == &info->overlapped);
@ -113,12 +119,10 @@ static int do_iocp_work() {
}
gpr_mu_unlock(&socket->state_mu);
if (f) f(opaque, 1);
return 1;
}
static void iocp_loop(void *p) {
while (!gpr_event_get(&g_shutdown_iocp)) {
while (gpr_atm_acq_load(&g_orphans) || !gpr_event_get(&g_shutdown_iocp)) {
grpc_maybe_call_delayed_callbacks(NULL, 1);
do_iocp_work();
}
@ -138,13 +142,19 @@ void grpc_iocp_init(void) {
gpr_thd_new(&id, iocp_loop, NULL, NULL);
}
void grpc_iocp_shutdown(void) {
void grpc_iocp_kick(void) {
BOOL success;
gpr_event_set(&g_shutdown_iocp, (void *)1);
success = PostQueuedCompletionStatus(g_iocp, 0,
(ULONG_PTR) &g_iocp_kick_token,
&g_iocp_custom_overlap);
GPR_ASSERT(success);
}
void grpc_iocp_shutdown(void) {
BOOL success;
gpr_event_set(&g_shutdown_iocp, (void *)1);
grpc_iocp_kick();
gpr_event_wait(&g_iocp_done, gpr_inf_future);
success = CloseHandle(g_iocp);
GPR_ASSERT(success);
@ -166,6 +176,10 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
GPR_ASSERT(ret == g_iocp);
}
void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
gpr_atm_full_fetch_add(&g_orphans, 1);
}
static void socket_notify_on_iocp(grpc_winsocket *socket,
void(*cb)(void *, int), void *opaque,
grpc_winsocket_callback_info *info) {

@ -42,6 +42,7 @@
void grpc_iocp_init(void);
void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket *);
void grpc_iocp_socket_orphan(grpc_winsocket *);
void grpc_socket_notify_on_write(grpc_winsocket *, void(*cb)(void *, int success),
void *opaque);

@ -117,7 +117,16 @@ void grpc_iomgr_shutdown(void) {
gpr_mu_lock(&g_mu);
}
if (g_refs) {
if (gpr_cv_wait(&g_rcv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) {
int timeout = 0;
gpr_timespec short_deadline = gpr_time_add(gpr_now(),
gpr_time_from_millis(100));
while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
if (gpr_time_cmp(gpr_now(), shutdown_deadline) > 0) {
timeout = 1;
break;
}
}
if (timeout) {
gpr_log(GPR_DEBUG,
"Failed to free %d iomgr objects before shutdown deadline: "
"memory leaks are likely",

@ -55,7 +55,7 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
return r;
}
void shutdown_op(grpc_winsocket_callback_info *info) {
static void shutdown_op(grpc_winsocket_callback_info *info) {
if (!info->cb) return;
grpc_iomgr_add_delayed_callback(info->cb, info->opaque, 0);
}
@ -68,8 +68,13 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket) {
void grpc_winsocket_orphan(grpc_winsocket *socket) {
gpr_log(GPR_DEBUG, "grpc_winsocket_orphan");
grpc_iocp_socket_orphan(socket);
socket->orphan = 1;
grpc_iomgr_unref();
closesocket(socket->socket);
}
void grpc_winsocket_destroy(grpc_winsocket *socket) {
gpr_mu_destroy(&socket->state_mu);
gpr_free(socket);
}

@ -57,12 +57,13 @@ typedef struct grpc_winsocket_callback_info {
typedef struct grpc_winsocket {
SOCKET socket;
int added_to_iocp;
grpc_winsocket_callback_info write_info;
grpc_winsocket_callback_info read_info;
gpr_mu state_mu;
int added_to_iocp;
int orphan;
} grpc_winsocket;
/* Create a wrapped windows handle.
@ -71,5 +72,6 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket);
void grpc_winsocket_shutdown(grpc_winsocket *socket);
void grpc_winsocket_orphan(grpc_winsocket *socket);
void grpc_winsocket_destroy(grpc_winsocket *socket);
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */

@ -567,7 +567,8 @@ static tsi_result populate_ssl_context(
EC_KEY* ecdh = EC_KEY_new_by_curve_name(NID_X9_62_prime256v1);
if (!SSL_CTX_set_tmp_ecdh(context, ecdh)) {
gpr_log(GPR_ERROR, "Could not set ephemeral ECDH key.");
result = TSI_INTERNAL_ERROR;
EC_KEY_free(ecdh);
return TSI_INTERNAL_ERROR;
}
SSL_CTX_set_options(context, SSL_OP_SINGLE_ECDH_USE);
EC_KEY_free(ecdh);
@ -604,6 +605,7 @@ static tsi_result build_alpn_protocol_name_list(
unsigned char* current;
*protocol_name_list = NULL;
*protocol_name_list_length = 0;
if (num_alpn_protocols == 0) return TSI_INVALID_ARGUMENT;
for (i = 0; i < num_alpn_protocols; i++) {
if (alpn_protocols_lengths[i] == 0) {
gpr_log(GPR_ERROR, "Invalid 0-length protocol name.");

@ -31,8 +31,6 @@
*
*/
#include <string>
#include <grpc/grpc.h>
#include <grpc/support/log.h>

@ -31,8 +31,6 @@
*
*/
#include <string>
#include <grpc/grpc_security.h>
#include <grpc/support/log.h>

@ -148,7 +148,7 @@ void FillMetadataMap(grpc_metadata_array* arr,
// TODO(yangg) handle duplicates?
metadata->insert(std::pair<grpc::string, grpc::string>(
arr->metadata[i].key,
{arr->metadata[i].value, arr->metadata[i].value_length}));
grpc::string(arr->metadata[i].value, arr->metadata[i].value_length)));
}
grpc_metadata_array_destroy(arr);
grpc_metadata_array_init(arr);

@ -59,9 +59,12 @@ class SecureServerCredentials GRPC_FINAL : public ServerCredentials {
std::shared_ptr<ServerCredentials> SslServerCredentials(
const SslServerCredentialsOptions& options) {
std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs;
for (const auto& key_cert_pair : options.pem_key_cert_pairs) {
pem_key_cert_pairs.push_back(
{key_cert_pair.private_key.c_str(), key_cert_pair.cert_chain.c_str()});
for (auto key_cert_pair = options.pem_key_cert_pairs.begin();
key_cert_pair != options.pem_key_cert_pairs.end();
key_cert_pair++) {
grpc_ssl_pem_key_cert_pair p = {key_cert_pair->private_key.c_str(),
key_cert_pair->cert_chain.c_str()};
pem_key_cert_pairs.push_back(p);
}
grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create(
options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(),

@ -247,8 +247,8 @@ bool Server::Start() {
// Start processing rpcs.
if (!sync_methods_.empty()) {
for (auto& m : sync_methods_) {
m.Request(server_);
for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
m->Request(server_);
}
ScheduleCallback();

@ -86,24 +86,26 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_owned = true;
}
std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned));
for (auto* service : services_) {
if (!server->RegisterService(service)) {
for (auto service = services_.begin(); service != services_.end();
service++) {
if (!server->RegisterService(*service)) {
return nullptr;
}
}
for (auto* service : async_services_) {
if (!server->RegisterAsyncService(service)) {
for (auto service = async_services_.begin();
service != async_services_.end(); service++) {
if (!server->RegisterAsyncService(*service)) {
return nullptr;
}
}
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
}
for (auto& port : ports_) {
int r = server->AddListeningPort(port.addr, port.creds.get());
for (auto port = ports_.begin(); port != ports_.end(); port++) {
int r = server->AddListeningPort(port->addr, port->creds.get());
if (!r) return nullptr;
if (port.selected_port != nullptr) {
*port.selected_port = r;
if (port->selected_port != nullptr) {
*port->selected_port = r;
}
}
if (!server->Start()) {

@ -66,8 +66,8 @@ ThreadPool::~ThreadPool() {
shutdown_ = true;
cv_.notify_all();
}
for (auto& t : threads_) {
t.join();
for (auto t = threads_.begin(); t != threads_.end(); t++) {
t->join();
}
}

@ -0,0 +1,136 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/**
* This script runs a QPS test. It sends requests for a specified length of time
* with a specified number pending at any one time. It then outputs the measured
* QPS. Usage:
* node qps_test.js [--concurrent=count] [--time=seconds]
* concurrent defaults to 100 and time defaults to 10
*/
'use strict';
var async = require('async');
var parseArgs = require('minimist');
var grpc = require('..');
var testProto = grpc.load(__dirname + '/../interop/test.proto').grpc.testing;
var interop_server = require('../interop/interop_server.js');
/**
* Runs the QPS test. Sends requests constantly for the given number of seconds,
* and keeps concurrent_calls requests pending at all times. When the test ends,
* the callback is called with the number of calls that completed within the
* time limit.
* @param {number} concurrent_calls The number of calls to have pending
* simultaneously
* @param {number} seconds The number of seconds to run the test for
* @param {function(Error, number)} callback Callback for test completion
*/
function runTest(concurrent_calls, seconds, callback) {
var testServer = interop_server.getServer(0, false);
testServer.server.listen();
var client = new testProto.TestService('localhost:' + testServer.port);
var warmup_num = 100;
/**
* Warms up the client to avoid counting startup time in the test result
* @param {function(Error)} callback Called when warmup is complete
*/
function warmUp(callback) {
var pending = warmup_num;
function startCall() {
client.emptyCall({}, function(err, resp) {
if (err) {
callback(err);
return;
}
pending--;
if (pending === 0) {
callback(null);
}
});
}
for (var i = 0; i < warmup_num; i++) {
startCall();
}
}
/**
* Run the QPS test. Starts concurrent_calls requests, then starts a new
* request whenever one completes until time runs out.
* @param {function(Error, number)} callback Called when the test is complete.
* The second argument is the number of calls that finished within the
* time limit
*/
function run(callback) {
var running = 0;
var count = 0;
var start = process.hrtime();
function responseCallback(err, resp) {
if (process.hrtime(start)[0] < seconds) {
count += 1;
client.emptyCall({}, responseCallback);
} else {
running -= 1;
if (running <= 0) {
callback(null, count);
}
}
}
for (var i = 0; i < concurrent_calls; i++) {
running += 1;
client.emptyCall({}, responseCallback);
}
}
async.waterfall([warmUp, run], function(err, count) {
testServer.server.shutdown();
callback(err, count);
});
}
if (require.main === module) {
var argv = parseArgs(process.argv.slice(2), {
default: {'concurrent': 100,
'time': 10}
});
runTest(argv.concurrent, argv.time, function(err, count) {
if (err) {
throw err;
}
console.log('Concurrent calls:', argv.concurrent);
console.log('Time:', argv.time, 'seconds');
console.log('QPS:', (count/argv.time));
});
}

@ -56,7 +56,7 @@ function loadObject(value) {
});
return result;
} else if (value.className === 'Service') {
return client.makeClientConstructor(value);
return client.makeProtobufClientConstructor(value);
} else if (value.className === 'Message' || value.className === 'Enum') {
return value.build();
} else {
@ -119,7 +119,7 @@ exports.load = load;
/**
* See docs for server.makeServerConstructor
*/
exports.buildServer = server.makeServerConstructor;
exports.buildServer = server.makeProtobufServerConstructor;
/**
* Status name to code number mapping
@ -141,3 +141,7 @@ exports.Credentials = grpc.Credentials;
exports.ServerCredentials = grpc.ServerCredentials;
exports.getGoogleAuthDelegate = getGoogleAuthDelegate;
exports.makeGenericClientConstructor = client.makeClientConstructor;
exports.makeGenericServerConstructor = server.makeServerConstructor;

@ -35,9 +35,6 @@
var _ = require('underscore');
var capitalize = require('underscore.string/capitalize');
var decapitalize = require('underscore.string/decapitalize');
var grpc = require('bindings')('grpc.node');
var common = require('./common.js');
@ -463,13 +460,18 @@ var requester_makers = {
};
/**
* Creates a constructor for clients for the given service
* @param {ProtoBuf.Reflect.Service} service The service to generate a client
* for
* Creates a constructor for a client with the given methods. The methods object
* maps method name to an object with the following keys:
* path: The path on the server for accessing the method. For example, for
* protocol buffers, we use "/service_name/method_name"
* requestStream: bool indicating whether the client sends a stream
* resonseStream: bool indicating whether the server sends a stream
* requestSerialize: function to serialize request objects
* responseDeserialize: function to deserialize response objects
* @param {Object} methods An object mapping method names to method attributes
* @return {function(string, Object)} New client constructor
*/
function makeClientConstructor(service) {
var prefix = '/' + common.fullyQualifiedName(service) + '/';
function makeClientConstructor(methods) {
/**
* Create a client with the given methods
* @constructor
@ -489,30 +491,41 @@ function makeClientConstructor(service) {
this.channel = new grpc.Channel(address, options);
}
_.each(service.children, function(method) {
_.each(methods, function(attrs, name) {
var method_type;
if (method.requestStream) {
if (method.responseStream) {
if (attrs.requestStream) {
if (attrs.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
if (method.responseStream) {
if (attrs.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
var serialize = common.serializeCls(method.resolvedRequestType.build());
var deserialize = common.deserializeCls(
method.resolvedResponseType.build());
Client.prototype[decapitalize(method.name)] = requester_makers[method_type](
prefix + capitalize(method.name), serialize, deserialize);
Client.prototype[decapitalize(method.name)].serialize = serialize;
Client.prototype[decapitalize(method.name)].deserialize = deserialize;
var serialize = attrs.requestSerialize;
var deserialize = attrs.responseDeserialize;
Client.prototype[name] = requester_makers[method_type](
attrs.path, serialize, deserialize);
Client.prototype[name].serialize = serialize;
Client.prototype[name].deserialize = deserialize;
});
return Client;
}
/**
* Creates a constructor for clients for the given service
* @param {ProtoBuf.Reflect.Service} service The service to generate a client
* for
* @return {function(string, Object)} New client constructor
*/
function makeProtobufClientConstructor(service) {
var method_attrs = common.getProtobufServiceAttrs(service);
var Client = makeClientConstructor(method_attrs);
Client.service = service;
return Client;
@ -520,6 +533,8 @@ function makeClientConstructor(service) {
exports.makeClientConstructor = makeClientConstructor;
exports.makeProtobufClientConstructor = makeProtobufClientConstructor;
/**
* See docs for client.status
*/

@ -36,6 +36,7 @@
var _ = require('underscore');
var capitalize = require('underscore.string/capitalize');
var decapitalize = require('underscore.string/decapitalize');
/**
* Get a function that deserializes a specific type of protobuf.
@ -109,6 +110,26 @@ function wrapIgnoreNull(func) {
};
}
/**
* Return a map from method names to method attributes for the service.
* @param {ProtoBuf.Reflect.Service} service The service to get attributes for
* @return {Object} The attributes map
*/
function getProtobufServiceAttrs(service) {
var prefix = '/' + fullyQualifiedName(service) + '/';
return _.object(_.map(service.children, function(method) {
return [decapitalize(method.name), {
path: prefix + capitalize(method.name),
requestStream: method.requestStream,
responseStream: method.responseStream,
requestSerialize: serializeCls(method.resolvedRequestType.build()),
requestDeserialize: deserializeCls(method.resolvedRequestType.build()),
responseSerialize: serializeCls(method.resolvedResponseType.build()),
responseDeserialize: deserializeCls(method.resolvedResponseType.build())
}];
}));
}
/**
* See docs for deserializeCls
*/
@ -128,3 +149,5 @@ exports.fullyQualifiedName = fullyQualifiedName;
* See docs for wrapIgnoreNull
*/
exports.wrapIgnoreNull = wrapIgnoreNull;
exports.getProtobufServiceAttrs = getProtobufServiceAttrs;

@ -35,9 +35,6 @@
var _ = require('underscore');
var capitalize = require('underscore.string/capitalize');
var decapitalize = require('underscore.string/decapitalize');
var grpc = require('bindings')('grpc.node');
var common = require('./common');
@ -532,26 +529,20 @@ Server.prototype.bind = function(port, creds) {
};
/**
* Creates a constructor for servers with a service defined by the methods
* object. The methods object has string keys and values of this form:
* {serialize: function, deserialize: function, client_stream: bool,
* server_stream: bool}
* @param {Object} methods Method descriptor for each method the server should
* expose
* @param {string} prefix The prefex to prepend to each method name
* @return {function(Object, Object)} New server constructor
* Create a constructor for servers with services defined by service_attr_map.
* That is an object that maps (namespaced) service names to objects that in
* turn map method names to objects with the following keys:
* path: The path on the server for accessing the method. For example, for
* protocol buffers, we use "/service_name/method_name"
* requestStream: bool indicating whether the client sends a stream
* resonseStream: bool indicating whether the server sends a stream
* requestDeserialize: function to deserialize request objects
* responseSerialize: function to serialize response objects
* @param {Object} service_attr_map An object mapping service names to method
* attribute map objects
* @return {function(Object, function, Object=)} New server constructor
*/
function makeServerConstructor(services) {
var qual_names = [];
_.each(services, function(service) {
_.each(service.children, function(method) {
var name = common.fullyQualifiedName(method);
if (_.indexOf(qual_names, name) !== -1) {
throw new Error('Method ' + name + ' exposed by more than one service');
}
qual_names.push(name);
});
});
function makeServerConstructor(service_attr_map) {
/**
* Create a server with the given handlers for all of the methods.
* @constructor
@ -565,41 +556,34 @@ function makeServerConstructor(services) {
function SurfaceServer(service_handlers, getMetadata, options) {
var server = new Server(getMetadata, options);
this.inner_server = server;
_.each(services, function(service) {
var service_name = common.fullyQualifiedName(service);
_.each(service_attr_map, function(service_attrs, service_name) {
if (service_handlers[service_name] === undefined) {
throw new Error('Handlers for service ' +
service_name + ' not provided.');
}
var prefix = '/' + common.fullyQualifiedName(service) + '/';
_.each(service.children, function(method) {
_.each(service_attrs, function(attrs, name) {
var method_type;
if (method.requestStream) {
if (method.responseStream) {
if (attrs.requestStream) {
if (attrs.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
if (method.responseStream) {
if (attrs.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
if (service_handlers[service_name][decapitalize(method.name)] ===
undefined) {
throw new Error('Method handler for ' +
common.fullyQualifiedName(method) + ' not provided.');
if (service_handlers[service_name][name] === undefined) {
throw new Error('Method handler for ' + attrs.path +
' not provided.');
}
var serialize = common.serializeCls(
method.resolvedResponseType.build());
var deserialize = common.deserializeCls(
method.resolvedRequestType.build());
server.register(
prefix + capitalize(method.name),
service_handlers[service_name][decapitalize(method.name)],
serialize, deserialize, method_type);
var serialize = attrs.responseSerialize;
var deserialize = attrs.requestDeserialize;
server.register(attrs.path, service_handlers[service_name][name],
serialize, deserialize, method_type);
});
}, this);
}
@ -635,7 +619,40 @@ function makeServerConstructor(services) {
return SurfaceServer;
}
/**
* Create a constructor for servers that serve the given services.
* @param {Array<ProtoBuf.Reflect.Service>} services The services that the
* servers will serve
* @return {function(Object, function, Object=)} New server constructor
*/
function makeProtobufServerConstructor(services) {
var qual_names = [];
var service_attr_map = {};
_.each(services, function(service) {
var service_name = common.fullyQualifiedName(service);
_.each(service.children, function(method) {
var name = common.fullyQualifiedName(method);
if (_.indexOf(qual_names, name) !== -1) {
throw new Error('Method ' + name + ' exposed by more than one service');
}
qual_names.push(name);
});
var method_attrs = common.getProtobufServiceAttrs(service);
if (!service_attr_map.hasOwnProperty(service_name)) {
service_attr_map[service_name] = {};
}
service_attr_map[service_name] = _.extend(service_attr_map[service_name],
method_attrs);
});
return makeServerConstructor(service_attr_map);
}
/**
* See documentation for makeServerConstructor
*/
exports.makeServerConstructor = makeServerConstructor;
/**
* See documentation for makeProtobufServerConstructor
*/
exports.makeProtobufServerConstructor = makeProtobufServerConstructor;

@ -45,6 +45,8 @@ var math_proto = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto');
var mathService = math_proto.lookup('math.Math');
var capitalize = require('underscore.string/capitalize');
describe('Surface server constructor', function() {
it('Should fail with conflicting method names', function() {
assert.throws(function() {
@ -75,6 +77,55 @@ describe('Surface server constructor', function() {
}, /math.Math/);
});
});
describe('Generic client and server', function() {
function toString(val) {
return val.toString();
}
function toBuffer(str) {
return new Buffer(str);
}
var string_service_attrs = {
'capitalize' : {
path: '/string/capitalize',
requestStream: false,
responseStream: false,
requestSerialize: toBuffer,
requestDeserialize: toString,
responseSerialize: toBuffer,
responseDeserialize: toString
}
};
describe('String client and server', function() {
var client;
var server;
before(function() {
var Server = grpc.makeGenericServerConstructor({
string: string_service_attrs
});
server = new Server({
string: {
capitalize: function(call, callback) {
callback(null, capitalize(call.request));
}
}
});
var port = server.bind('localhost:0');
server.listen();
var Client = grpc.makeGenericClientConstructor(string_service_attrs);
client = new Client('localhost:' + port);
});
after(function() {
server.shutdown();
});
it('Should respond with a capitalized string', function(done) {
client.capitalize('abc', function(err, response) {
assert.ifError(err);
assert.strictEqual(response, 'Abc');
done();
});
});
});
});
describe('Cancelling surface client', function() {
var client;
var server;
@ -89,7 +140,7 @@ describe('Cancelling surface client', function() {
}
});
var port = server.bind('localhost:0');
var Client = surface_client.makeClientConstructor(mathService);
var Client = surface_client.makeProtobufClientConstructor(mathService);
client = new Client('localhost:' + port);
});
after(function() {

@ -49,11 +49,11 @@
#include <stdbool.h>
#include "grpc/support/log.h"
#include "grpc/support/alloc.h"
#include "grpc/grpc.h"
#include "timeval.h"
#include "channel.h"
#include "completion_queue.h"
#include "byte_buffer.h"
zend_class_entry *grpc_ce_call;
@ -61,7 +61,19 @@ zend_class_entry *grpc_ce_call;
/* Frees and destroys an instance of wrapped_grpc_call */
void free_wrapped_grpc_call(void *object TSRMLS_DC) {
wrapped_grpc_call *call = (wrapped_grpc_call *)object;
grpc_event *event;
if (call->owned && call->wrapped != NULL) {
if (call->queue != NULL) {
grpc_completion_queue_shutdown(call->queue);
event = grpc_completion_queue_next(call->queue, gpr_inf_future);
while (event != NULL) {
if (event->type == GRPC_QUEUE_SHUTDOWN) {
break;
}
event = grpc_completion_queue_next(call->queue, gpr_inf_future);
}
grpc_completion_queue_destroy(call->queue);
}
grpc_call_destroy(call->wrapped);
}
efree(call);
@ -88,17 +100,23 @@ zend_object_value create_wrapped_grpc_call(zend_class_entry *class_type
/* Wraps a grpc_call struct in a PHP object. Owned indicates whether the struct
should be destroyed at the end of the object's lifecycle */
zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned) {
zval *grpc_php_wrap_call(grpc_call *wrapped, grpc_completion_queue *queue,
bool owned) {
zval *call_object;
MAKE_STD_ZVAL(call_object);
object_init_ex(call_object, grpc_ce_call);
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(call_object TSRMLS_CC);
call->wrapped = wrapped;
call->queue = queue;
return call_object;
}
zval *grpc_call_create_metadata_array(int count, grpc_metadata *elements) {
/* Creates and returns a PHP array object with the data in a
* grpc_metadata_array. Returns NULL on failure */
zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array) {
int count = metadata_array->count;
grpc_metadata *elements = metadata_array->metadata;
int i;
zval *array;
zval **data = NULL;
@ -139,6 +157,64 @@ zval *grpc_call_create_metadata_array(int count, grpc_metadata *elements) {
return array;
}
/* Populates a grpc_metadata_array with the data in a PHP array object.
Returns true on success and false on failure */
bool create_metadata_array(zval *array, grpc_metadata_array *metadata) {
zval **inner_array;
zval **value;
HashTable *array_hash;
HashPosition array_pointer;
HashTable *inner_array_hash;
HashPosition inner_array_pointer;
char *key;
uint key_len;
ulong index;
if (Z_TYPE_P(array) != IS_ARRAY) {
return false;
}
grpc_metadata_array_init(metadata);
array_hash = Z_ARRVAL_P(array);
for (zend_hash_internal_pointer_reset_ex(array_hash, &array_pointer);
zend_hash_get_current_data_ex(array_hash, (void**)&inner_array,
&array_pointer) == SUCCESS;
zend_hash_move_forward_ex(array_hash, &array_pointer)) {
if (zend_hash_get_current_key_ex(array_hash, &key, &key_len, &index, 0,
&array_pointer) != HASH_KEY_IS_STRING) {
return false;
}
if (Z_TYPE_P(*inner_array) != IS_ARRAY) {
return false;
}
inner_array_hash = Z_ARRVAL_P(*inner_array);
metadata->capacity += zend_hash_num_elements(inner_array_hash);
}
metadata->metadata = gpr_malloc(metadata->capacity * sizeof(grpc_metadata));
for (zend_hash_internal_pointer_reset_ex(array_hash, &array_pointer);
zend_hash_get_current_data_ex(array_hash, (void**)&inner_array,
&array_pointer) == SUCCESS;
zend_hash_move_forward_ex(array_hash, &array_pointer)) {
if (zend_hash_get_current_key_ex(array_hash, &key, &key_len, &index, 0,
&array_pointer) != HASH_KEY_IS_STRING) {
return false;
}
inner_array_hash = Z_ARRVAL_P(*inner_array);
for (zend_hash_internal_pointer_reset_ex(inner_array_hash,
&inner_array_pointer);
zend_hash_get_current_data_ex(inner_array_hash, (void**)&value,
&inner_array_pointer) == SUCCESS;
zend_hash_move_forward_ex(inner_array_hash, &inner_array_pointer)) {
if (Z_TYPE_P(*value) != IS_STRING) {
return false;
}
metadata->metadata[metadata->count].key = key;
metadata->metadata[metadata->count].value = Z_STRVAL_P(*value);
metadata->metadata[metadata->count].value_length = Z_STRLEN_P(*value);
metadata->count += 1;
}
}
return true;
}
/**
* Constructs a new instance of the Call class.
* @param Channel $channel The channel to associate the call with. Must not be
@ -157,9 +233,10 @@ PHP_METHOD(Call, __construct) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OsO", &channel_obj,
grpc_ce_channel, &method, &method_len,
&deadline_obj, grpc_ce_timeval) == FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Call expects a Channel, a String, and a Timeval",
1 TSRMLS_CC);
zend_throw_exception(
spl_ce_InvalidArgumentException,
"Call expects a Channel, a String, and a Timeval",
1 TSRMLS_CC);
return;
}
wrapped_grpc_channel *channel =
@ -175,289 +252,235 @@ PHP_METHOD(Call, __construct) {
wrapped_grpc_timeval *deadline =
(wrapped_grpc_timeval *)zend_object_store_get_object(
deadline_obj TSRMLS_CC);
call->wrapped = grpc_channel_create_call_old(
channel->wrapped, method, channel->target, deadline->wrapped);
call->queue = grpc_completion_queue_create();
call->wrapped = grpc_channel_create_call(
channel->wrapped, call->queue, method, channel->target,
deadline->wrapped);
}
/**
* Add metadata to the call. All array keys must be strings. If the value is a
* string, it is added as a key/value pair. If it is an array, each value is
* added paired with the same string
* @param array $metadata The metadata to add
* @param long $flags A bitwise combination of the Grpc\WRITE_* constants
* (optional)
* @return Void
* Start a batch of RPC actions.
* @param array batch Array of actions to take
* @return object Object with results of all actions
*/
PHP_METHOD(Call, add_metadata) {
PHP_METHOD(Call, start_batch) {
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
grpc_metadata metadata;
grpc_call_error error_code;
grpc_op ops[8];
size_t op_num = 0;
zval *array;
zval **inner_array;
zval **value;
zval **inner_value;
HashTable *array_hash;
HashPosition array_pointer;
HashTable *inner_array_hash;
HashPosition inner_array_pointer;
HashTable *status_hash;
char *key;
uint key_len;
ulong index;
long flags = 0;
/* "a|l" == 1 array, 1 optional long */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a|l", &array, &flags) ==
grpc_metadata_array metadata;
grpc_metadata_array trailing_metadata;
grpc_metadata_array recv_metadata;
grpc_metadata_array recv_trailing_metadata;
grpc_status_code status;
char *status_details = NULL;
size_t status_details_capacity = 0;
grpc_byte_buffer *message;
int cancelled;
grpc_call_error error;
grpc_event *event;
zval *result;
char *message_str;
size_t message_len;
zval *recv_status;
grpc_metadata_array_init(&metadata);
grpc_metadata_array_init(&trailing_metadata);
grpc_metadata_array_init(&recv_metadata);
grpc_metadata_array_init(&recv_trailing_metadata);
MAKE_STD_ZVAL(result);
object_init(result);
/* "a" == 1 array */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a", &array) ==
FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"add_metadata expects an array and an optional long",
1 TSRMLS_CC);
return;
"start_batch expects an array", 1 TSRMLS_CC);
goto cleanup;
}
array_hash = Z_ARRVAL_P(array);
for (zend_hash_internal_pointer_reset_ex(array_hash, &array_pointer);
zend_hash_get_current_data_ex(array_hash, (void**)&inner_array,
zend_hash_get_current_data_ex(array_hash, (void**)&value,
&array_pointer) == SUCCESS;
zend_hash_move_forward_ex(array_hash, &array_pointer)) {
if (zend_hash_get_current_key_ex(array_hash, &key, &key_len, &index, 0,
&array_pointer) != HASH_KEY_IS_STRING) {
&array_pointer) != HASH_KEY_IS_LONG) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"metadata keys must be strings", 1 TSRMLS_CC);
return;
"batch keys must be integers", 1 TSRMLS_CC);
goto cleanup;
}
if (Z_TYPE_P(*inner_array) != IS_ARRAY) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"metadata values must be arrays",
1 TSRMLS_CC);
return;
}
inner_array_hash = Z_ARRVAL_P(*inner_array);
for (zend_hash_internal_pointer_reset_ex(inner_array_hash,
&inner_array_pointer);
zend_hash_get_current_data_ex(inner_array_hash, (void**)&value,
&inner_array_pointer) == SUCCESS;
zend_hash_move_forward_ex(inner_array_hash, &inner_array_pointer)) {
if (Z_TYPE_P(*value) != IS_STRING) {
switch(index) {
case GRPC_OP_SEND_INITIAL_METADATA:
if (!create_metadata_array(*value, &metadata)) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Bad metadata value given", 1 TSRMLS_CC);
goto cleanup;
}
ops[op_num].data.send_initial_metadata.count =
metadata.count;
ops[op_num].data.send_initial_metadata.metadata =
metadata.metadata;
break;
case GRPC_OP_SEND_MESSAGE:
if (Z_TYPE_PP(value) != IS_STRING) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Expected a string for send message",
1 TSRMLS_CC);
}
ops[op_num].data.send_message =
string_to_byte_buffer(Z_STRVAL_PP(value), Z_STRLEN_PP(value));
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
status_hash = Z_ARRVAL_PP(value);
if (zend_hash_find(status_hash, "metadata", sizeof("metadata"),
(void **)&inner_value) == SUCCESS) {
if (!create_metadata_array(*inner_value, &trailing_metadata)) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Bad trailing metadata value given",
1 TSRMLS_CC);
goto cleanup;
}
ops[op_num].data.send_status_from_server.trailing_metadata =
trailing_metadata.metadata;
ops[op_num].data.send_status_from_server.trailing_metadata_count =
trailing_metadata.count;
}
if (zend_hash_find(status_hash, "code", sizeof("code"),
(void**)&inner_value) == SUCCESS) {
if (Z_TYPE_PP(inner_value) != IS_LONG) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Status code must be an integer",
1 TSRMLS_CC);
goto cleanup;
}
ops[op_num].data.send_status_from_server.status =
Z_LVAL_PP(inner_value);
} else {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Integer status code is required",
1 TSRMLS_CC);
goto cleanup;
}
if (zend_hash_find(status_hash, "details", sizeof("details"),
(void**)&inner_value) == SUCCESS) {
if (Z_TYPE_PP(inner_value) != IS_STRING) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Status details must be a string",
1 TSRMLS_CC);
goto cleanup;
}
ops[op_num].data.send_status_from_server.status_details =
Z_STRVAL_PP(inner_value);
} else {
zend_throw_exception(spl_ce_InvalidArgumentException,
"String status details is required",
1 TSRMLS_CC);
goto cleanup;
}
break;
case GRPC_OP_RECV_INITIAL_METADATA:
ops[op_num].data.recv_initial_metadata = &recv_metadata;
break;
case GRPC_OP_RECV_MESSAGE:
ops[op_num].data.recv_message = &message;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
ops[op_num].data.recv_status_on_client.trailing_metadata =
&recv_trailing_metadata;
ops[op_num].data.recv_status_on_client.status = &status;
ops[op_num].data.recv_status_on_client.status_details =
&status_details;
ops[op_num].data.recv_status_on_client.status_details_capacity =
&status_details_capacity;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
ops[op_num].data.recv_close_on_server.cancelled = &cancelled;
break;
default:
zend_throw_exception(spl_ce_InvalidArgumentException,
"metadata values must be arrays of strings",
1 TSRMLS_CC);
return;
}
metadata.key = key;
metadata.value = Z_STRVAL_P(*value);
metadata.value_length = Z_STRLEN_P(*value);
error_code = grpc_call_add_metadata_old(call->wrapped, &metadata, 0u);
MAYBE_THROW_CALL_ERROR(add_metadata, error_code);
"Unrecognized key in batch", 1 TSRMLS_CC);
goto cleanup;
}
ops[op_num].op = (grpc_op_type)index;
op_num++;
}
}
/**
* Invoke the RPC. Starts sending metadata and request headers over the wire
* @param CompletionQueue $queue The completion queue to use with this call
* @param long $metadata_tag The tag to associate with returned metadata
* @param long $finished_tag The tag to associate with the finished event
* @param long $flags A bitwise combination of the Grpc\WRITE_* constants
* (optional)
* @return Void
*/
PHP_METHOD(Call, invoke) {
grpc_call_error error_code;
long tag1;
long tag2;
zval *queue_obj;
long flags = 0;
/* "Oll|l" == 1 Object, 3 mandatory longs, 1 optional long */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Oll|l", &queue_obj,
grpc_ce_completion_queue, &tag1, &tag2,
&flags) == FAILURE) {
zend_throw_exception(
spl_ce_InvalidArgumentException,
"invoke needs a CompletionQueue, 2 longs, and an optional long",
1 TSRMLS_CC);
return;
}
add_property_zval(getThis(), "completion_queue", queue_obj);
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
wrapped_grpc_completion_queue *queue =
(wrapped_grpc_completion_queue *)zend_object_store_get_object(
queue_obj TSRMLS_CC);
error_code = grpc_call_invoke_old(call->wrapped, queue->wrapped, (void *)tag1,
(void *)tag2, (gpr_uint32)flags);
MAYBE_THROW_CALL_ERROR(invoke, error_code);
}
/**
* Accept an incoming RPC, binding a completion queue to it. To be called after
* adding metadata to the call, but before sending messages. Can only be called
* on the server
* @param CompletionQueue $queue The completion queue to use with this call
* @param long $finished_tag The tag to associate with the finished event
* @param long $flags A bitwise combination of the Grpc\WRITE_* constants
* (optional)
* @return Void
*/
PHP_METHOD(Call, server_accept) {
long tag;
zval *queue_obj;
grpc_call_error error_code;
/* "Ol|l" == 1 Object, 1 long */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Ol", &queue_obj,
grpc_ce_completion_queue, &tag) == FAILURE) {
zend_throw_exception(
spl_ce_InvalidArgumentException,
"server_accept expects a CompletionQueue, a long, and an optional long",
1 TSRMLS_CC);
return;
error = grpc_call_start_batch(call->wrapped, ops, op_num, call->wrapped);
if (error != GRPC_CALL_OK) {
zend_throw_exception(spl_ce_LogicException,
"start_batch was called incorrectly",
(long)error TSRMLS_CC);
goto cleanup;
}
add_property_zval(getThis(), "completion_queue", queue_obj);
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
wrapped_grpc_completion_queue *queue =
(wrapped_grpc_completion_queue *)zend_object_store_get_object(
queue_obj TSRMLS_CC);
error_code =
grpc_call_server_accept_old(call->wrapped, queue->wrapped, (void *)tag);
MAYBE_THROW_CALL_ERROR(server_accept, error_code);
}
PHP_METHOD(Call, server_end_initial_metadata) {
grpc_call_error error_code;
long flags = 0;
/* "|l" == 1 optional long */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &flags) ==
FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"server_end_initial_metadata expects an optional long",
1 TSRMLS_CC);
}
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
error_code = grpc_call_server_end_initial_metadata_old(call->wrapped, flags);
MAYBE_THROW_CALL_ERROR(server_end_initial_metadata, error_code);
}
/**
* Called by clients to cancel an RPC on the server.
* @return Void
*/
PHP_METHOD(Call, cancel) {
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
grpc_call_error error_code = grpc_call_cancel(call->wrapped);
MAYBE_THROW_CALL_ERROR(cancel, error_code);
}
/**
* Queue a byte buffer for writing
* @param string $buffer The buffer to queue for writing
* @param long $tag The tag to associate with this write
* @param long $flags A bitwise combination of the Grpc\WRITE_* constants
* (optional)
* @return Void
*/
PHP_METHOD(Call, start_write) {
grpc_call_error error_code;
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
char *buffer;
int buffer_len;
long tag;
long flags = 0;
/* "Ol|l" == 1 Object, 1 mandatory long, 1 optional long */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sl|l", &buffer,
&buffer_len, &tag, &flags) == FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"start_write expects a string and an optional long",
event = grpc_completion_queue_pluck(call->queue, call->wrapped,
gpr_inf_future);
if (event->data.op_complete != GRPC_OP_OK) {
zend_throw_exception(spl_ce_LogicException,
"The batch failed for some reason",
1 TSRMLS_CC);
return;
}
error_code = grpc_call_start_write_old(
call->wrapped, string_to_byte_buffer(buffer, buffer_len), (void *)tag,
(gpr_uint32)flags);
MAYBE_THROW_CALL_ERROR(start_write, error_code);
}
/**
* Queue a status for writing
* @param long $status_code The status code to send
* @param string $status_details The status details to send
* @param long $tag The tag to associate with this status
* @return Void
*/
PHP_METHOD(Call, start_write_status) {
grpc_call_error error_code;
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
long status_code;
int status_details_length;
long tag;
char *status_details;
/* "lsl" == 1 long, 1 string, 1 long */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lsl", &status_code,
&status_details, &status_details_length,
&tag) == FAILURE) {
zend_throw_exception(
spl_ce_InvalidArgumentException,
"start_write_status expects a long, a string, and a long", 1 TSRMLS_CC);
return;
goto cleanup;
}
error_code = grpc_call_start_write_status_old(call->wrapped,
(grpc_status_code)status_code,
status_details, (void *)tag);
MAYBE_THROW_CALL_ERROR(start_write_status, error_code);
}
/**
* Indicate that there are no more messages to send
* @return Void
*/
PHP_METHOD(Call, writes_done) {
grpc_call_error error_code;
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
long tag;
/* "l" == 1 long */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &tag) == FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"writes_done expects a long", 1 TSRMLS_CC);
return;
for (int i = 0; i < op_num; i++) {
switch(ops[i].op) {
case GRPC_OP_SEND_INITIAL_METADATA:
add_property_bool(result, "send_metadata", true);
break;
case GRPC_OP_SEND_MESSAGE:
add_property_bool(result, "send_message", true);
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
add_property_bool(result, "send_close", true);
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
add_property_bool(result, "send_status", true);
break;
case GRPC_OP_RECV_INITIAL_METADATA:
add_property_zval(result, "metadata",
grpc_parse_metadata_array(&recv_metadata));
break;
case GRPC_OP_RECV_MESSAGE:
byte_buffer_to_string(message, &message_str, &message_len);
add_property_stringl(result, "message", message_str, message_len,
false);
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
MAKE_STD_ZVAL(recv_status);
object_init(recv_status);
add_property_zval(recv_status, "metadata",
grpc_parse_metadata_array(&recv_trailing_metadata));
add_property_long(recv_status, "code", status);
add_property_string(recv_status, "details", status_details, true);
add_property_zval(result, "status", recv_status);
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
add_property_bool(result, "cancelled", cancelled);
break;
default:
break;
}
}
error_code = grpc_call_writes_done_old(call->wrapped, (void *)tag);
MAYBE_THROW_CALL_ERROR(writes_done, error_code);
}
/**
* Initiate a read on a call. Output event contains a byte buffer with the
* result of the read
* @param long $tag The tag to associate with this read
* @return Void
*/
PHP_METHOD(Call, start_read) {
grpc_call_error error_code;
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
long tag;
/* "l" == 1 long */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &tag) == FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"start_read expects a long", 1 TSRMLS_CC);
return;
cleanup:
grpc_metadata_array_destroy(&metadata);
grpc_metadata_array_destroy(&trailing_metadata);
grpc_metadata_array_destroy(&recv_metadata);
grpc_metadata_array_destroy(&recv_trailing_metadata);
if (status_details != NULL) {
gpr_free(status_details);
}
error_code = grpc_call_start_read_old(call->wrapped, (void *)tag);
MAYBE_THROW_CALL_ERROR(start_read, error_code);
RETURN_DESTROY_ZVAL(result);
}
static zend_function_entry call_methods[] = {
PHP_ME(Call, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
PHP_ME(Call, server_accept, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, server_end_initial_metadata, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, add_metadata, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, cancel, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, invoke, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, start_read, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, start_write, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, start_write_status, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, writes_done, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
PHP_ME(Call, start_batch, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
void grpc_init_call(TSRMLS_D) {
zend_class_entry ce;

@ -45,17 +45,6 @@
#include "grpc/grpc.h"
// Throw an exception if error_code is not OK
#define MAYBE_THROW_CALL_ERROR(func_name, error_code) \
do { \
if (error_code != GRPC_CALL_OK) { \
zend_throw_exception(spl_ce_LogicException, \
#func_name " was called incorrectly", \
(long)error_code TSRMLS_CC); \
return; \
} \
} while (0)
/* Class entry for the Call PHP class */
extern zend_class_entry *grpc_ce_call;
@ -65,16 +54,18 @@ typedef struct wrapped_grpc_call {
bool owned;
grpc_call *wrapped;
grpc_completion_queue *queue;
} wrapped_grpc_call;
/* Initializes the Call PHP class */
void grpc_init_call(TSRMLS_D);
/* Creates a Call object that wraps the given grpc_call struct */
zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned);
zval *grpc_php_wrap_call(grpc_call *wrapped, grpc_completion_queue *queue,
bool owned);
/* Creates and returns a PHP associative array of metadata from a C array of
* call metadata */
zval *grpc_call_create_metadata_array(int count, grpc_metadata *elements);
zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array);
#endif /* NET_GRPC_PHP_GRPC_CHANNEL_H_ */

@ -51,7 +51,6 @@
#include "grpc/support/log.h"
#include "grpc/grpc_security.h"
#include "completion_queue.h"
#include "server.h"
#include "credentials.h"
@ -139,6 +138,9 @@ PHP_METHOD(Channel, __construct) {
HashTable *array_hash;
zval **creds_obj = NULL;
wrapped_grpc_credentials *creds = NULL;
zval **override_obj;
char *override;
int override_len;
/* "s|a" == 1 string, 1 optional array */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|a", &target,
&target_length, &args_array) == FAILURE) {
@ -146,6 +148,8 @@ PHP_METHOD(Channel, __construct) {
"Channel expects a string and an array", 1 TSRMLS_CC);
return;
}
override = target;
override_len = target_length;
if (args_array == NULL) {
channel->wrapped = grpc_channel_create(target, NULL);
} else {
@ -162,6 +166,19 @@ PHP_METHOD(Channel, __construct) {
*creds_obj TSRMLS_CC);
zend_hash_del(array_hash, "credentials", 12);
}
if (zend_hash_find(array_hash, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
sizeof(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG),
(void **)&override_obj) == SUCCESS) {
if (Z_TYPE_PP(override_obj) != IS_STRING) {
zend_throw_exception(spl_ce_InvalidArgumentException,
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
" must be a string",
1 TSRMLS_CC);
return;
}
override = Z_STRVAL_PP(override_obj);
override_len = Z_STRLEN_PP(override_obj);
}
php_grpc_read_args_array(args_array, &args);
if (creds == NULL) {
channel->wrapped = grpc_channel_create(target, &args);
@ -172,8 +189,8 @@ PHP_METHOD(Channel, __construct) {
}
efree(args.args);
}
channel->target = ecalloc(target_length + 1, sizeof(char));
memcpy(channel->target, target, target_length);
channel->target = ecalloc(override_len + 1, sizeof(char));
memcpy(channel->target, override, override_len);
}
/**

@ -1,170 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "completion_queue.h"
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "php.h"
#include "php_ini.h"
#include "ext/standard/info.h"
#include "ext/spl/spl_exceptions.h"
#include "php_grpc.h"
#include "zend_exceptions.h"
#include <stdbool.h>
#include "grpc/grpc.h"
#include "event.h"
#include "timeval.h"
zend_class_entry *grpc_ce_completion_queue;
/* Frees and destroys a wrapped instance of grpc_completion_queue */
void free_wrapped_grpc_completion_queue(void *object TSRMLS_DC) {
wrapped_grpc_completion_queue *queue = NULL;
grpc_event *event;
queue = (wrapped_grpc_completion_queue *)object;
if (queue->wrapped != NULL) {
grpc_completion_queue_shutdown(queue->wrapped);
event = grpc_completion_queue_next(queue->wrapped, gpr_inf_future);
while (event != NULL) {
if (event->type == GRPC_QUEUE_SHUTDOWN) {
break;
}
event = grpc_completion_queue_next(queue->wrapped, gpr_inf_future);
}
grpc_completion_queue_destroy(queue->wrapped);
}
efree(queue);
}
/* Initializes an instance of wrapped_grpc_channel to be associated with an
* object of a class specified by class_type */
zend_object_value create_wrapped_grpc_completion_queue(
zend_class_entry *class_type TSRMLS_DC) {
zend_object_value retval;
wrapped_grpc_completion_queue *intern;
intern = (wrapped_grpc_completion_queue *)emalloc(
sizeof(wrapped_grpc_completion_queue));
memset(intern, 0, sizeof(wrapped_grpc_completion_queue));
zend_object_std_init(&intern->std, class_type TSRMLS_CC);
object_properties_init(&intern->std, class_type);
retval.handle = zend_objects_store_put(
intern, (zend_objects_store_dtor_t)zend_objects_destroy_object,
free_wrapped_grpc_completion_queue, NULL TSRMLS_CC);
retval.handlers = zend_get_std_object_handlers();
return retval;
}
/**
* Construct an instance of CompletionQueue
*/
PHP_METHOD(CompletionQueue, __construct) {
wrapped_grpc_completion_queue *queue =
(wrapped_grpc_completion_queue *)zend_object_store_get_object(getThis()
TSRMLS_CC);
queue->wrapped = grpc_completion_queue_create();
}
/**
* Blocks until an event is available, the completion queue is being shutdown,
* or timeout is reached. Returns NULL on timeout, otherwise the event that
* occurred. Callers should call event.finish once they have processed the
* event.
* @param Timeval $timeout The timeout for the event
* @return Event The event that occurred
*/
PHP_METHOD(CompletionQueue, next) {
zval *timeout;
/* "O" == 1 Object */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O", &timeout,
grpc_ce_timeval) == FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"next needs a Timeval", 1 TSRMLS_CC);
return;
}
wrapped_grpc_completion_queue *completion_queue =
(wrapped_grpc_completion_queue *)zend_object_store_get_object(getThis()
TSRMLS_CC);
wrapped_grpc_timeval *wrapped_timeout =
(wrapped_grpc_timeval *)zend_object_store_get_object(timeout TSRMLS_CC);
grpc_event *event = grpc_completion_queue_next(completion_queue->wrapped,
wrapped_timeout->wrapped);
if (event == NULL) {
RETURN_NULL();
}
zval *wrapped_event = grpc_php_convert_event(event);
RETURN_DESTROY_ZVAL(wrapped_event);
}
PHP_METHOD(CompletionQueue, pluck) {
long tag;
zval *timeout;
/* "lO" == 1 long, 1 Object */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lO", &tag, &timeout,
grpc_ce_timeval) == FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"pluck needs a long and a Timeval", 1 TSRMLS_CC);
}
wrapped_grpc_completion_queue *completion_queue =
(wrapped_grpc_completion_queue *)zend_object_store_get_object(getThis()
TSRMLS_CC);
wrapped_grpc_timeval *wrapped_timeout =
(wrapped_grpc_timeval *)zend_object_store_get_object(timeout TSRMLS_CC);
grpc_event *event = grpc_completion_queue_pluck(
completion_queue->wrapped, (void *)tag, wrapped_timeout->wrapped);
if (event == NULL) {
RETURN_NULL();
}
zval *wrapped_event = grpc_php_convert_event(event);
RETURN_DESTROY_ZVAL(wrapped_event);
}
static zend_function_entry completion_queue_methods[] = {
PHP_ME(CompletionQueue, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
PHP_ME(CompletionQueue, next, NULL, ZEND_ACC_PUBLIC)
PHP_ME(CompletionQueue, pluck, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
void grpc_init_completion_queue(TSRMLS_D) {
zend_class_entry ce;
INIT_CLASS_ENTRY(ce, "Grpc\\CompletionQueue", completion_queue_methods);
ce.create_object = create_wrapped_grpc_completion_queue;
grpc_ce_completion_queue = zend_register_internal_class(&ce TSRMLS_CC);
}

@ -1,62 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef NET_GRPC_PHP_GRPC_COMPLETION_QUEUE_H_
#define NET_GRPC_PHP_GRPC_COMPLETION_QUEUE_H_
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "php.h"
#include "php_ini.h"
#include "ext/standard/info.h"
#include "php_grpc.h"
#include "grpc/grpc.h"
/* Class entry for the PHP CompletionQueue class */
extern zend_class_entry *grpc_ce_completion_queue;
/* Wrapper class for grpc_completion_queue that can be associated with a
PHP object */
typedef struct wrapped_grpc_completion_queue {
zend_object std;
grpc_completion_queue *wrapped;
} wrapped_grpc_completion_queue;
/* Initialize the CompletionQueue class */
void grpc_init_completion_queue(TSRMLS_D);
#endif /* NET_GRPC_PHP_GRPC_COMPLETION_QUEUE_H_ */

@ -66,5 +66,5 @@ if test "$PHP_GRPC" != "no"; then
PHP_SUBST(GRPC_SHARED_LIBADD)
PHP_NEW_EXTENSION(grpc, byte_buffer.c call.c channel.c completion_queue.c credentials.c event.c timeval.c server.c server_credentials.c php_grpc.c, $ext_shared, , -Wall -Werror -pedantic -std=c99)
PHP_NEW_EXTENSION(grpc, byte_buffer.c call.c channel.c credentials.c timeval.c server.c server_credentials.c php_grpc.c, $ext_shared, , -Wall -Werror -pedantic -std=c99)
fi

@ -1,150 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "event.h"
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "php.h"
#include "php_ini.h"
#include "ext/standard/info.h"
#include "php_grpc.h"
#include <stdbool.h>
#include "grpc/grpc.h"
#include "byte_buffer.h"
#include "call.h"
#include "timeval.h"
/* Create a new PHP object containing the event data in the event struct.
event must not be used after this function is called */
zval *grpc_php_convert_event(grpc_event *event) {
zval *data_object;
char *detail_string;
size_t detail_len;
char *method_string;
size_t method_len;
char *host_string;
size_t host_len;
char *read_string;
size_t read_len;
zval *event_object;
if (event == NULL) {
return NULL;
}
MAKE_STD_ZVAL(event_object);
object_init(event_object);
add_property_zval(
event_object, "call",
grpc_php_wrap_call(event->call, event->type == GRPC_SERVER_RPC_NEW));
add_property_long(event_object, "type", event->type);
add_property_long(event_object, "tag", (long)event->tag);
switch (event->type) {
case GRPC_QUEUE_SHUTDOWN:
add_property_null(event_object, "data");
break;
case GRPC_READ:
if (event->data.read == NULL) {
add_property_null(event_object, "data");
} else {
byte_buffer_to_string(event->data.read, &read_string, &read_len);
add_property_stringl(event_object, "data", read_string, read_len, true);
}
break;
case GRPC_WRITE_ACCEPTED:
add_property_long(event_object, "data", (long)event->data.write_accepted);
break;
case GRPC_FINISH_ACCEPTED:
add_property_long(event_object, "data",
(long)event->data.finish_accepted);
break;
case GRPC_CLIENT_METADATA_READ:
data_object = grpc_call_create_metadata_array(
event->data.client_metadata_read.count,
event->data.client_metadata_read.elements);
add_property_zval(event_object, "data", data_object);
break;
case GRPC_FINISHED:
MAKE_STD_ZVAL(data_object);
object_init(data_object);
add_property_long(data_object, "code", event->data.finished.status);
if (event->data.finished.details == NULL) {
add_property_null(data_object, "details");
} else {
detail_len = strlen(event->data.finished.details);
detail_string = ecalloc(detail_len + 1, sizeof(char));
memcpy(detail_string, event->data.finished.details, detail_len);
add_property_string(data_object, "details", detail_string, true);
}
add_property_zval(data_object, "metadata",
grpc_call_create_metadata_array(
event->data.finished.metadata_count,
event->data.finished.metadata_elements));
add_property_zval(event_object, "data", data_object);
break;
case GRPC_SERVER_RPC_NEW:
MAKE_STD_ZVAL(data_object);
object_init(data_object);
method_len = strlen(event->data.server_rpc_new.method);
method_string = ecalloc(method_len + 1, sizeof(char));
memcpy(method_string, event->data.server_rpc_new.method, method_len);
add_property_string(data_object, "method", method_string, false);
host_len = strlen(event->data.server_rpc_new.host);
host_string = ecalloc(host_len + 1, sizeof(char));
memcpy(host_string, event->data.server_rpc_new.host, host_len);
add_property_string(data_object, "host", host_string, false);
add_property_zval(
data_object, "absolute_timeout",
grpc_php_wrap_timeval(event->data.server_rpc_new.deadline));
add_property_zval(data_object, "metadata",
grpc_call_create_metadata_array(
event->data.server_rpc_new.metadata_count,
event->data.server_rpc_new.metadata_elements));
add_property_zval(event_object, "data", data_object);
break;
default:
add_property_null(event_object, "data");
break;
}
grpc_event_finish(event);
return event_object;
}

@ -1,51 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef NET_GRPC_PHP_GRPC_EVENT_H_
#define NET_GRPC_PHP_GRPC_EVENT_H_
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "php.h"
#include "php_ini.h"
#include "ext/standard/info.h"
#include "php_grpc.h"
#include "grpc/grpc.h"
/* Create a new Event object that wraps an existing grpc_event struct */
zval *grpc_php_convert_event(grpc_event *event);
#endif /* NET_GRPC_PHP_GRPC_COMPLETION_CHANNEL_H */

@ -34,8 +34,6 @@
#include "call.h"
#include "channel.h"
#include "server.h"
#include "completion_queue.h"
#include "event.h"
#include "timeval.h"
#include "credentials.h"
#include "server_credentials.h"
@ -127,27 +125,12 @@ PHP_MINIT_FUNCTION(grpc) {
REGISTER_LONG_CONSTANT("Grpc\\CALL_ERROR_INVALID_FLAGS",
GRPC_CALL_ERROR_INVALID_FLAGS, CONST_CS);
/* Register op error constants */
REGISTER_LONG_CONSTANT("Grpc\\OP_OK", GRPC_OP_OK, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\OP_ERROR", GRPC_OP_ERROR, CONST_CS);
/* Register flag constants */
REGISTER_LONG_CONSTANT("Grpc\\WRITE_BUFFER_HINT", GRPC_WRITE_BUFFER_HINT,
CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\WRITE_NO_COMPRESS", GRPC_WRITE_NO_COMPRESS,
CONST_CS);
/* Register completion type constants */
REGISTER_LONG_CONSTANT("Grpc\\QUEUE_SHUTDOWN", GRPC_QUEUE_SHUTDOWN, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\READ", GRPC_READ, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\FINISH_ACCEPTED", GRPC_FINISH_ACCEPTED,
CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\WRITE_ACCEPTED", GRPC_WRITE_ACCEPTED, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\CLIENT_METADATA_READ",
GRPC_CLIENT_METADATA_READ, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\FINISHED", GRPC_FINISHED, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\SERVER_RPC_NEW", GRPC_SERVER_RPC_NEW, CONST_CS);
/* Register status constants */
REGISTER_LONG_CONSTANT("Grpc\\STATUS_OK", GRPC_STATUS_OK, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\STATUS_CANCELLED", GRPC_STATUS_CANCELLED,
@ -181,10 +164,27 @@ PHP_MINIT_FUNCTION(grpc) {
REGISTER_LONG_CONSTANT("Grpc\\STATUS_DATA_LOSS", GRPC_STATUS_DATA_LOSS,
CONST_CS);
/* Register op type constants */
REGISTER_LONG_CONSTANT("Grpc\\OP_SEND_INITIAL_METADATA",
GRPC_OP_SEND_INITIAL_METADATA, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\OP_SEND_MESSAGE",
GRPC_OP_SEND_MESSAGE, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\OP_SEND_CLOSE_FROM_CLIENT",
GRPC_OP_SEND_CLOSE_FROM_CLIENT, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\OP_SEND_STATUS_FROM_SERVER",
GRPC_OP_SEND_STATUS_FROM_SERVER, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\OP_RECV_INITIAL_METADATA",
GRPC_OP_RECV_INITIAL_METADATA, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\OP_RECV_MESSAGE",
GRPC_OP_RECV_MESSAGE, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\OP_RECV_STATUS_ON_CLIENT",
GRPC_OP_RECV_STATUS_ON_CLIENT, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\OP_RECV_CLOSE_ON_SERVER",
GRPC_OP_RECV_CLOSE_ON_SERVER, CONST_CS);
grpc_init_call(TSRMLS_C);
grpc_init_channel(TSRMLS_C);
grpc_init_server(TSRMLS_C);
grpc_init_completion_queue(TSRMLS_C);
grpc_init_timeval(TSRMLS_C);
grpc_init_credentials(TSRMLS_C);
grpc_init_server_credentials(TSRMLS_C);

@ -52,15 +52,27 @@
#include "grpc/grpc_security.h"
#include "server.h"
#include "completion_queue.h"
#include "channel.h"
#include "server_credentials.h"
#include "timeval.h"
zend_class_entry *grpc_ce_server;
/* Frees and destroys an instance of wrapped_grpc_server */
void free_wrapped_grpc_server(void *object TSRMLS_DC) {
wrapped_grpc_server *server = (wrapped_grpc_server *)object;
grpc_event *event;
if (server->queue != NULL) {
grpc_completion_queue_shutdown(server->queue);
event = grpc_completion_queue_next(server->queue, gpr_inf_future);
while (event != NULL) {
if (event->type == GRPC_QUEUE_SHUTDOWN) {
break;
}
event = grpc_completion_queue_next(server->queue, gpr_inf_future);
}
grpc_completion_queue_destroy(server->queue);
}
if (server->wrapped != NULL) {
grpc_server_shutdown(server->wrapped);
grpc_server_destroy(server->wrapped);
@ -95,26 +107,22 @@ zend_object_value create_wrapped_grpc_server(zend_class_entry *class_type
PHP_METHOD(Server, __construct) {
wrapped_grpc_server *server =
(wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC);
zval *queue_obj;
zval *args_array = NULL;
grpc_channel_args args;
/* "O|a" == 1 Object, 1 optional array */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O|a", &queue_obj,
grpc_ce_completion_queue, &args_array) == FAILURE) {
/* "|a" == 1 optional array */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|a", &args_array) ==
FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Server expects a CompletionQueue and an array",
"Server expects an array",
1 TSRMLS_CC);
return;
}
add_property_zval(getThis(), "completion_queue", queue_obj);
wrapped_grpc_completion_queue *queue =
(wrapped_grpc_completion_queue *)zend_object_store_get_object(
queue_obj TSRMLS_CC);
server->queue = grpc_completion_queue_create();
if (args_array == NULL) {
server->wrapped = grpc_server_create(queue->wrapped, NULL);
server->wrapped = grpc_server_create(server->queue, NULL);
} else {
php_grpc_read_args_array(args_array, &args);
server->wrapped = grpc_server_create(queue->wrapped, &args);
server->wrapped = grpc_server_create(server->queue, &args);
efree(args.args);
}
}
@ -129,16 +137,40 @@ PHP_METHOD(Server, request_call) {
grpc_call_error error_code;
wrapped_grpc_server *server =
(wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC);
long tag_new;
/* "l" == 1 long */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &tag_new) ==
FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"request_call expects a long", 1 TSRMLS_CC);
return;
grpc_call *call;
grpc_call_details details;
grpc_metadata_array metadata;
zval *result;
grpc_event *event;
MAKE_STD_ZVAL(result);
object_init(result);
grpc_call_details_init(&details);
grpc_metadata_array_init(&metadata);
error_code = grpc_server_request_call(server->wrapped, &call, &details,
&metadata, server->queue, NULL);
if (error_code != GRPC_CALL_OK) {
zend_throw_exception(spl_ce_LogicException, "request_call failed",
(long)error_code TSRMLS_CC);
goto cleanup;
}
event = grpc_completion_queue_pluck(server->queue, NULL, gpr_inf_future);
if (event->data.op_complete != GRPC_OP_OK) {
zend_throw_exception(spl_ce_LogicException,
"Failed to request a call for some reason",
1 TSRMLS_CC);
goto cleanup;
}
error_code = grpc_server_request_call_old(server->wrapped, (void *)tag_new);
MAYBE_THROW_CALL_ERROR(request_call, error_code);
add_property_zval(result, "call", grpc_php_wrap_call(call, server->queue,
true));
add_property_string(result, "method", details.method, true);
add_property_string(result, "host", details.host, true);
add_property_zval(result, "absolute_deadline",
grpc_php_wrap_timeval(details.deadline));
add_property_zval(result, "metadata", grpc_parse_metadata_array(&metadata));
cleanup:
grpc_call_details_destroy(&details);
grpc_metadata_array_destroy(&metadata);
RETURN_DESTROY_ZVAL(result);
}
/**
@ -168,7 +200,7 @@ PHP_METHOD(Server, add_secure_http2_port) {
int addr_len;
zval *creds_obj;
/* "sO" == 1 string, 1 object */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &addr, &addr_len,
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sO", &addr, &addr_len,
&creds_obj, grpc_ce_server_credentials) ==
FAILURE) {
zend_throw_exception(

@ -53,6 +53,7 @@ typedef struct wrapped_grpc_server {
zend_object std;
grpc_server *wrapped;
grpc_completion_queue *queue;
} wrapped_grpc_server;
/* Initializes the Server class */

@ -38,9 +38,7 @@ require_once realpath(dirname(__FILE__) . '/../autoload.php');
* Represents an active call that allows sending and recieving binary data
*/
class ActiveCall {
private $completion_queue;
private $call;
private $flags;
private $metadata;
/**
@ -48,24 +46,15 @@ class ActiveCall {
* @param Channel $channel The channel to communicate on
* @param string $method The method to call on the remote server
* @param array $metadata Metadata to send with the call, if applicable
* @param long $flags Write flags to use with this call
*/
public function __construct(Channel $channel,
$method,
$metadata = array(),
$flags = 0) {
$this->completion_queue = new CompletionQueue();
$metadata = array()) {
$this->call = new Call($channel, $method, Timeval::inf_future());
$this->call->add_metadata($metadata, 0);
$this->flags = $flags;
// Invoke the call.
$this->call->invoke($this->completion_queue,
CLIENT_METADATA_READ,
FINISHED, 0);
$metadata_event = $this->completion_queue->pluck(CLIENT_METADATA_READ,
Timeval::inf_future());
$this->metadata = $metadata_event->data;
$event = $this->call->start_batch([OP_SEND_INITIAL_METADATA => $metadata]);
$this->metadata = $event->metadata;
}
/**
@ -87,8 +76,7 @@ class ActiveCall {
* @return The next message from the server, or null if there is none.
*/
public function read() {
$this->call->start_read(READ);
$read_event = $this->completion_queue->pluck(READ, Timeval::inf_future());
$read_event = $this->call->start_batch([OP_RECV_MESSAGE => true]);
return $read_event->data;
}
@ -98,16 +86,14 @@ class ActiveCall {
* @param ByteBuffer $data The data to write
*/
public function write($data) {
$this->call->start_write($data, WRITE_ACCEPTED, $this->flags);
$this->completion_queue->pluck(WRITE_ACCEPTED, Timeval::inf_future());
$this->call->start_batch([OP_SEND_MESSAGE => $data]);
}
/**
* Indicate that no more writes will be sent.
*/
public function writesDone() {
$this->call->writes_done(FINISH_ACCEPTED);
$this->completion_queue->pluck(FINISH_ACCEPTED, Timeval::inf_future());
$this->call->start_batch([OP_SEND_CLOSE_FROM_CLIENT => true]);
}
/**
@ -116,8 +102,7 @@ class ActiveCall {
* and array $metadata members
*/
public function getStatus() {
$status_event = $this->completion_queue->pluck(FINISHED,
Timeval::inf_future());
$status_event = $this->call->start_batch([RECV_STATUS_ON_CLIENT => true]);
return $status_event->data;
}
}

@ -36,65 +36,47 @@ class CallTest extends PHPUnit_Framework_TestCase{
static $port;
public static function setUpBeforeClass() {
$cq = new Grpc\CompletionQueue();
self::$server = new Grpc\Server($cq, []);
self::$server = new Grpc\Server([]);
self::$port = self::$server->add_http2_port('0.0.0.0:0');
}
public function setUp() {
$this->cq = new Grpc\CompletionQueue();
$this->channel = new Grpc\Channel('localhost:' . self::$port, []);
$this->call = new Grpc\Call($this->channel,
'/foo',
Grpc\Timeval::inf_future());
}
/**
* @expectedException LogicException
* @expectedExceptionCode Grpc\CALL_ERROR_INVALID_FLAGS
* @expectedExceptionMessage invoke
*/
public function testInvokeRejectsBadFlags() {
$this->call->invoke($this->cq, 0, 0, 0xDEADBEEF);
}
/**
* @expectedException LogicException
* @expectedExceptionCode Grpc\CALL_ERROR_NOT_ON_CLIENT
* @expectedExceptionMessage server_accept
*/
public function testServerAcceptFailsCorrectly() {
$this->call->server_accept($this->cq, 0);
}
/* These test methods with assertTrue(true) at the end just check that the
method calls completed without errors. PHPUnit warns for tests with no
asserts, and this avoids that warning without changing the meaning of the
tests */
public function testAddEmptyMetadata() {
$this->call->add_metadata([], 0);
/* Dummy assert: Checks that the previous call completed without error */
$this->assertTrue(true);
$batch = [
Grpc\OP_SEND_INITIAL_METADATA => []
];
$result = $this->call->start_batch($batch);
$this->assertTrue($result->send_metadata);
}
public function testAddSingleMetadata() {
$this->call->add_metadata(['key' => ['value']], 0);
/* Dummy assert: Checks that the previous call completed without error */
$this->assertTrue(true);
$batch = [
Grpc\OP_SEND_INITIAL_METADATA => ['key' => ['value']]
];
$result = $this->call->start_batch($batch);
$this->assertTrue($result->send_metadata);
}
public function testAddMultiValueMetadata() {
$this->call->add_metadata(['key' => ['value1', 'value2']], 0);
/* Dummy assert: Checks that the previous call completed without error */
$this->assertTrue(true);
$batch = [
Grpc\OP_SEND_INITIAL_METADATA => ['key' => ['value1', 'value2']]
];
$result = $this->call->start_batch($batch);
$this->assertTrue($result->send_metadata);
}
public function testAddSingleAndMultiValueMetadata() {
$this->call->add_metadata(
['key1' => ['value1'],
'key2' => ['value2', 'value3']], 0);
/* Dummy assert: Checks that the previous call completed without error */
$this->assertTrue(true);
$batch = [
Grpc\OP_SEND_INITIAL_METADATA => ['key1' => ['value1'],
'key2' => ['value2', 'value3']]
];
$result = $this->call->start_batch($batch);
$this->assertTrue($result->send_metadata);
}
}

@ -1,46 +0,0 @@
<?php
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
class CompletionQueueTest extends PHPUnit_Framework_TestCase{
public function testNextReturnsNullWithNoCall() {
$cq = new Grpc\CompletionQueue();
$event = $cq->next(Grpc\Timeval::zero());
$this->assertNull($event);
}
public function testPluckReturnsNullWithNoCall() {
$cq = new Grpc\CompletionQueue();
$event = $cq->pluck(0, Grpc\Timeval::zero());
$this->assertNull($event);
}
}

@ -33,18 +33,15 @@
*/
class EndToEndTest extends PHPUnit_Framework_TestCase{
public function setUp() {
$this->client_queue = new Grpc\CompletionQueue();
$this->server_queue = new Grpc\CompletionQueue();
$this->server = new Grpc\Server($this->server_queue, []);
$this->server = new Grpc\Server([]);
$port = $this->server->add_http2_port('0.0.0.0:0');
$this->channel = new Grpc\Channel('localhost:' . $port, []);
$this->server->start();
}
public function tearDown() {
unset($this->channel);
unset($this->server);
unset($this->client_queue);
unset($this->server_queue);
}
public function testSimpleRequestBody() {
@ -53,55 +50,45 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$call = new Grpc\Call($this->channel,
'dummy_method',
$deadline);
$tag = 1;
$call->invoke($this->client_queue, $tag, $tag);
$server_tag = 2;
$call->writes_done($tag);
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
$this->assertSame(Grpc\OP_OK, $event->data);
// check that a server rpc new was received
$this->server->start();
$this->server->request_call($server_tag);
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\SERVER_RPC_NEW, $event->type);
$server_call = $event->call;
$this->assertNotNull($server_call);
$server_call->server_accept($this->server_queue, $server_tag);
$server_call->server_end_initial_metadata();
$event = $call->start_batch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_CLOSE_FROM_CLIENT => true
]);
// the server sends the status
$server_call->start_write_status(Grpc\STATUS_OK, $status_text, $server_tag);
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
$this->assertSame(Grpc\OP_OK, $event->data);
$this->assertTrue($event->send_metadata);
$this->assertTrue($event->send_close);
// the client gets CLIENT_METADATA_READ
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\CLIENT_METADATA_READ, $event->type);
$event = $this->server->request_call();
$this->assertSame('dummy_method', $event->method);
$this->assertSame([], $event->metadata);
$server_call = $event->call;
// the client gets FINISHED
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISHED, $event->type);
$status = $event->data;
$event = $server_call->start_batch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
'details' => $status_text
],
Grpc\OP_RECV_CLOSE_ON_SERVER => true
]);
$this->assertTrue($event->send_metadata);
$this->assertTrue($event->send_status);
$this->assertFalse($event->cancelled);
$event = $call->start_batch([
Grpc\OP_RECV_INITIAL_METADATA => true,
Grpc\OP_RECV_STATUS_ON_CLIENT => true
]);
$this->assertSame([], $event->metadata);
$status = $event->status;
$this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code);
$this->assertSame($status_text, $status->details);
// and the server gets FINISHED
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISHED, $event->type);
$status = $event->data;
unset($call);
unset($server_call);
}
@ -115,79 +102,52 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$call = new Grpc\Call($this->channel,
'dummy_method',
$deadline);
$tag = 1;
$call->invoke($this->client_queue, $tag, $tag);
$server_tag = 2;
$event = $call->start_batch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_CLOSE_FROM_CLIENT => true,
Grpc\OP_SEND_MESSAGE => $req_text
]);
// the client writes
$call->start_write($req_text, $tag);
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\WRITE_ACCEPTED, $event->type);
$this->assertTrue($event->send_metadata);
$this->assertTrue($event->send_close);
$this->assertTrue($event->send_message);
// check that a server rpc new was received
$this->server->start();
$this->server->request_call($server_tag);
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\SERVER_RPC_NEW, $event->type);
$event = $this->server->request_call();
$this->assertSame('dummy_method', $event->method);
$server_call = $event->call;
$this->assertNotNull($server_call);
$server_call->server_accept($this->server_queue, $server_tag);
$server_call->server_end_initial_metadata();
// start the server read
$server_call->start_read($server_tag);
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\READ, $event->type);
$this->assertSame($req_text, $event->data);
// the server replies
$server_call->start_write($reply_text, $server_tag);
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\WRITE_ACCEPTED, $event->type);
// the client reads the metadata
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\CLIENT_METADATA_READ, $event->type);
// the client reads the reply
$call->start_read($tag);
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\READ, $event->type);
$this->assertSame($reply_text, $event->data);
// the client sends writes done
$call->writes_done($tag);
$event = $this->client_queue->next($deadline);
$this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
$this->assertSame(Grpc\OP_OK, $event->data);
// the server sends the status
$server_call->start_write_status(GRPC\STATUS_OK, $status_text, $server_tag);
$event = $this->server_queue->next($deadline);
$this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
$this->assertSame(Grpc\OP_OK, $event->data);
// the client gets FINISHED
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISHED, $event->type);
$status = $event->data;
$event = $server_call->start_batch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_MESSAGE => $reply_text,
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
'details' => $status_text
],
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_CLOSE_ON_SERVER => true,
]);
$this->assertTrue($event->send_metadata);
$this->assertTrue($event->send_status);
$this->assertTrue($event->send_message);
$this->assertFalse($event->cancelled);
$this->assertSame($req_text, $event->message);
$event = $call->start_batch([
Grpc\OP_RECV_INITIAL_METADATA => true,
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_STATUS_ON_CLIENT => true,
]);
$this->assertSame([], $event->metadata);
$this->assertSame($reply_text, $event->message);
$status = $event->status;
$this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code);
$this->assertSame($status_text, $status->details);
// and the server gets FINISHED
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISHED, $event->type);
unset($call);
unset($server_call);
}

@ -33,17 +33,16 @@
*/
class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
public function setUp() {
$this->client_queue = new Grpc\CompletionQueue();
$this->server_queue = new Grpc\CompletionQueue();
$credentials = Grpc\Credentials::createSsl(
file_get_contents(dirname(__FILE__) . '/../data/ca.pem'));
$server_credentials = Grpc\ServerCredentials::createSsl(
null,
file_get_contents(dirname(__FILE__) . '/../data/server1.key'),
file_get_contents(dirname(__FILE__) . '/../data/server1.pem'));
$this->server = new Grpc\Server($this->server_queue);
$this->server = new Grpc\Server();
$port = $this->server->add_secure_http2_port('0.0.0.0:0',
$server_credentials);
$this->server->start();
$this->channel = new Grpc\Channel(
'localhost:' . $port,
[
@ -55,70 +54,58 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
public function tearDown() {
unset($this->channel);
unset($this->server);
unset($this->client_queue);
unset($this->server_queue);
}
public function testSimpleRequestBody() {
$this->server->start();
$deadline = Grpc\Timeval::inf_future();
$status_text = 'xyz';
$call = new Grpc\Call($this->channel,
'dummy_method',
$deadline);
$tag = 1;
$call->invoke($this->client_queue, $tag, $tag);
$server_tag = 2;
$call->writes_done($tag);
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
$this->assertSame(Grpc\OP_OK, $event->data);
// check that a server rpc new was received
$this->server->request_call($server_tag);
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\SERVER_RPC_NEW, $event->type);
$event = $call->start_batch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_CLOSE_FROM_CLIENT => true
]);
$this->assertTrue($event->send_metadata);
$this->assertTrue($event->send_close);
$event = $this->server->request_call();
$this->assertSame('dummy_method', $event->method);
$this->assertSame([], $event->metadata);
$server_call = $event->call;
$this->assertNotNull($server_call);
$server_call->server_accept($this->server_queue, $server_tag);
$server_call->server_end_initial_metadata();
// the server sends the status
$server_call->start_write_status(Grpc\STATUS_OK, $status_text, $server_tag);
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
$this->assertSame(Grpc\OP_OK, $event->data);
// the client gets CLIENT_METADATA_READ
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\CLIENT_METADATA_READ, $event->type);
// the client gets FINISHED
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISHED, $event->type);
$status = $event->data;
$event = $server_call->start_batch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
'details' => $status_text
],
Grpc\OP_RECV_CLOSE_ON_SERVER => true
]);
$this->assertTrue($event->send_metadata);
$this->assertTrue($event->send_status);
$this->assertFalse($event->cancelled);
$event = $call->start_batch([
Grpc\OP_RECV_INITIAL_METADATA => true,
Grpc\OP_RECV_STATUS_ON_CLIENT => true
]);
$this->assertSame([], $event->metadata);
$status = $event->status;
$this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code);
$this->assertSame($status_text, $status->details);
// and the server gets FINISHED
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISHED, $event->type);
$status = $event->data;
unset($call);
unset($server_call);
}
public function testClientServerFullRequestResponse() {
$this->server->start();
$deadline = Grpc\Timeval::inf_future();
$req_text = 'client_server_full_request_response';
$reply_text = 'reply:client_server_full_request_response';
@ -127,78 +114,52 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$call = new Grpc\Call($this->channel,
'dummy_method',
$deadline);
$tag = 1;
$call->invoke($this->client_queue, $tag, $tag);
$server_tag = 2;
// the client writes
$call->start_write($req_text, $tag);
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\WRITE_ACCEPTED, $event->type);
// check that a server rpc new was received
$this->server->request_call($server_tag);
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\SERVER_RPC_NEW, $event->type);
$event = $call->start_batch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_CLOSE_FROM_CLIENT => true,
Grpc\OP_SEND_MESSAGE => $req_text
]);
$this->assertTrue($event->send_metadata);
$this->assertTrue($event->send_close);
$this->assertTrue($event->send_message);
$event = $this->server->request_call();
$this->assertSame('dummy_method', $event->method);
$server_call = $event->call;
$this->assertNotNull($server_call);
$server_call->server_accept($this->server_queue, $server_tag);
$server_call->server_end_initial_metadata();
// start the server read
$server_call->start_read($server_tag);
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\READ, $event->type);
$this->assertSame($req_text, $event->data);
// the server replies
$server_call->start_write($reply_text, $server_tag);
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\WRITE_ACCEPTED, $event->type);
// the client reads the metadata
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\CLIENT_METADATA_READ, $event->type);
// the client reads the reply
$call->start_read($tag);
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\READ, $event->type);
$this->assertSame($reply_text, $event->data);
// the client sends writes done
$call->writes_done($tag);
$event = $this->client_queue->next($deadline);
$this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
$this->assertSame(Grpc\OP_OK, $event->data);
// the server sends the status
$server_call->start_write_status(GRPC\STATUS_OK, $status_text, $server_tag);
$event = $this->server_queue->next($deadline);
$this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
$this->assertSame(Grpc\OP_OK, $event->data);
// the client gets FINISHED
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISHED, $event->type);
$status = $event->data;
$event = $server_call->start_batch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_MESSAGE => $reply_text,
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
'details' => $status_text
],
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_CLOSE_ON_SERVER => true,
]);
$this->assertTrue($event->send_metadata);
$this->assertTrue($event->send_status);
$this->assertTrue($event->send_message);
$this->assertFalse($event->cancelled);
$this->assertSame($req_text, $event->message);
$event = $call->start_batch([
Grpc\OP_RECV_INITIAL_METADATA => true,
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_STATUS_ON_CLIENT => true,
]);
$this->assertSame([], $event->metadata);
$this->assertSame($reply_text, $event->message);
$status = $event->status;
$this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code);
$this->assertSame($status_text, $status->details);
// and the server gets FINISHED
$event = $this->server_queue->next($deadline);
$this->assertNotNull($event);
$this->assertSame(Grpc\FINISHED, $event->type);
unset($call);
unset($server_call);
}

@ -2,14 +2,17 @@
require 'rake/extensiontask'
require 'rspec/core/rake_task'
require 'rubocop/rake_task'
require 'bundler/gem_tasks'
desc 'Run Rubocop to check for style violations'
# Add rubocop style checking tasks
RuboCop::RakeTask.new
# Add the extension compiler task
Rake::ExtensionTask.new 'grpc' do |ext|
ext.lib_dir = File.join('lib', 'grpc')
end
# Define the test suites
SPEC_SUITES = [
{ id: :wrapper, title: 'wrapper layer', files: %w(spec/*.rb) },
{ id: :idiomatic, title: 'idiomatic layer', dir: %w(spec/generic),
@ -19,36 +22,34 @@ SPEC_SUITES = [
{ id: :server, title: 'rpc server thread tests', dir: %w(spec/generic),
tag: 'server' }
]
namespace :suite do
SPEC_SUITES.each do |suite|
desc "Run all specs in the #{suite[:title]} spec suite"
RSpec::Core::RakeTask.new(suite[:id]) do |t|
spec_files = []
suite[:files].each { |f| spec_files += Dir[f] } if suite[:files]
if suite[:dir]
suite[:dir].each { |f| spec_files += Dir["#{f}/**/*_spec.rb"] }
end
helper = 'spec/spec_helper.rb'
spec_files << helper unless spec_files.include?(helper)
desc 'Run all RSpec tests'
namespace :spec do
namespace :suite do
SPEC_SUITES.each do |suite|
desc "Run all specs in #{suite[:title]} spec suite"
RSpec::Core::RakeTask.new(suite[:id]) do |t|
spec_files = []
suite[:files].each { |f| spec_files += Dir[f] } if suite[:files]
if suite[:dirs]
suite[:dirs].each { |f| spec_files += Dir["#{f}/**/*_spec.rb"] }
end
t.pattern = spec_files
t.rspec_opts = "--tag #{suite[:tag]}" if suite[:tag]
if suite[:tags]
t.rspec_opts = suite[:tags].map { |x| "--tag #{x}" }.join(' ')
end
t.pattern = spec_files
t.rspec_opts = "--tag #{suite[:tag]}" if suite[:tag]
if suite[:tags]
t.rspec_opts = suite[:tags].map { |x| "--tag #{x}" }.join(' ')
end
end
end
end
desc 'Compiles the extension then runs all the tests'
task :all
# Define dependencies between the suites.
task 'suite:wrapper' => [:compile, :rubocop]
task 'suite:idiomatic' => 'suite:wrapper'
task 'suite:bidi' => 'suite:wrapper'
task 'suite:server' => 'suite:wrapper'
desc 'Compiles the gRPC extension then runs all the tests'
task all: ['suite:idiomatic', 'suite:bidi', 'suite:server']
task default: :all
task 'spec:suite:wrapper' => [:compile, :rubocop]
task 'spec:suite:idiomatic' => 'spec:suite:wrapper'
task 'spec:suite:bidi' => 'spec:suite:wrapper'
task 'spec:suite:server' => 'spec:suite:wrapper'
task all: ['spec:suite:idiomatic', 'spec:suite:bidi', 'spec:suite:server']

@ -176,25 +176,26 @@ module GRPC
unmarshal = desc.unmarshal_proc(:output)
route = "/#{route_prefix}/#{name}"
if desc.request_response?
define_method(mth_name) do |req, deadline = nil|
define_method(mth_name) do |req, deadline = nil, **kw|
logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, deadline)
request_response(route, req, marshal, unmarshal, deadline, **kw)
end
elsif desc.client_streamer?
define_method(mth_name) do |reqs, deadline = nil|
define_method(mth_name) do |reqs, deadline = nil, **kw|
logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, deadline)
client_streamer(route, reqs, marshal, unmarshal, deadline, **kw)
end
elsif desc.server_streamer?
define_method(mth_name) do |req, deadline = nil, &blk|
define_method(mth_name) do |req, deadline = nil, **kw, &blk|
logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, deadline,
server_streamer(route, req, marshal, unmarshal, deadline, **kw,
&blk)
end
else # is a bidi_stream
define_method(mth_name) do |reqs, deadline = nil, &blk|
define_method(mth_name) do |reqs, deadline = nil, **kw, &blk|
logger.debug("calling #{@host}:#{route}")
bidi_streamer(route, reqs, marshal, unmarshal, deadline, &blk)
bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw,
&blk)
end
end
end

@ -67,7 +67,7 @@ describe GRPC::ActiveCall do
end
describe '#multi_req_view' do
xit 'exposes a fixed subset of the ActiveCall methods' do
it 'exposes a fixed subset of the ActiveCall methods' do
want = %w(cancelled, deadline, each_remote_read, metadata, shutdown)
v = @client_call.multi_req_view
want.each do |w|
@ -77,7 +77,7 @@ describe GRPC::ActiveCall do
end
describe '#single_req_view' do
xit 'exposes a fixed subset of the ActiveCall methods' do
it 'exposes a fixed subset of the ActiveCall methods' do
want = %w(cancelled, deadline, metadata, shutdown)
v = @client_call.single_req_view
want.each do |w|

@ -384,13 +384,7 @@ describe 'ClientStub' do
th.join
end
# disabled because an unresolved wire-protocol implementation feature
#
# - servers should be able initiate messaging, however, as it stand
# servers don't know if all the client metadata has been sent until
# they receive a message from the client. Without receiving all the
# metadata, the server does not accept the call, so this test hangs.
xit 'supports a server-initiated ping pong', bidi: true do
it 'supports a server-initiated ping pong', bidi: true do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)

@ -81,14 +81,17 @@ EchoStub = EchoService.rpc_stub_class
class SlowService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
attr_reader :received_md, :delay
def initialize(_default_var = 'ignored')
@delay = 0.25
@received_md = []
end
def an_rpc(req, _call)
delay = 0.25
logger.info("starting a slow #{delay} rpc")
sleep delay
def an_rpc(req, call)
logger.info("starting a slow #{@delay} rpc")
sleep @delay
@received_md << call.metadata unless call.metadata.nil?
req # send back the req as the response
end
end
@ -354,6 +357,37 @@ describe GRPC::RpcServer do
t.join
end
it 'should receive metadata when a deadline is specified', server: true do
service = SlowService.new
@srv.handle(service)
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
stub = SlowStub.new(@host, **@client_opts)
deadline = service.delay + 0.5 # wait for long enough
expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
expect(service.received_md).to eq(wanted_md)
@srv.stop
t.join
end
it 'should not receive metadata if the client times out', server: true do
service = SlowService.new
@srv.handle(service)
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
stub = SlowStub.new(@host, **@client_opts)
deadline = 0.1 # too short for SlowService to respond
blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') }
expect(&blk).to raise_error GRPC::BadStatus
wanted_md = []
expect(service.received_md).to eq(wanted_md)
@srv.stop
t.join
end
it 'should receive updated metadata', server: true do
service = EchoService.new
@srv.handle(service)

@ -39,10 +39,15 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include <openssl/crypto.h>
#include "src/core/support/string.h"
#include "src/core/tsi/ssl_transport_security.h"
#include "test/core/util/test_config.h"
/* Currently points to 1.0.2a. */
#define GRPC_MIN_OPENSSL_VERSION_NUMBER 0x1000201fL
typedef struct {
/* 1 if success, 0 if failure. */
int expected;
@ -296,8 +301,13 @@ static void test_peer_matches_name(void) {
}
}
static void test_openssl_version(void) {
GPR_ASSERT(OPENSSL_VERSION_NUMBER >= GRPC_MIN_OPENSSL_VERSION_NUMBER);
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_peer_matches_name();
test_openssl_version();
return 0;
}

@ -595,6 +595,5 @@ int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS();
grpc_shutdown();
google::protobuf::ShutdownProtobufLibrary();
return result;
}

@ -432,6 +432,5 @@ int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS();
grpc_shutdown();
google::protobuf::ShutdownProtobufLibrary();
return result;
}

@ -83,8 +83,8 @@ bool ParseFromByteBuffer(ByteBuffer* buffer, grpc::protobuf::Message* message) {
buffer->Dump(&slices);
grpc::string buf;
buf.reserve(buffer->Length());
for (const Slice& s : slices) {
buf.append(reinterpret_cast<const char*>(s.begin()), s.size());
for (auto s = slices.begin(); s != slices.end(); s++) {
buf.append(reinterpret_cast<const char*>(s->begin()), s->size());
}
return message->ParseFromString(buf);
}
@ -265,6 +265,5 @@ int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS();
grpc_shutdown();
google::protobuf::ShutdownProtobufLibrary();
return result;
}

@ -148,7 +148,8 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
for (auto& channel : channels_) {
for (auto channel = channels_.begin(); channel != channels_.end();
channel++) {
auto* cq = cli_cqs_[t].get();
t = (t + 1) % cli_cqs_.size();
auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
@ -156,7 +157,7 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
return stub->AsyncUnaryCall(ctx, request, cq, tag);
};
TestService::Stub* stub = channel.get_stub();
TestService::Stub* stub = channel->get_stub();
const SimpleRequest& request = request_;
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
stub, request, start_req, check_done);
@ -169,11 +170,11 @@ class AsyncUnaryClient GRPC_FINAL : public Client {
~AsyncUnaryClient() GRPC_OVERRIDE {
EndThreads();
for (auto& cq : cli_cqs_) {
cq->Shutdown();
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
(*cq)->Shutdown();
void* got_tag;
bool ok;
while (cq->Next(&got_tag, &ok)) {
while ((*cq)->Next(&got_tag, &ok)) {
delete ClientRpcContext::detag(got_tag);
}
}

@ -154,19 +154,19 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
server_mark.mutable_mark();
ClientArgs client_mark;
client_mark.mutable_mark();
for (auto& server : servers) {
GPR_ASSERT(server.stream->Write(server_mark));
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->Write(server_mark));
}
for (auto& client : clients) {
GPR_ASSERT(client.stream->Write(client_mark));
for (auto client = clients.begin(); client != clients.end(); client++) {
GPR_ASSERT(client->stream->Write(client_mark));
}
ServerStatus server_status;
ClientStatus client_status;
for (auto& server : servers) {
GPR_ASSERT(server.stream->Read(&server_status));
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->Read(&server_status));
}
for (auto& client : clients) {
GPR_ASSERT(client.stream->Read(&client_status));
for (auto client = clients.begin(); client != clients.end(); client++) {
GPR_ASSERT(client->stream->Read(&client_status));
}
// Wait some time
@ -176,33 +176,33 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
// Finish a run
ScenarioResult result;
gpr_log(GPR_INFO, "Finishing");
for (auto& server : servers) {
GPR_ASSERT(server.stream->Write(server_mark));
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->Write(server_mark));
}
for (auto& client : clients) {
GPR_ASSERT(client.stream->Write(client_mark));
for (auto client = clients.begin(); client != clients.end(); client++) {
GPR_ASSERT(client->stream->Write(client_mark));
}
for (auto& server : servers) {
GPR_ASSERT(server.stream->Read(&server_status));
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
result.server_resources.push_back(ResourceUsage{
stats.time_elapsed(), stats.time_user(), stats.time_system()});
}
for (auto& client : clients) {
GPR_ASSERT(client.stream->Read(&client_status));
for (auto client = clients.begin(); client != clients.end(); client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
result.latencies.MergeProto(stats.latencies());
result.client_resources.push_back(ResourceUsage{
stats.time_elapsed(), stats.time_user(), stats.time_system()});
}
for (auto& client : clients) {
GPR_ASSERT(client.stream->WritesDone());
GPR_ASSERT(client.stream->Finish().IsOk());
for (auto client = clients.begin(); client != clients.end(); client++) {
GPR_ASSERT(client->stream->WritesDone());
GPR_ASSERT(client->stream->Finish().IsOk());
}
for (auto& server : servers) {
GPR_ASSERT(server.stream->WritesDone());
GPR_ASSERT(server.stream->Finish().IsOk());
for (auto server = servers.begin(); server != servers.end(); server++) {
GPR_ASSERT(server->stream->WritesDone());
GPR_ASSERT(server->stream->Finish().IsOk());
}
return result;
}

@ -119,8 +119,8 @@ class AsyncQpsServerTest : public Server {
shutdown_ = true;
srv_cq_.Shutdown();
}
for (auto& thr : threads_) {
thr.join();
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
while (!contexts_.empty()) {
delete contexts_.front();

@ -43,8 +43,8 @@ namespace testing {
template <class T, class F>
double sum(const T& container, F functor) {
double r = 0;
for (auto v : container) {
r += functor(v);
for (auto v = container.begin(); v != container.end(); v++) {
r += functor(*v);
}
return r;
}

@ -57,8 +57,6 @@ RUN wget -O - https://github.com/google/protobuf/archive/v3.0.0-alpha-2.tar.gz |
./autogen.sh && \
./configure --prefix=/usr && \
make -j12 && make check && make install && \
cd java && mvn install && cd .. && \
cd javanano && mvn install && cd .. && \
rm -r "$(pwd)"
# Trigger download of as many Maven and Gradle artifacts as possible. We don't build grpc-java

Loading…
Cancel
Save