Merge github.com:grpc/grpc into mmm-mmm-mmm-mmm

pull/1625/head
Craig Tiller 10 years ago
commit bf6bd2303c
  1. 250
      Makefile
  2. 3
      README.md
  3. 8
      src/compiler/generator_helpers.h
  4. 307
      src/compiler/objective_c_generator.cc
  5. 9
      src/compiler/objective_c_generator.h
  6. 12
      src/compiler/objective_c_generator_helpers.h
  7. 79
      src/compiler/objective_c_plugin.cc
  8. 4
      src/core/iomgr/tcp_posix.c
  9. 2
      src/core/transport/chttp2/frame.h
  10. 40
      src/core/transport/chttp2/frame_rst_stream.c
  11. 11
      src/core/transport/chttp2/frame_rst_stream.h
  12. 116
      src/core/transport/chttp2_transport.c
  13. 29
      src/node/src/server.js
  14. 28
      src/node/test/end_to_end_test.js
  15. 224
      src/node/test/surface_test.js
  16. 6
      src/objective-c/GRPCClient/GRPCCall.m
  17. 17
      src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h
  18. 8
      src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m
  19. 4
      src/objective-c/ProtoRPC/ProtoRPC.m
  20. 48
      src/objective-c/README.md
  21. 59
      src/objective-c/RxLibrary/GRXBufferedPipe.h
  22. 146
      src/objective-c/RxLibrary/GRXBufferedPipe.m
  23. 4
      src/objective-c/RxLibrary/GRXImmediateWriter.m
  24. 8
      src/objective-c/RxLibrary/GRXWriteable.h
  25. 4
      src/objective-c/RxLibrary/GRXWriteable.m
  26. 6
      src/objective-c/RxLibrary/GRXWriter.h
  27. 8
      src/objective-c/RxLibrary/GRXWriter.m
  28. 4
      src/objective-c/RxLibrary/transformations/GRXMappingWriter.m
  29. 98
      src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
  30. 2
      src/python/README.md
  31. 2
      src/python/requirements.txt
  32. 2
      src/python/src/setup.py
  33. 25
      src/ruby/ext/grpc/extconf.rb
  34. 2
      src/ruby/ext/grpc/rb_byte_buffer.c
  35. 3
      src/ruby/ext/grpc/rb_byte_buffer.h
  36. 2
      src/ruby/ext/grpc/rb_call.c
  37. 3
      src/ruby/ext/grpc/rb_call.h
  38. 2
      src/ruby/ext/grpc/rb_channel.c
  39. 3
      src/ruby/ext/grpc/rb_channel.h
  40. 3
      src/ruby/ext/grpc/rb_channel_args.c
  41. 3
      src/ruby/ext/grpc/rb_channel_args.h
  42. 3
      src/ruby/ext/grpc/rb_completion_queue.h
  43. 2
      src/ruby/ext/grpc/rb_credentials.c
  44. 3
      src/ruby/ext/grpc/rb_credentials.h
  45. 3
      src/ruby/ext/grpc/rb_grpc.h
  46. 2
      src/ruby/ext/grpc/rb_server.c
  47. 3
      src/ruby/ext/grpc/rb_server.h
  48. 2
      src/ruby/ext/grpc/rb_server_credentials.c
  49. 3
      src/ruby/ext/grpc/rb_server_credentials.h
  50. 14
      test/core/end2end/cq_verifier.c
  51. 1
      test/core/end2end/gen_build_json.py
  52. 23
      test/core/end2end/tests/invoke_large_request.c
  53. 3
      test/core/end2end/tests/max_message_length.c
  54. 22
      test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
  55. 22
      test/core/end2end/tests/request_response_with_metadata_and_payload.c
  56. 22
      test/core/end2end/tests/request_response_with_payload.c
  57. 22
      test/core/end2end/tests/request_response_with_payload_and_call_creds.c
  58. 22
      test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
  59. 20
      test/core/end2end/tests/request_with_large_metadata.c
  60. 20
      test/core/end2end/tests/request_with_payload.c
  61. 200
      test/core/end2end/tests/server_finishes_request.c
  62. 2
      third_party/protobuf
  63. 8
      tools/dockerfile/grpc_node/build.sh
  64. 2
      tools/dockerfile/grpc_python_base/Dockerfile
  65. 8
      tools/dockerfile/grpc_ruby/build.sh
  66. 19
      tools/gce_setup/grpc_docker.sh
  67. 2
      tools/run_tests/build_python.sh
  68. 13
      tools/run_tests/build_ruby.sh
  69. 5
      tools/run_tests/jobset.py
  70. 2
      tools/run_tests/run_sanity.sh
  71. 2
      tools/run_tests/run_tests.py
  72. 115
      tools/run_tests/tests.json
  73. 85
      vsprojects/Grpc.mak

File diff suppressed because one or more lines are too long

@ -1,4 +1,5 @@
[![Build Status](https://travis-ci.org/grpc/grpc.svg?branch=master)](https://travis-ci.org/grpc/grpc)
[![Coverage Status](https://img.shields.io/coveralls/grpc/grpc.svg)](https://coveralls.io/r/grpc/grpc?branch=master)
[gRPC - An RPC library and framework](http://github.com/grpc/grpc)
===================================
@ -37,7 +38,7 @@ Libraries in different languages are in different state of development. We are s
* C++ Library: [src/cpp] (src/cpp) : Early adopter ready - Alpha.
* Ruby Library: [src/ruby] (src/ruby) : Early adopter ready - Alpha.
* NodeJS Library: [src/node] (src/node) : Early adopter ready - Alpha.
* Python Library: [src/python] (src/python) : Usable with limitations - Alpha.
* Python Library: [src/python] (src/python) : Early adopter ready - Alpha.
* PHP Library: [src/php] (src/php) : Pre-Alpha.
* C# Library: [src/csharp] (src/csharp) : Pre-Alpha.
* Objective-C Library: [src/objective-c] (src/objective-c): Pre-Alpha.

@ -103,6 +103,14 @@ inline grpc::string CapitalizeFirstLetter(grpc::string s) {
return s;
}
inline grpc::string LowercaseFirstLetter(grpc::string s) {
if (s.empty()) {
return s;
}
s[0] = ::tolower(s[0]);
return s;
}
inline grpc::string LowerUnderscoreToUpperCamel(grpc::string str) {
std::vector<grpc::string> tokens = tokenize(str, "_");
grpc::string result = "";

@ -40,195 +40,200 @@
#include <sstream>
using ::grpc::protobuf::io::Printer;
using ::grpc::protobuf::MethodDescriptor;
using ::grpc::protobuf::ServiceDescriptor;
using ::std::map;
using ::grpc::string;
namespace grpc_objective_c_generator {
namespace {
void PrintSimpleBlockSignature(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::MethodDescriptor *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["method_name"] = method->name();
(*vars)["request_type"] = PrefixedName(method->input_type()->name());
(*vars)["response_type"] = PrefixedName(method->output_type()->name());
void PrintProtoRpcDeclarationAsPragma(Printer *printer,
const MethodDescriptor *method,
map<string, string> vars) {
vars["client_stream"] = method->client_streaming() ? "stream " : "";
vars["server_stream"] = method->server_streaming() ? "stream " : "";
if (method->server_streaming()) {
printer->Print("// When the response stream finishes, the handler is "
"called with nil for both arguments.\n\n");
} else {
printer->Print("// The handler is only called once.\n\n");
}
printer->Print(*vars, "- (id<GRXLiveSource>)$method_name$WithRequest:"
"($request_type$)request completionHandler:(void(^)"
"($response_type$ *, NSError *))handler");
printer->Print(vars,
"#pragma mark $method_name$($client_stream$$request_type$)"
" returns ($server_stream$$response_type$)\n\n");
}
void PrintSimpleDelegateSignature(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::MethodDescriptor *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["method_name"] = method->name();
(*vars)["request_type"] = PrefixedName(method->input_type()->name());
void PrintMethodSignature(Printer *printer,
const MethodDescriptor *method,
const map<string, string>& vars) {
// TODO(jcanizales): Print method comments.
printer->Print(*vars, "- (id<GRXLiveSource>)$method_name$WithRequest:"
"($request_type$)request delegate:(id<GRXSink>)delegate");
}
printer->Print(vars, "- ($return_type$)$method_name$With");
if (method->client_streaming()) {
printer->Print("RequestsWriter:(id<GRXWriter>)request");
} else {
printer->Print(vars, "Request:($prefix$$request_type$ *)request");
}
void PrintAdvancedSignature(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::MethodDescriptor *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["method_name"] = method->name();
printer->Print(*vars, "- (GRXSource *)$method_name$WithRequest:"
"(id<GRXSource>)request");
// TODO(jcanizales): Put this on a new line and align colons.
// TODO(jcanizales): eventHandler for server streaming?
printer->Print(" handler:(void(^)(");
if (method->server_streaming()) {
printer->Print("BOOL done, ");
}
printer->Print(vars,
"$prefix$$response_type$ *response, NSError *error))handler");
}
void PrintSourceMethodSimpleBlock(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::MethodDescriptor *method,
std::map<grpc::string, grpc::string> *vars) {
PrintSimpleBlockSignature(printer, method, vars);
(*vars)["method_name"] = method->name();
printer->Print(" {\n");
printer->Indent();
printer->Print(*vars, "return [[self $method_name$WithRequest:request] "
"connectHandler:^(id value, NSError *error) {\n");
printer->Indent();
printer->Print("handler(value, error);\n");
printer->Outdent();
printer->Print("}];\n");
printer->Outdent();
printer->Print("}\n");
void PrintSimpleSignature(Printer *printer,
const MethodDescriptor *method,
map<string, string> vars) {
vars["method_name"] =
grpc_generator::LowercaseFirstLetter(vars["method_name"]);
vars["return_type"] = "void";
PrintMethodSignature(printer, method, vars);
}
void PrintSourceMethodSimpleDelegate(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::MethodDescriptor *method,
std::map<grpc::string, grpc::string> *vars) {
PrintSimpleDelegateSignature(printer, method, vars);
(*vars)["method_name"] = method->name();
printer->Print(" {\n");
printer->Indent();
printer->Print(*vars, "return [[self $method_name$WithRequest:request]"
"connectToSink:delegate];\n");
printer->Outdent();
printer->Print("}\n");
void PrintAdvancedSignature(Printer *printer,
const MethodDescriptor *method,
map<string, string> vars) {
vars["method_name"] = "RPCTo" + vars["method_name"];
vars["return_type"] = "ProtoRPC *";
PrintMethodSignature(printer, method, vars);
}
void PrintSourceMethodAdvanced(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::MethodDescriptor *method,
std::map<grpc::string, grpc::string> *vars) {
void PrintMethodDeclarations(Printer *printer,
const MethodDescriptor *method,
map<string, string> vars) {
vars["method_name"] = method->name();
vars["request_type"] = method->input_type()->name();
vars["response_type"] = method->output_type()->name();
PrintProtoRpcDeclarationAsPragma(printer, method, vars);
PrintSimpleSignature(printer, method, vars);
printer->Print(";\n\n");
PrintAdvancedSignature(printer, method, vars);
printer->Print(";\n\n\n");
}
(*vars)["method_name"] = method->name();
printer->Print(" {\n");
printer->Indent();
printer->Print(*vars, "return [self $method_name$WithRequest:request "
"client:[self newClient]];\n");
printer->Outdent();
void PrintSimpleImplementation(Printer *printer,
const MethodDescriptor *method,
map<string, string> vars) {
printer->Print("{\n");
printer->Print(vars, " [[self RPCTo$method_name$With");
if (method->client_streaming()) {
printer->Print("RequestsWriter:request");
} else {
printer->Print("Request:request");
}
printer->Print(" handler:handler] start];\n");
printer->Print("}\n");
}
void PrintSourceMethodHandler(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::MethodDescriptor *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["method_name"] = method->name();
(*vars)["response_type"] = PrefixedName(method->output_type()->name());
(*vars)["caps_name"] = grpc_generator::CapitalizeFirstLetter(method->name());
printer->Print(*vars, "- (GRXSource *)$method_name$WithRequest:"
"(id<GRXSource>)request client:(PBgRPCClient *)client {\n");
printer->Indent();
printer->Print(*vars,
"return [self responseWithMethod:$@\"$caps_name\"\n");
printer->Print(*vars,
" class:[$response_type$ class]\n");
printer->Print(" request:request\n");
printer->Print(" client:client];\n");
printer->Outdent();
void PrintAdvancedImplementation(Printer *printer,
const MethodDescriptor *method,
map<string, string> vars) {
printer->Print("{\n");
printer->Print(vars, " return [self RPCToMethod:@\"$method_name$\"\n");
printer->Print(" requestsWriter:");
if (method->client_streaming()) {
printer->Print("request\n");
} else {
printer->Print("[GRXWriter writerWithValue:request]\n");
}
printer->Print(vars,
" responseClass:[$prefix$$response_type$ class]\n");
printer->Print(" responsesWriteable:[GRXWriteable ");
if (method->server_streaming()) {
printer->Print("writeableWithStreamHandler:handler]];\n");
} else {
printer->Print("writeableWithSingleValueHandler:handler]];\n");
}
printer->Print("}\n");
}
void PrintMethodImplementations(Printer *printer,
const MethodDescriptor *method,
map<string, string> vars) {
vars["method_name"] = method->name();
vars["request_type"] = method->input_type()->name();
vars["response_type"] = method->output_type()->name();
PrintProtoRpcDeclarationAsPragma(printer, method, vars);
// TODO(jcanizales): Print documentation from the method.
PrintSimpleSignature(printer, method, vars);
PrintSimpleImplementation(printer, method, vars);
printer->Print("// Returns a not-yet-started RPC object.\n");
PrintAdvancedSignature(printer, method, vars);
PrintAdvancedImplementation(printer, method, vars);
}
grpc::string GetHeader(const grpc::protobuf::ServiceDescriptor *service,
const grpc::string message_header) {
grpc::string output;
} // namespace
string GetHeader(const ServiceDescriptor *service, const string prefix) {
string output;
grpc::protobuf::io::StringOutputStream output_stream(&output);
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
printer.Print("#import \"PBgRPCClient.h\"\n");
printer.Print("#import \"PBStub.h\"\n");
vars["message_header"] = message_header;
printer.Print(vars, "#import \"$message_header$\"\n\n");
printer.Print("@protocol GRXSource\n");
printer.Print("@class GRXSource\n\n");
vars["service_name"] = service->name();
printer.Print("@protocol $service_name$Stub <NSObject>\n\n");
printer.Print("#pragma mark Simple block handlers\n\n");
for (int i = 0; i < service->method_count(); i++) {
PrintSimpleBlockSignature(&printer, service->method(i), &vars);
printer.Print(";\n");
}
printer.Print("\n");
printer.Print("#pragma mark Simple delegate handlers.\n\n");
printer.Print("# TODO(jcanizales): Use high-level snippets to remove this duplication.");
for (int i = 0; i < service->method_count(); i++) {
PrintSimpleDelegateSignature(&printer, service->method(i), &vars);
printer.Print(";\n");
}
printer.Print("\n");
printer.Print("#pragma mark Advanced handlers.\n\n");
Printer printer(&output_stream, '$');
printer.Print("@protocol GRXWriteable;\n");
printer.Print("@protocol GRXWriter;\n\n");
map<string, string> vars = {{"service_name", service->name()},
{"prefix", prefix}};
printer.Print(vars, "@protocol $prefix$$service_name$ <NSObject>\n\n");
for (int i = 0; i < service->method_count(); i++) {
PrintAdvancedSignature(&printer, service->method(i), &vars);
printer.Print(";\n");
PrintMethodDeclarations(&printer, service->method(i), vars);
}
printer.Print("\n");
printer.Print("@end\n\n");
printer.Print("// Basic stub that only does marshalling and parsing\n");
printer.Print(vars, "@interface $service_name$Stub :"
" PBStub<$service_name$Stub>\n");
printer.Print("- (instancetype)initWithHost:(NSString *)host;\n");
printer.Print("// Basic service implementation, over gRPC, that only does"
" marshalling and parsing.\n");
printer.Print(vars, "@interface $prefix$$service_name$ :"
" ProtoService<$prefix$$service_name$>\n");
printer.Print("- (instancetype)initWithHost:(NSString *)host"
" NS_DESIGNATED_INITIALIZER;\n");
printer.Print("@end\n");
return output;
}
grpc::string GetSource(const grpc::protobuf::ServiceDescriptor *service) {
grpc::string output;
string GetSource(const ServiceDescriptor *service, const string prefix) {
string output;
grpc::protobuf::io::StringOutputStream output_stream(&output);
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
vars["service_name"] = service->name();
printer.Print(vars, "#import \"$service_name$Stub.pb.h\"\n");
printer.Print("#import \"PBGeneratedMessage+GRXSource.h\"\n\n");
vars["full_name"] = service->full_name();
Printer printer(&output_stream, '$');
map<string, string> vars = {{"service_name", service->name()},
{"package", service->file()->package()},
{"prefix", prefix}};
printer.Print(vars,
"static NSString *const kPackageName = @\"$package$\";\n");
printer.Print(vars,
"static NSString *const kInterface = @\"$full_name$\";\n");
printer.Print("@implementation $service_name$Stub\n\n");
"static NSString *const kServiceName = @\"$service_name$\";\n\n");
printer.Print(vars, "@implementation $prefix$$service_name$\n\n");
printer.Print("// Designated initializer\n");
printer.Print("- (instancetype)initWithHost:(NSString *)host {\n");
printer.Indent();
printer.Print("if ((self = [super initWithHost:host "
"interface:kInterface])) {\n");
printer.Print("}\n");
printer.Print("return self;\n");
printer.Outdent();
printer.Print(" return (self = [super initWithHost:host"
" packageName:kPackageName serviceName:kServiceName]);\n");
printer.Print("}\n\n");
printer.Print("#pragma mark Simple block handlers.\n");
for (int i = 0; i < service->method_count(); i++) {
PrintSourceMethodSimpleBlock(&printer, service->method(i), &vars);
}
printer.Print("\n");
printer.Print("#pragma mark Simple delegate handlers.\n");
for (int i = 0; i < service->method_count(); i++) {
PrintSourceMethodSimpleDelegate(&printer, service->method(i), &vars);
}
printer.Print("\n");
printer.Print("#pragma mark Advanced handlers.\n");
for (int i = 0; i < service->method_count(); i++) {
PrintSourceMethodAdvanced(&printer, service->method(i), &vars);
}
printer.Print("\n");
printer.Print("#pragma mark Handlers for subclasses "
"(stub wrappers) to override.\n");
printer.Print("// Override superclass initializer to disallow different"
" package and service names.\n");
printer.Print("- (instancetype)initWithHost:(NSString *)host\n");
printer.Print(" packageName:(NSString *)packageName\n");
printer.Print(" serviceName:(NSString *)serviceName {\n");
printer.Print(" return [self initWithHost:host];\n");
printer.Print("}\n\n\n");
for (int i = 0; i < service->method_count(); i++) {
PrintSourceMethodHandler(&printer, service->method(i), &vars);
PrintMethodImplementations(&printer, service->method(i), vars);
}
printer.Print("@end\n");
return output;
}

@ -38,10 +38,15 @@
namespace grpc_objective_c_generator {
// Returns the content to be included in the "global_scope" insertion point of
// the generated header file.
grpc::string GetHeader(const grpc::protobuf::ServiceDescriptor *service,
const grpc::string message_header);
const grpc::string prefix);
grpc::string GetSource(const grpc::protobuf::ServiceDescriptor *service);
// Returns the content to be included in the "global_scope" insertion point of
// the generated implementation file.
grpc::string GetSource(const grpc::protobuf::ServiceDescriptor *service,
const grpc::string prefix);
} // namespace grpc_objective_c_generator

@ -40,18 +40,8 @@
namespace grpc_objective_c_generator {
const grpc::string prefix = "PBG";
inline grpc::string MessageHeaderName(const grpc::protobuf::FileDescriptor *file) {
return grpc_generator::FileNameInUpperCamel(file) + ".pb.h";
}
inline grpc::string StubFileName(grpc::string service_name) {
return prefix + service_name + "Stub";
}
inline grpc::string PrefixedName(grpc::string name) {
return prefix + name;
return grpc_generator::FileNameInUpperCamel(file) + ".pbobjc.h";
}
}

@ -39,54 +39,77 @@
#include "src/compiler/objective_c_generator.h"
#include "src/compiler/objective_c_generator_helpers.h"
using ::grpc::string;
class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
public:
ObjectiveCGrpcGenerator() {}
virtual ~ObjectiveCGrpcGenerator() {}
virtual bool Generate(const grpc::protobuf::FileDescriptor *file,
const grpc::string &parameter,
const string &parameter,
grpc::protobuf::compiler::GeneratorContext *context,
grpc::string *error) const {
string *error) const {
if (file->service_count() == 0) {
// No services. Do nothing.
return true;
}
for (int i = 0; i < file->service_count(); i++) {
const grpc::protobuf::ServiceDescriptor *service = file->service(i);
grpc::string file_name = grpc_objective_c_generator::StubFileName(
service->name());
// Generate .pb.h
grpc::string header_code = grpc_objective_c_generator::GetHeader(
service, grpc_objective_c_generator::MessageHeaderName(file));
std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> header_output(
context->Open(file_name + ".pb.h"));
grpc::protobuf::io::CodedOutputStream header_coded_out(
header_output.get());
header_coded_out.WriteRaw(header_code.data(), header_code.size());
// Generate .pb.m
grpc::string source_code = grpc_objective_c_generator::GetSource(service);
std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> source_output(
context->Open(file_name + ".pb.m"));
grpc::protobuf::io::CodedOutputStream source_coded_out(
source_output.get());
source_coded_out.WriteRaw(source_code.data(), source_code.size());
string file_name = grpc_generator::FileNameInUpperCamel(file);
string prefix = file->options().objc_class_prefix();
{
// Generate .pbrpc.h
string imports = string("#import \"") + file_name + ".pbobjc.h\"\n"
"#import <gRPC/ProtoService.h>\n";
// TODO(jcanizales): Instead forward-declare the input and output types
// and import the files in the .pbrpc.m
string proto_imports;
for (int i = 0; i < file->dependency_count(); i++) {
string header = grpc_objective_c_generator::MessageHeaderName(
file->dependency(i));
proto_imports += string("#import \"") + header + "\"\n";
}
string declarations;
for (int i = 0; i < file->service_count(); i++) {
const grpc::protobuf::ServiceDescriptor *service = file->service(i);
declarations += grpc_objective_c_generator::GetHeader(service, prefix);
}
Write(context, file_name + ".pbrpc.h",
imports + '\n' + proto_imports + '\n' + declarations);
}
{
// Generate .pbrpc.m
string imports = string("#import \"") + file_name + ".pbrpc.h\"\n"
"#import <gRPC/GRXWriteable.h>\n"
"#import <gRPC/GRXWriter+Immediate.h>\n"
"#import <gRPC/ProtoRPC.h>\n";
string definitions;
for (int i = 0; i < file->service_count(); i++) {
const grpc::protobuf::ServiceDescriptor *service = file->service(i);
definitions += grpc_objective_c_generator::GetSource(service, prefix);
}
Write(context, file_name + ".pbrpc.m", imports + '\n' + definitions);
}
return true;
}
private:
// Insert the given code into the given file at the given insertion point.
void Insert(grpc::protobuf::compiler::GeneratorContext *context,
const grpc::string &filename, const grpc::string &insertion_point,
const grpc::string &code) const {
// Write the given code into the given file.
void Write(grpc::protobuf::compiler::GeneratorContext *context,
const string &filename, const string &code) const {
std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output(
context->OpenForInsert(filename, insertion_point));
context->Open(filename));
grpc::protobuf::io::CodedOutputStream coded_out(output.get());
coded_out.WriteRaw(code.data(), code.size());
}

@ -138,8 +138,10 @@ static void slice_state_remove_prefix(grpc_tcp_slice_state *state,
native "trim the first N bytes" operation to splice */
/* TODO(klempner): This really shouldn't be modifying the current slice
unless we own the slices array. */
*current_slice = gpr_slice_split_tail(current_slice, prefix_bytes);
gpr_slice tail;
tail = gpr_slice_split_tail(current_slice, prefix_bytes);
gpr_slice_unref(*current_slice);
*current_slice = tail;
return;
} else {
gpr_slice_unref(*current_slice);

@ -53,12 +53,14 @@ typedef struct {
gpr_uint8 send_ping_ack;
gpr_uint8 process_ping_reply;
gpr_uint8 goaway;
gpr_uint8 rst_stream;
gpr_int64 initial_window_update;
gpr_uint32 window_update;
gpr_uint32 goaway_last_stream_index;
gpr_uint32 goaway_error;
gpr_slice goaway_text;
gpr_uint32 rst_stream_reason;
} grpc_chttp2_parse_state;
#define GRPC_CHTTP2_FRAME_DATA 0

@ -32,6 +32,9 @@
*/
#include "src/core/transport/chttp2/frame_rst_stream.h"
#include <grpc/support/log.h>
#include "src/core/transport/chttp2/frame.h"
gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {
@ -54,3 +57,40 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {
return slice;
}
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags) {
if (length != 4) {
gpr_log(GPR_ERROR, "invalid rst_stream: length=%d, flags=%02x", length, flags);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
parser->byte = 0;
return GRPC_CHTTP2_PARSE_OK;
}
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
int is_last) {
gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
grpc_chttp2_rst_stream_parser *p = parser;
while (p->byte != 4 && cur != end) {
p->reason_bytes[p->byte] = *cur;
cur++;
p->byte++;
}
if (p->byte == 4) {
GPR_ASSERT(is_last);
state->rst_stream = 1;
state->rst_stream_reason =
(((gpr_uint32)p->reason_bytes[0]) << 24) |
(((gpr_uint32)p->reason_bytes[1]) << 16) |
(((gpr_uint32)p->reason_bytes[2]) << 8) |
(((gpr_uint32)p->reason_bytes[3]));
}
return GRPC_CHTTP2_PARSE_OK;
}

@ -35,7 +35,18 @@
#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H
#include <grpc/support/slice.h>
#include "src/core/transport/chttp2/frame.h"
typedef struct {
gpr_uint8 byte;
gpr_uint8 reason_bytes[4];
} grpc_chttp2_rst_stream_parser;
gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code);
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags);
grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */

@ -154,7 +154,13 @@ typedef enum {
WRITE_STATE_OPEN,
WRITE_STATE_QUEUED_CLOSE,
WRITE_STATE_SENT_CLOSE
} WRITE_STATE;
} write_state;
typedef enum {
DONT_SEND_CLOSED = 0,
SEND_CLOSED,
SEND_CLOSED_WITH_RST_STREAM
} send_closed;
typedef struct {
stream *head;
@ -267,6 +273,7 @@ struct transport {
grpc_chttp2_window_update_parser window_update;
grpc_chttp2_settings_parser settings;
grpc_chttp2_ping_parser ping;
grpc_chttp2_rst_stream_parser rst_stream;
} simple_parsers;
/* goaway */
@ -312,8 +319,8 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
WRITE_STATE write_state;
gpr_uint8 send_closed;
write_state write_state;
send_closed send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
@ -937,7 +944,11 @@ static int prepare_write(transport *t) {
if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
s->outgoing_sopb->nops == 0) {
s->send_closed = 1;
if (!t->is_client && !s->read_closed) {
s->send_closed = SEND_CLOSED_WITH_RST_STREAM;
} else {
s->send_closed = SEND_CLOSED;
}
}
if (s->writing_sopb.nops > 0 || s->send_closed) {
stream_list_join(t, s, WRITING);
@ -982,9 +993,12 @@ static void finalize_outbuf(transport *t) {
while ((s = stream_list_remove_head(t, WRITING))) {
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
s->send_closed != DONT_SEND_CLOSED, s->id, &t->hpack_compressor, &t->outbuf);
s->writing_sopb.nops = 0;
if (s->send_closed) {
if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR));
}
if (s->send_closed != DONT_SEND_CLOSED) {
stream_list_join(t, s, WRITTEN_CLOSED);
}
}
@ -999,9 +1013,10 @@ static void finish_write_common(transport *t, int success) {
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
s->write_state = WRITE_STATE_SENT_CLOSE;
if (1||!s->cancelled) {
maybe_finish_read(t, s);
if (!t->is_client) {
s->read_closed = 1;
}
maybe_finish_read(t, s);
}
t->outbuf.count = 0;
t->outbuf.length = 0;
@ -1214,12 +1229,14 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
schedule_nuke_sopb(t, &s->parser.incoming_sopb);
if (s->outgoing_sopb) {
schedule_nuke_sopb(t, s->outgoing_sopb);
s->outgoing_sopb = NULL;
stream_list_remove(t, s, WRITABLE);
schedule_cb(t, s->send_done_closure, 0);
if (error_code != GRPC_CHTTP2_NO_ERROR) {
schedule_nuke_sopb(t, &s->parser.incoming_sopb);
if (s->outgoing_sopb) {
schedule_nuke_sopb(t, s->outgoing_sopb);
s->outgoing_sopb = NULL;
stream_list_remove(t, s, WRITABLE);
schedule_cb(t, s->send_done_closure, 0);
}
}
if (s->cancelled) {
send_rst = 0;
@ -1228,31 +1245,34 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
s->cancelled = 1;
stream_list_join(t, s, CANCELLED);
gpr_ltoa(local_status, buffer);
add_incoming_metadata(
t, s,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
if (!optional_message) {
switch (local_status) {
case GRPC_STATUS_CANCELLED:
add_incoming_metadata(
t, s, grpc_mdelem_from_strings(t->metadata_context,
"grpc-message", "Cancelled"));
break;
default:
break;
}
} else {
if (error_code != GRPC_CHTTP2_NO_ERROR) {
/* synthesize a status if we don't believe we'll get one */
gpr_ltoa(local_status, buffer);
add_incoming_metadata(
t, s,
grpc_mdelem_from_metadata_strings(
t->metadata_context,
grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
grpc_mdstr_ref(optional_message)));
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
if (!optional_message) {
switch (local_status) {
case GRPC_STATUS_CANCELLED:
add_incoming_metadata(
t, s, grpc_mdelem_from_strings(t->metadata_context,
"grpc-message", "Cancelled"));
break;
default:
break;
}
} else {
add_incoming_metadata(
t, s,
grpc_mdelem_from_metadata_strings(
t->metadata_context,
grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
grpc_mdstr_ref(optional_message)));
}
add_metadata_batch(t, s);
}
add_metadata_batch(t, s);
maybe_finish_read(t, s);
}
maybe_finish_read(t, s);
}
if (!id) send_rst = 0;
if (send_rst) {
@ -1527,6 +1547,19 @@ static int init_ping_parser(transport *t) {
return ok;
}
static int init_rst_stream_parser(transport *t) {
int ok = GRPC_CHTTP2_PARSE_OK ==
grpc_chttp2_rst_stream_parser_begin_frame(&t->simple_parsers.rst_stream,
t->incoming_frame_size,
t->incoming_frame_flags);
if (!ok) {
drop_connection(t);
}
t->parser = grpc_chttp2_rst_stream_parser_parse;
t->parser_data = &t->simple_parsers.rst_stream;
return ok;
}
static int init_goaway_parser(transport *t) {
int ok =
GRPC_CHTTP2_PARSE_OK ==
@ -1581,12 +1614,7 @@ static int init_frame_parser(transport *t) {
gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
return 0;
case GRPC_CHTTP2_FRAME_RST_STREAM:
/* TODO(ctiller): actually parse the reason */
cancel_stream_id(
t, t->incoming_stream_id,
grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
GRPC_CHTTP2_CANCEL, 0);
return init_skip_frame(t, 0);
return init_rst_stream_parser(t);
case GRPC_CHTTP2_FRAME_SETTINGS:
return init_settings_frame_parser(t);
case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
@ -1650,6 +1678,12 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (st.goaway) {
add_goaway(t, st.goaway_error, st.goaway_text);
}
if (st.rst_stream) {
cancel_stream_id(
t, t->incoming_stream_id,
grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason),
st.rst_stream_reason, 0);
}
if (st.process_ping_reply) {
for (i = 0; i < t->ping_count; i++) {
if (0 ==

@ -291,7 +291,15 @@ function _read(size) {
return;
}
var data = event.read;
if (self.push(self.deserialize(data)) && data !== null) {
var deserialized;
try {
deserialized = self.deserialize(data);
} catch (e) {
e.code = grpc.status.INVALID_ARGUMENT;
self.emit('error', e);
return;
}
if (self.push(deserialized) && data !== null) {
var read_batch = {};
read_batch[grpc.opType.RECV_MESSAGE] = true;
self.call.startBatch(read_batch, readCallback);
@ -354,7 +362,13 @@ function handleUnary(call, handler, metadata) {
handleError(call, err);
return;
}
emitter.request = handler.deserialize(result.read);
try {
emitter.request = handler.deserialize(result.read);
} catch (e) {
e.code = grpc.status.INVALID_ARGUMENT;
handleError(call, e);
return;
}
if (emitter.cancelled) {
return;
}
@ -388,7 +402,13 @@ function handleServerStreaming(call, handler, metadata) {
stream.emit('error', err);
return;
}
stream.request = handler.deserialize(result.read);
try {
stream.request = handler.deserialize(result.read);
} catch (e) {
e.code = grpc.status.INVALID_ARGUMENT;
stream.emit('error', e);
return;
}
handler.func(stream);
});
}
@ -401,6 +421,9 @@ function handleServerStreaming(call, handler, metadata) {
*/
function handleClientStreaming(call, handler, metadata) {
var stream = new ServerReadableStream(call, handler.deserialize);
stream.on('error', function(error) {
handleError(call, error);
});
waitForCancel(call, stream);
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;

@ -286,20 +286,24 @@ describe('end-to-end', function() {
assert.ifError(err);
assert(response['send metadata']);
assert.strictEqual(response.read.toString(), requests[0]);
var end_batch = {};
end_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
'metadata': {},
'code': grpc.status.OK,
'details': status_text
};
end_batch[grpc.opType.RECV_MESSAGE] = true;
server_call.startBatch(end_batch, function(err, response) {
var snd_batch = {};
snd_batch[grpc.opType.RECV_MESSAGE] = true;
server_call.startBatch(snd_batch, function(err, response) {
assert.ifError(err);
assert(response['send status']);
assert(!response.cancelled);
assert.strictEqual(response.read.toString(), requests[1]);
done();
var end_batch = {};
end_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
'metadata': {},
'code': grpc.status.OK,
'details': status_text
};
server_call.startBatch(end_batch, function(err, response) {
assert.ifError(err);
assert(response['send status']);
assert(!response.cancelled);
done();
});
});
});
});

@ -47,6 +47,8 @@ var mathService = math_proto.lookup('math.Math');
var capitalize = require('underscore.string/capitalize');
var _ = require('underscore');
describe('File loader', function() {
it('Should load a proto file by default', function() {
assert.doesNotThrow(function() {
@ -178,9 +180,10 @@ describe('Generic client and server', function() {
});
});
});
describe('Trailing metadata', function() {
describe('Other conditions', function() {
var client;
var server;
var port;
before(function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
var test_service = test_proto.lookup('TestService');
@ -246,7 +249,7 @@ describe('Trailing metadata', function() {
}
}
});
var port = server.bind('localhost:0');
port = server.bind('localhost:0');
var Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port);
server.listen();
@ -254,86 +257,167 @@ describe('Trailing metadata', function() {
after(function() {
server.shutdown();
});
it('should be present when a unary call succeeds', function(done) {
var call = client.unary({error: false}, function(err, data) {
assert.ifError(err);
describe('Server recieving bad input', function() {
var misbehavingClient;
var badArg = new Buffer([0xFF]);
before(function() {
var test_service_attrs = {
unary: {
path: '/TestService/Unary',
requestStream: false,
responseStream: false,
requestSerialize: _.identity,
responseDeserialize: _.identity
},
clientStream: {
path: '/TestService/ClientStream',
requestStream: true,
responseStream: false,
requestSerialize: _.identity,
responseDeserialize: _.identity
},
serverStream: {
path: '/TestService/ServerStream',
requestStream: false,
responseStream: true,
requestSerialize: _.identity,
responseDeserialize: _.identity
},
bidiStream: {
path: '/TestService/BidiStream',
requestStream: true,
responseStream: true,
requestSerialize: _.identity,
responseDeserialize: _.identity
}
};
var Client = surface_client.makeClientConstructor(test_service_attrs,
'TestService');
misbehavingClient = new Client('localhost:' + port);
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
it('should respond correctly to a unary call', function(done) {
misbehavingClient.unary(badArg, function(err, data) {
assert(err);
assert.strictEqual(err.code, grpc.status.INVALID_ARGUMENT);
done();
});
});
});
it('should be present when a unary call fails', function(done) {
var call = client.unary({error: true}, function(err, data) {
assert(err);
it('should respond correctly to a client stream', function(done) {
var call = misbehavingClient.clientStream(function(err, data) {
assert(err);
assert.strictEqual(err.code, grpc.status.INVALID_ARGUMENT);
done();
});
call.write(badArg);
// TODO(mlumish): Remove call.end()
call.end();
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
it('should respond correctly to a server stream', function(done) {
var call = misbehavingClient.serverStream(badArg);
call.on('data', function(data) {
assert.fail(data, null, 'Unexpected data', '===');
});
call.on('error', function(err) {
assert.strictEqual(err.code, grpc.status.INVALID_ARGUMENT);
done();
});
});
it('should respond correctly to a bidi stream', function(done) {
var call = misbehavingClient.bidiStream();
call.on('data', function(data) {
assert.fail(data, null, 'Unexpected data', '===');
});
call.on('error', function(err) {
assert.strictEqual(err.code, grpc.status.INVALID_ARGUMENT);
done();
});
call.write(badArg);
// TODO(mlumish): Remove call.end()
call.end();
});
});
it('should be present when a client stream call succeeds', function(done) {
var call = client.clientStream(function(err, data) {
assert.ifError(err);
describe('Trailing metadata', function() {
it('should be present when a unary call succeeds', function(done) {
var call = client.unary({error: false}, function(err, data) {
assert.ifError(err);
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
});
});
call.write({error: false});
call.write({error: false});
call.end();
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
it('should be present when a unary call fails', function(done) {
var call = client.unary({error: true}, function(err, data) {
assert(err);
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
});
});
});
it('should be present when a client stream call fails', function(done) {
var call = client.clientStream(function(err, data) {
assert(err);
it('should be present when a client stream call succeeds', function(done) {
var call = client.clientStream(function(err, data) {
assert.ifError(err);
});
call.write({error: false});
call.write({error: false});
call.end();
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
});
});
call.write({error: false});
call.write({error: true});
call.end();
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
it('should be present when a client stream call fails', function(done) {
var call = client.clientStream(function(err, data) {
assert(err);
});
call.write({error: false});
call.write({error: true});
call.end();
call.on('status', function(status) {
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
});
});
});
it('should be present when a server stream call succeeds', function(done) {
var call = client.serverStream({error: false});
call.on('data', function(){});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK);
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
it('should be present when a server stream call succeeds', function(done) {
var call = client.serverStream({error: false});
call.on('data', function(){});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK);
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
});
});
});
it('should be present when a server stream call fails', function(done) {
var call = client.serverStream({error: true});
call.on('data', function(){});
call.on('error', function(error) {
assert.deepEqual(error.metadata.metadata, ['yes']);
done();
it('should be present when a server stream call fails', function(done) {
var call = client.serverStream({error: true});
call.on('data', function(){});
call.on('error', function(error) {
assert.deepEqual(error.metadata.metadata, ['yes']);
done();
});
});
});
it('should be present when a bidi stream succeeds', function(done) {
var call = client.bidiStream();
call.write({error: false});
call.write({error: false});
call.end();
call.on('data', function(){});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK);
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
it('should be present when a bidi stream succeeds', function(done) {
var call = client.bidiStream();
call.write({error: false});
call.write({error: false});
call.end();
call.on('data', function(){});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK);
assert.deepEqual(status.metadata.metadata, ['yes']);
done();
});
});
});
it('should be present when a bidi stream fails', function(done) {
var call = client.bidiStream();
call.write({error: false});
call.write({error: true});
call.end();
call.on('data', function(){});
call.on('error', function(error) {
assert.deepEqual(error.metadata.metadata, ['yes']);
done();
it('should be present when a bidi stream fails', function(done) {
var call = client.bidiStream();
call.write({error: false});
call.write({error: true});
call.end();
call.on('data', function(){});
call.on('error', function(error) {
assert.deepEqual(error.metadata.metadata, ['yes']);
done();
});
});
});
});

@ -231,7 +231,7 @@
handler:resumingHandler]] errorHandler:errorHandler];
}
- (void)didReceiveValue:(id)value {
- (void)writeValue:(id)value {
// TODO(jcanizales): Throw/assert if value isn't NSData.
// Pause the input and only resume it when the C layer notifies us that writes
@ -255,7 +255,7 @@
errorHandler:errorHandler];
}
- (void)didFinishWithError:(NSError *)errorOrNil {
- (void)writesFinishedWithError:(NSError *)errorOrNil {
if (errorOrNil) {
[self cancel];
} else {
@ -306,7 +306,7 @@
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
// The following produces a retain cycle self:_responseWriteable:self, which is only
// broken when didFinishWithError: is sent to the wrapped writeable.
// broken when writesFinishedWithError: is sent to the wrapped writeable.
// Care is taken not to retain self strongly in any of the blocks used in
// the implementation of GRPCCall, so that the life of the instance is
// determined by this retain cycle.

@ -38,11 +38,11 @@
// This is a thread-safe wrapper over a GRXWriteable instance. It lets one
// enqueue calls to a GRXWriteable instance for the main thread, guaranteeing
// that didFinishWithError: is the last message sent to it (no matter what
// that writesFinishedWithError: is the last message sent to it (no matter what
// messages are sent to the wrapper, in what order, nor from which thread). It
// also guarantees that, if cancelWithError: is called from the main thread
// (e.g. by the app cancelling the writes), no further messages are sent to the
// writeable except didFinishWithError:.
// writeable except writesFinishedWithError:.
//
// TODO(jcanizales): Let the user specify another queue for the writeable
// callbacks.
@ -51,23 +51,22 @@
// The GRXWriteable passed is the wrapped writeable.
// Both the GRXWriter instance and the GRXWriteable instance are retained until
// didFinishWithError: is sent to the writeable, and released after that.
// writesFinishedWithError: is sent to the writeable, and released after that.
// This is used to create a retain cycle that keeps both objects alive until the
// writing is explicitly finished.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(id<GRXWriter>)writer
NS_DESIGNATED_INITIALIZER;
// Enqueues didReceiveValue: to be sent to the writeable in the main thread.
// The passed handler is invoked from the main thread after didReceiveValue:
// returns.
// Enqueues writeValue: to be sent to the writeable in the main thread.
// The passed handler is invoked from the main thread after writeValue: returns.
- (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler;
// Enqueues didFinishWithError:nil to be sent to the writeable in the main
// Enqueues writesFinishedWithError:nil to be sent to the writeable in the main
// thread. After that message is sent to the writeable, all other methods of
// this object are effectively noops.
- (void)enqueueSuccessfulCompletion;
// If the writeable has not yet received a didFinishWithError: message, this
// If the writeable has not yet received a writesFinishedWithError: message, this
// will enqueue one to be sent to it in the main thread, and cancel all other
// pending messages to the writeable enqueued by this object (both past and
// future).
@ -75,7 +74,7 @@
- (void)cancelWithError:(NSError *)error;
// Cancels all pending messages to the writeable enqueued by this object (both
// past and future). Because the writeable won't receive didFinishWithError:,
// past and future). Because the writeable won't receive writesFinishedWithError:,
// this also releases the writeable and the writer.
- (void)cancelSilently;
@end

@ -43,7 +43,7 @@
@implementation GRPCDelegateWrapper {
dispatch_queue_t _writeableQueue;
// This ensures that didFinishWithError: is only sent once to the writeable.
// This ensures that writesFinishedWithError: is only sent once to the writeable.
dispatch_once_t _alreadyFinished;
}
@ -69,7 +69,7 @@
// the race.
id<GRXWriteable> writeable = self.writeable;
if (writeable) {
[writeable didReceiveValue:message];
[writeable writeValue:message];
handler();
}
});
@ -80,7 +80,7 @@
dispatch_once(&_alreadyFinished, ^{
// Cancellation is now impossible. None of the other three blocks can run
// concurrently with this one.
[self.writeable didFinishWithError:nil];
[self.writeable writesFinishedWithError:nil];
// Break the retain cycle with writer, and skip any possible message to the
// wrapped writeable enqueued after this one.
self.writeable = nil;
@ -100,7 +100,7 @@
self.writeable = nil;
dispatch_async(_writeableQueue, ^{
[writeable didFinishWithError:error];
[writeable writesFinishedWithError:error];
// Break the retain cycle with writer.
self.writer = nil;
});

@ -71,9 +71,9 @@
if ((self = [super initWithHost:host method:method requestsWriter:bytesWriter])) {
// A writeable that parses the proto messages received.
_responseWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
[responsesWriteable didReceiveValue:[responseClass parseFromData:value]];
[responsesWriteable writeValue:[responseClass parseFromData:value]];
} completionHandler:^(NSError *errorOrNil) {
[responsesWriteable didFinishWithError:errorOrNil];
[responsesWriteable writesFinishedWithError:errorOrNil];
}];
}
return self;

@ -1,3 +1,47 @@
gRPC implementation for Objective-C on iOS
# gRPC for Objective-C
This is a work in progress.
## How to generate a client library from a Protocol Buffers definition
First install v3 of the Protocol Buffers compiler (_protoc_), by cloning [its Git repository](https://github.com/google/protobuf) and following these [installation instructions](https://github.com/google/protobuf#c-installation---unix) (the ones titled C++; don't miss the note for Mac users).
Then clone this repository and execute the following commands from the root directory where it was cloned.
Compile the gRPC plugins for _protoc_:
```sh
make plugins
```
Create a symbolic link to the compiled plugin binary somewhere in your `$PATH`:
```sh
ln -s `pwd`/bins/opt/grpc_objective_c_plugin /usr/local/bin/protoc-gen-objcgrpc
```
(Notice that the name of the created link must begin with "protoc-gen-" for _protoc_ to recognize it as a plugin).
If you don't want to create the symbolic link, you can alternatively copy the binary (with the appropriate name). Or you might prefer instead to specify the plugin's path as a flag when invoking _protoc_, in which case no system modification nor renaming is necessary.
Finally, run _protoc_ with the following flags to generate the client library for your `.proto` files:
```sh
protoc --objc_out=. --objcrpc_out=. *.proto
```
This will generate a pair of `.pbobjc.h`/`.pbobjc.m` files for each `.proto` file, with the messages and enums defined in them. And a pair of `.pbrpc.h`/`.pbrpc.m` files for each `.proto` file with services defined. The latter contains the code to make remote calls to the specified API.
## How to integrate a generated gRPC library in your project
### If you use Cocoapods
This is the recommended approach.
You need to create a Podspec file for the generated library. This is simply a matter of copying an example like [this one](https://github.com/grpc/grpc/blob/master/src/objective-c/examples/Sample/RemoteTestClient/RemoteTest.podspec) to the directory where the source files were generated. Update the name and other metadata of the Podspec as suitable.
Once your library has a Podspec, refer to it from your Podfile using `:path` as described [here](https://guides.cocoapods.org/using/the-podfile.html#using-the-files-from-a-folder-local-to-the-machine).
### If you don't use Cocoapods
You need to compile the generated `.pbpbjc.*` files (the enums and messages) without ARC support, and the generated `.pbrpc.*` files (the services) with ARC support. The generated code depends on v0.3+ of the Objective-C gRPC runtime library and v3.0+ of the Objective-C Protobuf runtime library.
These libraries need to be integrated into your project as described in their respective Podspec files:
* [Podspec](https://github.com/grpc/grpc/blob/master/gRPC.podspec) for the Objective-C gRPC runtime library. This can be tedious to configure manually.
* [Podspec](https://github.com/jcanizales/protobuf/blob/add-podspec/Protobuf.podspec) for the Objective-C Protobuf runtime library.

@ -0,0 +1,59 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#import <Foundation/Foundation.h>
#import "GRXWriteable.h"
#import "GRXWriter.h"
// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
// to -startWithWriteable:).
// Once it is started, whatever values are written into it (via -writeValue:) will be propagated
// immediately, unless flow control prevents it.
// If it is throttled and keeps receiving values, as well as if it receives values before being
// started, it will buffer them and propagate them in order as soon as its state becomes
// GRXWriterStateStarted.
// If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
// propagate the error immediately.
//
// Beware that a pipe of this type can't prevent receiving more values when it is paused (for
// example if used to write data to a congested network connection). Because in such situations the
// pipe will keep buffering all data written to it, your application could run out of memory and
// crash. If you want to react to flow control signals to prevent that, instead of using this class
// you can implement an object that conforms to GRXWriter.
@interface GRXBufferedPipe : NSObject<GRXWriteable, GRXWriter>
// Convenience constructor.
+ (instancetype)pipe;
@end

@ -0,0 +1,146 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#import "GRXBufferedPipe.h"
@implementation GRXBufferedPipe {
id<GRXWriteable> _writeable;
NSMutableArray *_queue;
BOOL _inputIsFinished;
NSError *_errorOrNil;
}
@synthesize state = _state;
+ (instancetype)pipe {
return [[self alloc] init];
}
- (instancetype)init {
if (self = [super init]) {
_queue = [NSMutableArray array];
_state = GRXWriterStateNotStarted;
}
return self;
}
- (id)popValue {
id value = _queue[0];
[_queue removeObjectAtIndex:0];
return value;
}
- (void)writeBufferUntilPausedOrStopped {
while (_state == GRXWriterStateStarted && _queue.count > 0) {
[_writeable writeValue:[self popValue]];
}
if (_inputIsFinished && _queue.count == 0) {
// Our writer finished normally while we were paused or not-started-yet.
[self finishWithError:_errorOrNil];
}
}
#pragma mark GRXWriteable implementation
// Returns whether events can be simply propagated to the other end of the pipe.
- (BOOL)shouldFastForward {
return _state == GRXWriterStateStarted && _queue.count == 0;
}
- (void)writeValue:(id)value {
if (self.shouldFastForward) {
// Skip the queue.
[_writeable writeValue:value];
} else {
// Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
// So just buffer the new value.
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
if ([value respondsToSelector:@selector(copy)]) {
value = [value copy];
}
[_queue addObject:value];
}
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
_inputIsFinished = YES;
_errorOrNil = errorOrNil;
if (errorOrNil || self.shouldFastForward) {
// No need to write pending values.
[self finishWithError:_errorOrNil];
}
}
#pragma mark GRXWriter implementation
- (void)setState:(GRXWriterState)newState {
// Manual transitions are only allowed from the started or paused states.
if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
}
switch (newState) {
case GRXWriterStateFinished:
_state = newState;
_queue = nil;
// Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
// writeable to be messaged anymore.
_writeable = nil;
return;
case GRXWriterStatePaused:
_state = newState;
return;
case GRXWriterStateStarted:
if (_state == GRXWriterStatePaused) {
_state = newState;
[self writeBufferUntilPausedOrStopped];
}
return;
case GRXWriterStateNotStarted:
return;
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
_state = GRXWriterStateStarted;
_writeable = writeable;
[self writeBufferUntilPausedOrStopped];
}
- (void)finishWithError:(NSError *)errorOrNil {
id<GRXWriteable> writeable = _writeable;
self.state = GRXWriterStateFinished;
[writeable writesFinishedWithError:errorOrNil];
}
@end

@ -109,7 +109,7 @@
- (void)writeUntilPausedOrStopped {
id value;
while (value = [_enumerator nextObject]) {
[_writeable didReceiveValue:value];
[_writeable writeValue:value];
// If the writeable has a reference to us, it might change our state to paused or finished.
if (_state == GRXWriterStatePaused || _state == GRXWriterStateFinished) {
return;
@ -130,7 +130,7 @@
_errorOrNil = nil;
id<GRXWriteable> writeable = _writeable;
_writeable = nil;
[writeable didFinishWithError:errorOrNil];
[writeable writesFinishedWithError:errorOrNil];
}
- (void)setState:(GRXWriterState)newState {

@ -38,14 +38,12 @@
@protocol GRXWriteable <NSObject>
// Push the next value of the sequence to the receiving object.
// TODO(jcanizales): Name it enumerator:(id<GRXEnumerator>) didProduceValue:(id)?
- (void)didReceiveValue:(id)value;
- (void)writeValue:(id)value;
// Signal that the sequence is completed, or that an error ocurred. After this
// message is sent to the instance, neither it nor didReceiveValue: may be
// message is sent to the instance, neither it nor writeValue: may be
// called again.
// TODO(jcanizales): enumerator:(id<GRXEnumerator>) didFinishWithError:(NSError*)?
- (void)didFinishWithError:(NSError *)errorOrNil;
- (void)writesFinishedWithError:(NSError *)errorOrNil;
@end
typedef void (^GRXValueHandler)(id value);

@ -76,13 +76,13 @@
return self;
}
- (void)didReceiveValue:(id)value {
- (void)writeValue:(id)value {
if (_valueHandler) {
_valueHandler(value);
}
}
- (void)didFinishWithError:(NSError *)errorOrNil {
- (void)writesFinishedWithError:(NSError *)errorOrNil {
if (_completionHandler) {
_completionHandler(errorOrNil);
}

@ -50,7 +50,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
// The writer is temporarily paused, and won't send any more values to the
// writeable unless its state is set back to Started. The writer might still
// transition to the Finished state at any moment, and is allowed to send
// didFinishWithError: to its writeable.
// writesFinishedWithError: to its writeable.
//
// Not all implementations of writer have to support pausing, and thus
// trying to set an writer's state to this value might have no effect.
@ -59,7 +59,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
// The writer has released its writeable and won't interact with it anymore.
//
// One seldomly wants to set an writer's state to this value, as its
// writeable isn't notified with a didFinishWithError: message. Instead, sending
// writeable isn't notified with a writesFinishedWithError: message. Instead, sending
// finishWithError: to the writer will make it notify the writeable and then
// transition to this state.
GRXWriterStateFinished
@ -105,7 +105,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
// This method might only be called on writers in the NotStarted state.
- (void)startWithWriteable:(id<GRXWriteable>)writeable;
// Send didFinishWithError:errorOrNil immediately to the writeable, and don't send
// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send
// any more messages to it.
//
// This method might only be called on writers in the Started or Paused

@ -62,7 +62,7 @@
- (void)finishOutputWithError:(NSError *)errorOrNil {
id<GRXWriteable> writeable = _writeable;
_writeable = nil;
[writeable didFinishWithError:errorOrNil];
[writeable writesFinishedWithError:errorOrNil];
}
// This is used to stop the input writer. It nillifies our reference to it
@ -75,11 +75,11 @@
#pragma mark GRXWriteable implementation
- (void)didReceiveValue:(id)value {
[_writeable didReceiveValue:value];
- (void)writeValue:(id)value {
[_writeable writeValue:value];
}
- (void)didFinishWithError:(NSError *)errorOrNil {
- (void)writesFinishedWithError:(NSError *)errorOrNil {
_writer = nil;
[self finishOutputWithError:errorOrNil];
}

@ -57,7 +57,7 @@ static id (^kIdentity)(id value) = ^id(id value) {
}
// Override
- (void)didReceiveValue:(id)value {
[super didReceiveValue:_map(value)];
- (void)writeValue:(id)value {
[super writeValue:_map(value)];
}
@end

@ -36,13 +36,46 @@
#import <UIKit/UIKit.h>
#import <XCTest/XCTest.h>
#import <gRPC/ProtoRPC.h>
#import <gRPC/GRXWriter+Immediate.h>
#import <gRPC/GRXBufferedPipe.h>
#import <gRPC/ProtoRPC.h>
#import <RemoteTest/Empty.pbobjc.h>
#import <RemoteTest/Messages.pbobjc.h>
#import <RemoteTest/Test.pbobjc.h>
#import <RemoteTest/Test.pbrpc.h>
// Convenience constructors for the generated proto messages:
@interface RMTStreamingOutputCallRequest (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
requestedResponseSize:(NSNumber *)responseSize;
@end
@implementation RMTStreamingOutputCallRequest (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
requestedResponseSize:(NSNumber *)responseSize {
RMTStreamingOutputCallRequest *request = [self message];
RMTResponseParameters *parameters = [RMTResponseParameters message];
parameters.size = responseSize.integerValue;
[request.responseParametersArray addObject:parameters];
request.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
return request;
}
@end
@interface RMTStreamingOutputCallResponse (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize;
@end
@implementation RMTStreamingOutputCallResponse (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize {
RMTStreamingOutputCallResponse * response = [self message];
response.payload.type = RMTPayloadType_Compressable;
response.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
return response;
}
@end
@interface RemoteProtoTests : XCTestCase
@end
@ -70,7 +103,7 @@
[expectation fulfill];
}];
[self waitForExpectationsWithTimeout:2. handler:nil];
[self waitForExpectationsWithTimeout:2 handler:nil];
}
- (void)testLargeUnaryRPC {
@ -92,7 +125,7 @@
[expectation fulfill];
}];
[self waitForExpectationsWithTimeout:4. handler:nil];
[self waitForExpectationsWithTimeout:4 handler:nil];
}
- (void)testClientStreamingRPC {
@ -124,7 +157,7 @@
[expectation fulfill];
}];
[self waitForExpectationsWithTimeout:4. handler:nil];
[self waitForExpectationsWithTimeout:4 handler:nil];
}
- (void)testServerStreamingRPC {
@ -149,10 +182,7 @@
if (response) {
XCTAssertLessThan(index, 4, @"More than 4 responses received.");
RMTStreamingOutputCallResponse * expected = [RMTStreamingOutputCallResponse message];
expected.payload.type = RMTPayloadType_Compressable;
int expectedSize = [expectedSizes[index] unsignedIntegerValue];
expected.payload.body = [NSMutableData dataWithLength:expectedSize];
id expected = [RMTStreamingOutputCallResponse messageWithPayloadSize:expectedSizes[index]];
XCTAssertEqualObjects(response, expected);
index += 1;
}
@ -166,6 +196,49 @@
[self waitForExpectationsWithTimeout:4 handler:nil];
}
- (void)testPingPongRPC {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPong"];
NSArray *requests = @[@27182, @8, @1828, @45904];
NSArray *responses = @[@31415, @9, @2653, @58979];
GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
__block int index = 0;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[requestsBuffer writeValue:request];
[_service fullDuplexCallWithRequestsWriter:requestsBuffer
handler:^(BOOL done,
RMTStreamingOutputCallResponse *response,
NSError *error) {
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
XCTAssertTrue(done || response, @"Event handler called without an event.");
if (response) {
XCTAssertLessThan(index, 4, @"More than 4 responses received.");
id expected = [RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
XCTAssertEqualObjects(response, expected);
index += 1;
if (index < 4) {
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[requestsBuffer writeValue:request];
} else {
[requestsBuffer writesFinishedWithError:nil];
}
}
if (done) {
XCTAssertEqual(index, 4, @"Received %i responses instead of 4.", index);
[expectation fulfill];
}
}];
[self waitForExpectationsWithTimeout:2 handler:nil];
}
- (void)testEmptyStreamRPC {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyStream"];
[_service fullDuplexCallWithRequestsWriter:[GRXWriter emptyWriter]
@ -176,13 +249,16 @@
XCTAssert(done, @"Unexpected response: %@", response);
[expectation fulfill];
}];
[self waitForExpectationsWithTimeout:4 handler:nil];
[self waitForExpectationsWithTimeout:2 handler:nil];
}
- (void)testCancelAfterBeginRPC {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterBegin"];
// TODO(mlumish): change to writing that blocks instead of writing
ProtoRPC *call = [_service RPCToStreamingInputCallWithRequestsWriter:[GRXWriter emptyWriter]
// A buffered pipe to which we never write any value acts as a writer that just hangs.
GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
ProtoRPC *call = [_service RPCToStreamingInputCallWithRequestsWriter:requestsBuffer
handler:^(RMTStreamingInputCallResponse *response,
NSError *error) {
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);

@ -7,7 +7,7 @@ The Python facility of gRPC.
Status
-------
Usable with limitations, Alpha
Alpha : Ready for early adopters
Prerequisites
-----------------------

@ -1,3 +1,3 @@
enum34==1.0.4
futures==2.2.0
protobuf==3.0.0-alpha-1
protobuf==3.0.0a2

@ -93,6 +93,6 @@ setuptools.setup(
install_requires=[
'enum34==1.0.4',
'futures==2.2.0',
'protobuf==3.0.0-alpha-1'
'protobuf==3.0.0a2'
]
)

@ -34,13 +34,25 @@ INCLUDEDIR = RbConfig::CONFIG['includedir']
if ENV.key? 'GRPC_ROOT'
GRPC_ROOT = ENV['GRPC_ROOT']
if ENV.key? 'GRPC_LIB_DIR'
GRPC_LIB_DIR = ENV['GRPC_LIB_DIR']
else
grpc_root = File.expand_path(File.join(File.dirname(__FILE__), '../../../..'))
if File.exist?(File.join(grpc_root, 'include/grpc/grpc.h'))
GRPC_ROOT = grpc_root
else
GRPC_LIB_DIR = 'libs/opt'
GRPC_ROOT = nil
end
end
if ENV.key? 'CONFIG'
GRPC_CONFIG = ENV['CONFIG']
else
GRPC_ROOT = nil
GRPC_CONFIG = 'opt'
end
if (ENV.key? 'GRPC_LIB_DIR') && (!GRPC_ROOT.nil?)
GRPC_LIB_DIR = File.join(GRPC_ROOT, ENV['GRPC_LIB_DIR'])
else
GRPC_LIB_DIR = File.join(File.join(GRPC_ROOT, 'libs'), GRPC_CONFIG)
end
HEADER_DIRS = [
@ -67,7 +79,10 @@ LIB_DIRS = [
unless GRPC_ROOT.nil?
HEADER_DIRS.unshift File.join(GRPC_ROOT, 'include')
LIB_DIRS.unshift File.join(GRPC_ROOT, GRPC_LIB_DIR)
LIB_DIRS.unshift GRPC_LIB_DIR
unless File.exist?(File.join(GRPC_LIB_DIR, 'libgrpc.a'))
system("make -C #{GRPC_ROOT} static_c CONFIG=#{GRPC_CONFIG}")
end
end
def crash(msg)

@ -33,7 +33,7 @@
#include "rb_byte_buffer.h"
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc.h>
#include <grpc/support/slice.h>

@ -34,8 +34,9 @@
#ifndef GRPC_RB_BYTE_BUFFER_H_
#define GRPC_RB_BYTE_BUFFER_H_
#include <ruby/ruby.h>
#include <grpc/grpc.h>
#include <ruby.h>
/* Converts a char* with a length to a grpc_byte_buffer */
grpc_byte_buffer *grpc_rb_s_to_byte_buffer(char *string, size_t length);

@ -33,7 +33,7 @@
#include "rb_call.h"
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>

@ -34,8 +34,9 @@
#ifndef GRPC_RB_CALL_H_
#define GRPC_RB_CALL_H_
#include <ruby/ruby.h>
#include <grpc/grpc.h>
#include <ruby.h>
/* Gets the wrapped call from a VALUE. */
grpc_call* grpc_rb_get_wrapped_call(VALUE v);

@ -33,7 +33,7 @@
#include "rb_channel.h"
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>

@ -34,7 +34,8 @@
#ifndef GRPC_RB_CHANNEL_H_
#define GRPC_RB_CHANNEL_H_
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc.h>
/* Initializes the Channel class. */

@ -33,7 +33,8 @@
#include "rb_channel_args.h"
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc.h>
#include "rb_grpc.h"

@ -34,7 +34,8 @@
#ifndef GRPC_RB_CHANNEL_ARGS_H_
#define GRPC_RB_CHANNEL_ARGS_H_
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc.h>
/* Converts a hash object containing channel args to a channel args instance.

@ -34,8 +34,9 @@
#ifndef GRPC_RB_COMPLETION_QUEUE_H_
#define GRPC_RB_COMPLETION_QUEUE_H_
#include <ruby/ruby.h>
#include <grpc/grpc.h>
#include <ruby.h>
/* Gets the wrapped completion queue from the ruby wrapper */
grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);

@ -33,7 +33,7 @@
#include "rb_credentials.h"
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>

@ -34,7 +34,8 @@
#ifndef GRPC_RB_CREDENTIALS_H_
#define GRPC_RB_CREDENTIALS_H_
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc_security.h>
/* Initializes the ruby Credentials class. */

@ -35,7 +35,8 @@
#define GRPC_RB_H_
#include <sys/time.h>
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/support/time.h>
/* grpc_rb_mGrpcCore is the module containing the ruby wrapper GRPC classes. */

@ -33,7 +33,7 @@
#include "rb_server.h"
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>

@ -34,7 +34,8 @@
#ifndef GRPC_RB_SERVER_H_
#define GRPC_RB_SERVER_H_
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc.h>
/* Initializes the Server class. */

@ -33,7 +33,7 @@
#include "rb_server_credentials.h"
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>

@ -34,7 +34,8 @@
#ifndef GRPC_RB_SERVER_CREDENTIALS_H_
#define GRPC_RB_SERVER_CREDENTIALS_H_
#include <ruby.h>
#include <ruby/ruby.h>
#include <grpc/grpc_security.h>
/* Initializes the ruby ServerCredentials class. */

@ -127,11 +127,15 @@ static gpr_slice merge_slices(gpr_slice *slices, size_t nslices) {
}
static int byte_buffer_eq_slice(grpc_byte_buffer *bb, gpr_slice b) {
gpr_slice a =
merge_slices(bb->data.slice_buffer.slices, bb->data.slice_buffer.count);
int ok = GPR_SLICE_LENGTH(a) == GPR_SLICE_LENGTH(b) &&
0 == memcmp(GPR_SLICE_START_PTR(a), GPR_SLICE_START_PTR(b),
GPR_SLICE_LENGTH(a));
gpr_slice a;
int ok;
if (!bb) return 0;
a = merge_slices(bb->data.slice_buffer.slices, bb->data.slice_buffer.count);
ok = GPR_SLICE_LENGTH(a) == GPR_SLICE_LENGTH(b) &&
0 == memcmp(GPR_SLICE_START_PTR(a), GPR_SLICE_START_PTR(b),
GPR_SLICE_LENGTH(a));
gpr_slice_unref(a);
gpr_slice_unref(b);
return ok;

@ -82,6 +82,7 @@ END2END_TESTS = {
'request_response_with_payload_and_call_creds': TestOptions(flaky=False, secure=True),
'request_with_large_metadata': default_test_options,
'request_with_payload': default_test_options,
'server_finishes_request': default_test_options,
'simple_delayed_request': default_test_options,
'simple_request': default_test_options,
'simple_request_with_high_initial_sequence_number': default_test_options,

@ -171,23 +171,30 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = response_payload;
op++;
op = ops;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
op->data.send_status_from_server.status_details = "xyz";
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103)));
cq_expect_completion(v_server, tag(102), 1);
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), 1);

@ -178,8 +178,7 @@ static void test_max_message_length(grpc_end2end_test_config config) {
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_CANCELLED);
GPR_ASSERT(0 == strcmp(details, "Cancelled"));
GPR_ASSERT(status != GRPC_STATUS_OK);
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
GPR_ASSERT(was_cancelled == 1);

@ -188,6 +188,18 @@ static void test_request_response_with_metadata_and_payload(
op->data.send_initial_metadata.count = 2;
op->data.send_initial_metadata.metadata = meta_s;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = response_payload;
op++;
@ -196,15 +208,9 @@ static void test_request_response_with_metadata_and_payload(
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103)));
cq_expect_completion(v_server, tag(102), 1);
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), 1);

@ -174,6 +174,18 @@ static void test_request_response_with_metadata_and_payload(
op->data.send_initial_metadata.count = 2;
op->data.send_initial_metadata.metadata = meta_s;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = response_payload;
op++;
@ -182,15 +194,9 @@ static void test_request_response_with_metadata_and_payload(
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103)));
cq_expect_completion(v_server, tag(102), 1);
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), 1);

@ -165,6 +165,18 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = response_payload;
op++;
@ -173,15 +185,9 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103)));
cq_expect_completion(v_server, tag(102), 1);
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), 1);

@ -224,6 +224,18 @@ static void request_response_with_payload_and_call_creds(
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = response_payload;
op++;
@ -232,15 +244,9 @@ static void request_response_with_payload_and_call_creds(
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103)));
cq_expect_completion(v_server, tag(102), 1);
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), 1);

@ -174,6 +174,18 @@ static void test_request_response_with_metadata_and_payload(
op->data.send_initial_metadata.count = 2;
op->data.send_initial_metadata.metadata = meta_s;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = response_payload;
op++;
@ -183,15 +195,9 @@ static void test_request_response_with_metadata_and_payload(
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103)));
cq_expect_completion(v_server, tag(102), 1);
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), 1);

@ -169,20 +169,26 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103)));
cq_expect_completion(v_server, tag(102), 1);
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), 1);

@ -160,20 +160,26 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103)));
cq_expect_completion(v_server, tag(102), 1);
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), 1);

@ -0,0 +1,200 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "test/core/end2end/end2end_tests.h"
#include <stdio.h>
#include <string.h>
#include "src/core/support/string.h"
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
enum { TIMEOUT = 200000 };
static void *tag(gpr_intptr t) { return (void *)t; }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char *test_name,
grpc_channel_args *client_args,
grpc_channel_args *server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_client(&f, client_args);
config.init_server(&f, server_args);
return f;
}
static gpr_timespec n_seconds_time(int n) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
}
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_destroy(f->server);
f->server = NULL;
}
static void shutdown_client(grpc_end2end_test_fixture *f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = NULL;
}
static void end_test(grpc_end2end_test_fixture *f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->server_cq);
drain_cq(f->server_cq);
grpc_completion_queue_destroy(f->server_cq);
grpc_completion_queue_shutdown(f->client_cq);
drain_cq(f->client_cq);
grpc_completion_queue_destroy(f->client_cq);
}
static void simple_request_body(grpc_end2end_test_fixture f) {
grpc_call *c;
grpc_call *s;
gpr_timespec deadline = five_seconds_time();
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
grpc_op ops[6];
grpc_op *op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_status_code status;
char *details = NULL;
size_t details_capacity = 0;
int was_cancelled = 2;
c = grpc_channel_create_call(f.client, f.client_cq, "/foo",
"foo.test.google.fr:1234", deadline);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->data.recv_status_on_client.status_details_capacity = &details_capacity;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.server_cq,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
op->data.send_status_from_server.status_details = "xyz";
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
GPR_ASSERT(was_cancelled == 0);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_destroy(c);
grpc_call_destroy(s);
cq_verifier_destroy(v_client);
cq_verifier_destroy(v_server);
}
static void test_invoke_simple_request(grpc_end2end_test_config config) {
grpc_end2end_test_fixture f;
f = begin_test(config, __FUNCTION__, NULL, NULL);
simple_request_body(f);
end_test(&f);
config.tear_down_data(&f);
}
void grpc_end2end_tests(grpc_end2end_test_config config) {
test_invoke_simple_request(config);
}

@ -1 +1 @@
Subproject commit 644a6a1da71385e9d7a7a26b3476c93fdd71788c
Subproject commit a8b38c598d7f65b281a72809b28117afdb760931

@ -0,0 +1,8 @@
#!/bin/bash
cp -R /var/local/git-clone/grpc /var/local/git
make clean -C /var/local/git/grpc
make install_c -j12 -C /var/local/git/grpc
cd /var/local/git/grpc/src/node && npm install && node-gyp rebuild

@ -43,7 +43,7 @@ RUN apt-get update && apt-get install -y \
python-virtualenv
# Install Python packages from PyPI
RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.0.0-alpha-1
RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.0.0a2
# Get the GRPC source from GitHub
RUN git clone --recursive https://github.com/grpc/grpc.git /var/local/git/grpc

@ -0,0 +1,8 @@
#!/bin/bash
cp -R /var/local/git-clone/grpc /var/local/git
make clean -C /var/local/git/grpc
make install_c -j12 -C /var/local/git/grpc
/bin/bash -l -c 'cd /var/local/git/grpc/src/ruby && gem update bundler && bundle && rake'

@ -428,6 +428,7 @@ grpc_interop_test_args() {
python) grpc_port=8050 ;;
ruby) grpc_port=8060 ;;
csharp_mono) grpc_port=8070 ;;
csharp_dotnet) grpc_port=8070 ;;
*) echo "bad server_type: $1" 1>&2; return 1 ;;
esac
shift
@ -870,6 +871,23 @@ grpc_launch_servers() {
done
}
# Launch servers on windows.
grpc_launch_windows_servers() {
local host='grpc-windows-interop1'
local killcmd="ps -e | grep Grpc.IntegrationTesting | awk '{print \\\$1}' | xargs kill -9"
echo "killing all servers and clients on $host with command $killcmd"
gcloud compute $project_opt ssh $zone_opt stoked-keyword-656@grpc-windows-proxy --command "ssh $host \"$killcmd\""
local cmd='cd /cygdrive/c/github/grpc/src/csharp/Grpc.IntegrationTesting.Server/bin/Debug && ./Grpc.IntegrationTesting.Server.exe --use_tls=true --port=8070'
# gcloud's auto-uploading of RSA keys doesn't work for Windows VMs.
# So we have a linux machine that is authorized to access the Windows
# machine through ssh and we use gcloud auth support to logon to the proxy.
echo "will run:"
echo " $cmd"
echo "on $host (through grpc-windows-proxy)"
gcloud compute $project_opt ssh $zone_opt stoked-keyword-656@grpc-windows-proxy --command "ssh $host '$cmd'"
}
# Runs a test command on a docker instance
#
# The test command is issued via gcloud compute
@ -949,6 +967,7 @@ test_runner() {
# node: 8040
# python: 8050
# ruby: 8060
# csharp: 8070
#
# each client_type should have an associated bash func:
# grpc_interop_gen_<client_type>_cmd

@ -37,6 +37,6 @@ root=`pwd`
rm -rf python2.7_virtual_environment
virtualenv -p /usr/bin/python2.7 python2.7_virtual_environment
source python2.7_virtual_environment/bin/activate
pip install enum34==1.0.4 futures==2.2.0 protobuf==3.0.0-alpha-1
pip install enum34==1.0.4 futures==2.2.0 protobuf==3.0.0a2
CFLAGS=-I$root/include LDFLAGS=-L$root/libs/$CONFIG pip install src/python/src
pip install src/python/interop

@ -31,17 +31,10 @@
set -ex
CONFIG=${CONFIG:-opt}
export CONFIG=${CONFIG:-opt}
# change to grpc repo root
cd $(dirname $0)/../..
# tells npm install to look for files in that directory
export GRPC_ROOT=`pwd`
# tells npm install the subdirectory with library files
export GRPC_LIB_SUBDIR=libs/$CONFIG
cd src/ruby
# change to grpc's ruby directory
cd $(dirname $0)/../../src/ruby
bundle install
rake compile:grpc

@ -209,8 +209,9 @@ class Job(object):
self._state = _FAILURE
self._tempfile.seek(0)
stdout = self._tempfile.read()
message('FAILED', '%s [ret=%d]' % (
self._spec.shortname, self._process.returncode), stdout, do_newline=True)
message('FAILED', '%s [ret=%d, pid=%d]' % (
self._spec.shortname, self._process.returncode, self._process.pid),
stdout, do_newline=True)
else:
self._state = _SUCCESS
message('PASSED', '%s [time=%.1fsec]' % (self._spec.shortname, elapsed),

@ -45,6 +45,6 @@ git submodule > $submodules
diff -u $submodules - << EOF
05b155ff59114735ec8cd089f669c4c3d8f59029 third_party/gflags (v2.1.0-45-g05b155f)
3df69d3aefde7671053d4e3c242b228e5d79c83f third_party/openssl (OpenSSL_1_0_2a)
644a6a1da71385e9d7a7a26b3476c93fdd71788c third_party/protobuf (v3.0.0-alpha-1-35-g644a6a1)
a8b38c598d7f65b281a72809b28117afdb760931 third_party/protobuf (v3.0.0-alpha-1-978-ga8b38c5)
50893291621658f355bc5b4d450a8d06a563053d third_party/zlib (v1.2.8)
EOF

@ -214,7 +214,7 @@ class RubyLanguage(object):
return [config.job_spec(['tools/run_tests/run_ruby.sh'], None)]
def make_targets(self):
return ['static_c']
return ['run_dep_checks']
def build_steps(self):
return [['tools/run_tests/build_ruby.sh']]

@ -938,6 +938,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_fake_security_server_finishes_request_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -1190,6 +1199,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_fullstack_server_finishes_request_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -1417,6 +1435,14 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_fullstack_uds_posix_server_finishes_request_test",
"platforms": [
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -1666,6 +1692,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_simple_ssl_fullstack_server_finishes_request_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -1918,6 +1953,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_simple_ssl_with_oauth2_fullstack_server_finishes_request_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -2170,6 +2214,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_server_finishes_request_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -2422,6 +2475,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_one_byte_at_a_time_server_finishes_request_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -2674,6 +2736,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_with_grpc_trace_server_finishes_request_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -2917,6 +2988,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_fullstack_server_finishes_request_unsecure_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -3136,6 +3216,14 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_fullstack_uds_posix_server_finishes_request_unsecure_test",
"platforms": [
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -3376,6 +3464,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_server_finishes_request_unsecure_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -3619,6 +3716,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_one_byte_at_a_time_server_finishes_request_unsecure_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -3862,6 +3968,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_with_grpc_trace_server_finishes_request_unsecure_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save