diff --git a/.travis.yml b/.travis.yml index e43a89e453a..165f8923c0e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,9 +19,12 @@ env: - CONFIG=opt TEST=python - CONFIG=gcov TEST=c - CONFIG=gcov TEST=c++ + - USE_GCC=4.4 CONFIG=opt TEST=build + - USE_GCC=4.5 CONFIG=opt TEST=build script: - rvm use $RUBY_VERSION - gem install bundler + - if [ ! -z "$USE_GCC" ] ; then export CC=gcc-$USE_GCC ; export CXX=g++-$USE_GCC ; fi - ./tools/run_tests/run_tests.py -l $TEST -t -j 16 -c $CONFIG -s 4.0 after_success: - if [ "$CONFIG" = "gcov" ] ; then coveralls --exclude third_party --exclude gens -b. --gcov-options '\-p' ; fi diff --git a/Makefile b/Makefile index 0742994982e..308796a9e8e 100644 --- a/Makefile +++ b/Makefile @@ -2553,6 +2553,7 @@ LIBGRPC_SRC = \ src/core/surface/byte_buffer_reader.c \ src/core/surface/call.c \ src/core/surface/call_details.c \ + src/core/surface/call_log_batch.c \ src/core/surface/channel.c \ src/core/surface/channel_create.c \ src/core/surface/client.c \ @@ -2699,6 +2700,7 @@ src/core/surface/byte_buffer_queue.c: $(OPENSSL_DEP) src/core/surface/byte_buffer_reader.c: $(OPENSSL_DEP) src/core/surface/call.c: $(OPENSSL_DEP) src/core/surface/call_details.c: $(OPENSSL_DEP) +src/core/surface/call_log_batch.c: $(OPENSSL_DEP) src/core/surface/channel.c: $(OPENSSL_DEP) src/core/surface/channel_create.c: $(OPENSSL_DEP) src/core/surface/client.c: $(OPENSSL_DEP) @@ -2861,6 +2863,7 @@ $(OBJDIR)/$(CONFIG)/src/core/surface/byte_buffer_queue.o: $(OBJDIR)/$(CONFIG)/src/core/surface/byte_buffer_reader.o: $(OBJDIR)/$(CONFIG)/src/core/surface/call.o: $(OBJDIR)/$(CONFIG)/src/core/surface/call_details.o: +$(OBJDIR)/$(CONFIG)/src/core/surface/call_log_batch.o: $(OBJDIR)/$(CONFIG)/src/core/surface/channel.o: $(OBJDIR)/$(CONFIG)/src/core/surface/channel_create.o: $(OBJDIR)/$(CONFIG)/src/core/surface/client.o: @@ -3036,6 +3039,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/surface/byte_buffer_reader.c \ src/core/surface/call.c \ src/core/surface/call_details.c \ + src/core/surface/call_log_batch.c \ src/core/surface/channel.c \ src/core/surface/channel_create.c \ src/core/surface/client.c \ @@ -3175,6 +3179,7 @@ $(OBJDIR)/$(CONFIG)/src/core/surface/byte_buffer_queue.o: $(OBJDIR)/$(CONFIG)/src/core/surface/byte_buffer_reader.o: $(OBJDIR)/$(CONFIG)/src/core/surface/call.o: $(OBJDIR)/$(CONFIG)/src/core/surface/call_details.o: +$(OBJDIR)/$(CONFIG)/src/core/surface/call_log_batch.o: $(OBJDIR)/$(CONFIG)/src/core/surface/channel.o: $(OBJDIR)/$(CONFIG)/src/core/surface/channel_create.o: $(OBJDIR)/$(CONFIG)/src/core/surface/client.o: @@ -3585,30 +3590,40 @@ $(OBJDIR)/$(CONFIG)/src/cpp/util/status.o: $(OBJDIR)/$(CONFIG)/src/cpp/util/time.o: -LIBGRPC_PYTHON_PLUGIN_SUPPORT_SRC = \ +LIBGRPC_PLUGIN_SUPPORT_SRC = \ + src/compiler/cpp_generator.cc \ src/compiler/python_generator.cc \ + src/compiler/ruby_generator.cc \ PUBLIC_HEADERS_CXX += \ + src/compiler/config.h \ + src/compiler/cpp_generator.h \ + src/compiler/cpp_generator_helpers.h \ + src/compiler/generator_helpers.h \ src/compiler/python_generator.h \ + src/compiler/ruby_generator.h \ + src/compiler/ruby_generator_helpers-inl.h \ + src/compiler/ruby_generator_map-inl.h \ + src/compiler/ruby_generator_string-inl.h \ -LIBGRPC_PYTHON_PLUGIN_SUPPORT_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC_PYTHON_PLUGIN_SUPPORT_SRC)))) +LIBGRPC_PLUGIN_SUPPORT_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC_PLUGIN_SUPPORT_SRC)))) ifeq ($(NO_PROTOBUF),true) # You can't build a C++ library if you don't have protobuf - a bit overreached, but still okay. -$(LIBDIR)/$(CONFIG)/libgrpc_python_plugin_support.a: protobuf_dep_error +$(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a: protobuf_dep_error else -$(LIBDIR)/$(CONFIG)/libgrpc_python_plugin_support.a: $(ZLIB_DEP) $(PROTOBUF_DEP) $(LIBGRPC_PYTHON_PLUGIN_SUPPORT_OBJS) +$(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a: $(ZLIB_DEP) $(PROTOBUF_DEP) $(LIBGRPC_PLUGIN_SUPPORT_OBJS) $(E) "[AR] Creating $@" $(Q) mkdir -p `dirname $@` - $(Q) rm -f $(LIBDIR)/$(CONFIG)/libgrpc_python_plugin_support.a - $(Q) $(AR) rcs $(LIBDIR)/$(CONFIG)/libgrpc_python_plugin_support.a $(LIBGRPC_PYTHON_PLUGIN_SUPPORT_OBJS) + $(Q) rm -f $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a + $(Q) $(AR) rcs $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a $(LIBGRPC_PLUGIN_SUPPORT_OBJS) ifeq ($(SYSTEM),Darwin) - $(Q) ranlib $(LIBDIR)/$(CONFIG)/libgrpc_python_plugin_support.a + $(Q) ranlib $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a endif @@ -3617,10 +3632,12 @@ endif endif ifneq ($(NO_DEPS),true) --include $(LIBGRPC_PYTHON_PLUGIN_SUPPORT_OBJS:.o=.dep) +-include $(LIBGRPC_PLUGIN_SUPPORT_OBJS:.o=.dep) endif +$(OBJDIR)/$(CONFIG)/src/compiler/cpp_generator.o: $(OBJDIR)/$(CONFIG)/src/compiler/python_generator.o: +$(OBJDIR)/$(CONFIG)/src/compiler/ruby_generator.o: LIBPUBSUB_CLIENT_LIB_SRC = \ @@ -8188,7 +8205,6 @@ endif GRPC_CPP_PLUGIN_SRC = \ - src/compiler/cpp_generator.cc \ src/compiler/cpp_plugin.cc \ GRPC_CPP_PLUGIN_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_CPP_PLUGIN_SRC)))) @@ -8202,15 +8218,14 @@ $(BINDIR)/$(CONFIG)/grpc_cpp_plugin: protobuf_dep_error else -$(BINDIR)/$(CONFIG)/grpc_cpp_plugin: $(PROTOBUF_DEP) $(GRPC_CPP_PLUGIN_OBJS) +$(BINDIR)/$(CONFIG)/grpc_cpp_plugin: $(PROTOBUF_DEP) $(GRPC_CPP_PLUGIN_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a $(E) "[HOSTLD] Linking $@" $(Q) mkdir -p `dirname $@` - $(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(GRPC_CPP_PLUGIN_OBJS) $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/grpc_cpp_plugin + $(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(GRPC_CPP_PLUGIN_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/grpc_cpp_plugin endif -$(OBJDIR)/$(CONFIG)/src/compiler/cpp_generator.o: -$(OBJDIR)/$(CONFIG)/src/compiler/cpp_plugin.o: +$(OBJDIR)/$(CONFIG)/src/compiler/cpp_plugin.o: $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a deps_grpc_cpp_plugin: $(GRPC_CPP_PLUGIN_OBJS:.o=.dep) @@ -8233,14 +8248,14 @@ $(BINDIR)/$(CONFIG)/grpc_python_plugin: protobuf_dep_error else -$(BINDIR)/$(CONFIG)/grpc_python_plugin: $(PROTOBUF_DEP) $(GRPC_PYTHON_PLUGIN_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_python_plugin_support.a +$(BINDIR)/$(CONFIG)/grpc_python_plugin: $(PROTOBUF_DEP) $(GRPC_PYTHON_PLUGIN_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a $(E) "[HOSTLD] Linking $@" $(Q) mkdir -p `dirname $@` - $(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(GRPC_PYTHON_PLUGIN_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_python_plugin_support.a $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/grpc_python_plugin + $(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(GRPC_PYTHON_PLUGIN_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/grpc_python_plugin endif -$(OBJDIR)/$(CONFIG)/src/compiler/python_plugin.o: $(LIBDIR)/$(CONFIG)/libgrpc_python_plugin_support.a +$(OBJDIR)/$(CONFIG)/src/compiler/python_plugin.o: $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a deps_grpc_python_plugin: $(GRPC_PYTHON_PLUGIN_OBJS:.o=.dep) @@ -8250,7 +8265,6 @@ endif GRPC_RUBY_PLUGIN_SRC = \ - src/compiler/ruby_generator.cc \ src/compiler/ruby_plugin.cc \ GRPC_RUBY_PLUGIN_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_RUBY_PLUGIN_SRC)))) @@ -8264,15 +8278,14 @@ $(BINDIR)/$(CONFIG)/grpc_ruby_plugin: protobuf_dep_error else -$(BINDIR)/$(CONFIG)/grpc_ruby_plugin: $(PROTOBUF_DEP) $(GRPC_RUBY_PLUGIN_OBJS) +$(BINDIR)/$(CONFIG)/grpc_ruby_plugin: $(PROTOBUF_DEP) $(GRPC_RUBY_PLUGIN_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a $(E) "[HOSTLD] Linking $@" $(Q) mkdir -p `dirname $@` - $(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(GRPC_RUBY_PLUGIN_OBJS) $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/grpc_ruby_plugin + $(Q) $(HOST_LDXX) $(HOST_LDFLAGS) $(GRPC_RUBY_PLUGIN_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a $(HOST_LDLIBSXX) $(HOST_LDLIBS_PROTOC) $(HOST_LDLIBS) $(HOST_LDLIBS_PROTOC) -o $(BINDIR)/$(CONFIG)/grpc_ruby_plugin endif -$(OBJDIR)/$(CONFIG)/src/compiler/ruby_generator.o: -$(OBJDIR)/$(CONFIG)/src/compiler/ruby_plugin.o: +$(OBJDIR)/$(CONFIG)/src/compiler/ruby_plugin.o: $(LIBDIR)/$(CONFIG)/libgrpc_plugin_support.a deps_grpc_ruby_plugin: $(GRPC_RUBY_PLUGIN_OBJS:.o=.dep) diff --git a/build.json b/build.json index a46866263f6..0d53bd60318 100644 --- a/build.json +++ b/build.json @@ -233,6 +233,7 @@ "src/core/surface/byte_buffer_reader.c", "src/core/surface/call.c", "src/core/surface/call_details.c", + "src/core/surface/call_log_batch.c", "src/core/surface/channel.c", "src/core/surface/channel_create.c", "src/core/surface/client.c", @@ -498,14 +499,24 @@ "secure": "no" }, { - "name": "grpc_python_plugin_support", + "name": "grpc_plugin_support", "build": "protoc", "language": "c++", "public_headers": [ - "src/compiler/python_generator.h" + "src/compiler/config.h", + "src/compiler/cpp_generator.h", + "src/compiler/cpp_generator_helpers.h", + "src/compiler/generator_helpers.h", + "src/compiler/python_generator.h", + "src/compiler/ruby_generator.h", + "src/compiler/ruby_generator_helpers-inl.h", + "src/compiler/ruby_generator_map-inl.h", + "src/compiler/ruby_generator_string-inl.h" ], "src": [ - "src/compiler/python_generator.cc" + "src/compiler/cpp_generator.cc", + "src/compiler/python_generator.cc", + "src/compiler/ruby_generator.cc" ], "deps": [], "secure": "no" @@ -1758,15 +1769,12 @@ "name": "grpc_cpp_plugin", "build": "protoc", "language": "c++", - "headers": [ - "src/compiler/cpp_generator.h", - "src/compiler/cpp_generator_helpers.h" - ], "src": [ - "src/compiler/cpp_generator.cc", "src/compiler/cpp_plugin.cc" ], - "deps": [], + "deps": [ + "grpc_plugin_support" + ], "secure": "no" }, { @@ -1777,7 +1785,7 @@ "src/compiler/python_plugin.cc" ], "deps": [ - "grpc_python_plugin_support" + "grpc_plugin_support" ], "secure": "no" }, @@ -1786,10 +1794,11 @@ "build": "protoc", "language": "c++", "src": [ - "src/compiler/ruby_generator.cc", "src/compiler/ruby_plugin.cc" ], - "deps": [], + "deps": [ + "grpc_plugin_support" + ], "secure": "no" }, { diff --git a/include/grpc++/config.h b/include/grpc++/config.h index 35bf5073647..8ef5d71bfac 100644 --- a/include/grpc++/config.h +++ b/include/grpc++/config.h @@ -65,6 +65,28 @@ ::google::protobuf::io::ZeroCopyInputStream #endif +#ifndef __clang__ +#ifdef __GNUC__ +#if (__GNUC__ * 100 + __GNUC_MINOR__ < 406) +#define GRPC_NO_NULLPTR +#endif +#endif +#endif + +#ifdef GRPC_NO_NULLPTR +#include +const class { +public: + template operator T*() const {return static_cast(0);} + template operator std::unique_ptr() const { + return std::unique_ptr(static_cast(0)); + } + operator bool() const {return false;} +private: + void operator&() const = delete; +} nullptr = {}; +#endif + namespace grpc { typedef GRPC_CUSTOM_STRING string; diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index d5004624d42..0a84c735200 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -111,7 +111,8 @@ bool HasBidiStreaming(const grpc::protobuf::FileDescriptor *file) { } } // namespace -grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file) { +grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, + const Parameters ¶ms) { grpc::string temp = "#include \n" "#include \n" @@ -158,7 +159,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file) { return temp; } -grpc::string GetSourceIncludes() { +grpc::string GetSourceIncludes(const Parameters ¶m) { return "#include \n" "#include \n" "#include \n" @@ -353,16 +354,27 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer, printer->Print("};\n"); } -grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file) { +grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file, + const Parameters ¶ms) { grpc::string output; grpc::protobuf::io::StringOutputStream output_stream(&output); grpc::protobuf::io::Printer printer(&output_stream, '$'); std::map vars; + if (!params.services_namespace.empty()) { + vars["services_namespace"] = params.services_namespace; + printer.Print(vars, "\nnamespace $services_namespace$ {\n\n"); + } + for (int i = 0; i < file->service_count(); ++i) { PrintHeaderService(&printer, file->service(i), &vars); printer.Print("\n"); } + + if (!params.services_namespace.empty()) { + printer.Print(vars, "} // namespace $services_namespace$\n\n"); + } + return output; } @@ -376,18 +388,18 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { printer->Print(*vars, - "::grpc::Status $Service$::Stub::$Method$(" + "::grpc::Status $ns$$Service$::Stub::$Method$(" "::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) {\n"); printer->Print(*vars, " return ::grpc::BlockingUnaryCall(channel()," - "::grpc::RpcMethod($Service$_method_names[$Idx$]), " + "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$]), " "context, request, response);\n" "}\n\n"); printer->Print( *vars, "std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> " - "$Service$::Stub::Async$Method$(::grpc::ClientContext* context, " + "$ns$$Service$::Stub::Async$Method$(::grpc::ClientContext* context, " "const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, @@ -395,32 +407,32 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, "::grpc::ClientAsyncResponseReader< $Response$>>(new " "::grpc::ClientAsyncResponseReader< $Response$>(" "channel(), cq, " - "::grpc::RpcMethod($Service$_method_names[$Idx$]), " + "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$]), " "context, request, tag));\n" "}\n\n"); } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, "std::unique_ptr< ::grpc::ClientWriter< $Request$>> " - "$Service$::Stub::$Method$(" + "$ns$$Service$::Stub::$Method$(" "::grpc::ClientContext* context, $Response$* response) {\n"); printer->Print(*vars, " return std::unique_ptr< ::grpc::ClientWriter< " "$Request$>>(new ::grpc::ClientWriter< $Request$>(" "channel()," - "::grpc::RpcMethod($Service$_method_names[$Idx$], " + "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), " "context, response));\n" "}\n\n"); printer->Print(*vars, "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>> " - "$Service$::Stub::Async$Method$(" + "$ns$$Service$::Stub::Async$Method$(" "::grpc::ClientContext* context, $Response$* response, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, " return std::unique_ptr< ::grpc::ClientAsyncWriter< " "$Request$>>(new ::grpc::ClientAsyncWriter< $Request$>(" "channel(), cq, " - "::grpc::RpcMethod($Service$_method_names[$Idx$], " + "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), " "context, response, tag));\n" "}\n\n"); @@ -428,26 +440,26 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, printer->Print( *vars, "std::unique_ptr< ::grpc::ClientReader< $Response$>> " - "$Service$::Stub::$Method$(" + "$ns$$Service$::Stub::$Method$(" "::grpc::ClientContext* context, const $Request$& request) {\n"); printer->Print(*vars, " return std::unique_ptr< ::grpc::ClientReader< " "$Response$>>(new ::grpc::ClientReader< $Response$>(" "channel()," - "::grpc::RpcMethod($Service$_method_names[$Idx$], " + "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " "::grpc::RpcMethod::RpcType::SERVER_STREAMING), " "context, request));\n" "}\n\n"); printer->Print(*vars, "std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> " - "$Service$::Stub::Async$Method$(" + "$ns$$Service$::Stub::Async$Method$(" "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, " return std::unique_ptr< ::grpc::ClientAsyncReader< " "$Response$>>(new ::grpc::ClientAsyncReader< $Response$>(" "channel(), cq, " - "::grpc::RpcMethod($Service$_method_names[$Idx$], " + "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " "::grpc::RpcMethod::RpcType::SERVER_STREAMING), " "context, request, tag));\n" "}\n\n"); @@ -455,27 +467,27 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer, printer->Print( *vars, "std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>> " - "$Service$::Stub::$Method$(::grpc::ClientContext* context) {\n"); + "$ns$$Service$::Stub::$Method$(::grpc::ClientContext* context) {\n"); printer->Print(*vars, " return std::unique_ptr< ::grpc::ClientReaderWriter< " "$Request$, $Response$>>(new ::grpc::ClientReaderWriter< " "$Request$, $Response$>(" "channel()," - "::grpc::RpcMethod($Service$_method_names[$Idx$], " + "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " "::grpc::RpcMethod::RpcType::BIDI_STREAMING), " "context));\n" "}\n\n"); printer->Print(*vars, "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " "$Request$, $Response$>> " - "$Service$::Stub::Async$Method$(::grpc::ClientContext* context, " + "$ns$$Service$::Stub::Async$Method$(::grpc::ClientContext* context, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, " return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " "$Request$, $Response$>>(new " "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>(" "channel(), cq, " - "::grpc::RpcMethod($Service$_method_names[$Idx$], " + "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], " "::grpc::RpcMethod::RpcType::BIDI_STREAMING), " "context, tag));\n" "}\n\n"); @@ -492,7 +504,7 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { printer->Print(*vars, - "::grpc::Status $Service$::Service::$Method$(" + "::grpc::Status $ns$$Service$::Service::$Method$(" "::grpc::ServerContext* context, " "const $Request$* request, $Response$* response) {\n"); printer->Print( @@ -501,7 +513,7 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, printer->Print("}\n\n"); } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, - "::grpc::Status $Service$::Service::$Method$(" + "::grpc::Status $ns$$Service$::Service::$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerReader< $Request$>* reader, " "$Response$* response) {\n"); @@ -511,7 +523,7 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, printer->Print("}\n\n"); } else if (ServerOnlyStreaming(method)) { printer->Print(*vars, - "::grpc::Status $Service$::Service::$Method$(" + "::grpc::Status $ns$$Service$::Service::$Method$(" "::grpc::ServerContext* context, " "const $Request$* request, " "::grpc::ServerWriter< $Response$>* writer) {\n"); @@ -521,7 +533,7 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, printer->Print("}\n\n"); } else if (BidiStreaming(method)) { printer->Print(*vars, - "::grpc::Status $Service$::Service::$Method$(" + "::grpc::Status $ns$$Service$::Service::$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerReaderWriter< $Response$, $Request$>* " "stream) {\n"); @@ -543,7 +555,7 @@ void PrintSourceServerAsyncMethod( grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { printer->Print(*vars, - "void $Service$::AsyncService::Request$Method$(" + "void $ns$$Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "$Request$* request, " "::grpc::ServerAsyncResponseWriter< $Response$>* response, " @@ -554,7 +566,7 @@ void PrintSourceServerAsyncMethod( printer->Print("}\n\n"); } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, - "void $Service$::AsyncService::Request$Method$(" + "void $ns$$Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " "::grpc::CompletionQueue* cq, void* tag) {\n"); @@ -564,7 +576,7 @@ void PrintSourceServerAsyncMethod( printer->Print("}\n\n"); } else if (ServerOnlyStreaming(method)) { printer->Print(*vars, - "void $Service$::AsyncService::Request$Method$(" + "void $ns$$Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "$Request$* request, " "::grpc::ServerAsyncWriter< $Response$>* writer, " @@ -576,7 +588,7 @@ void PrintSourceServerAsyncMethod( } else if (BidiStreaming(method)) { printer->Print( *vars, - "void $Service$::AsyncService::Request$Method$(" + "void $ns$$Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " "::grpc::CompletionQueue* cq, void *tag) {\n"); @@ -592,7 +604,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, std::map *vars) { (*vars)["Service"] = service->name(); - printer->Print(*vars, "static const char* $Service$_method_names[] = {\n"); + printer->Print(*vars, "static const char* $prefix$$Service$_method_names[] = {\n"); for (int i = 0; i < service->method_count(); ++i) { (*vars)["Method"] = service->method(i)->name(); printer->Print(*vars, " \"/$Package$$Service$/$Method$\",\n"); @@ -601,9 +613,9 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, printer->Print( *vars, - "std::unique_ptr< $Service$::Stub> $Service$::NewStub(" + "std::unique_ptr< $ns$$Service$::Stub> $ns$$Service$::NewStub(" "const std::shared_ptr< ::grpc::ChannelInterface>& channel) {\n" - " std::unique_ptr< $Service$::Stub> stub(new $Service$::Stub());\n" + " std::unique_ptr< $ns$$Service$::Stub> stub(new $ns$$Service$::Stub());\n" " stub->set_channel(channel);\n" " return stub;\n" "}\n\n"); @@ -615,12 +627,12 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, (*vars)["MethodCount"] = as_string(service->method_count()); printer->Print( *vars, - "$Service$::AsyncService::AsyncService(::grpc::CompletionQueue* cq) : " - "::grpc::AsynchronousService(cq, $Service$_method_names, $MethodCount$) " + "$ns$$Service$::AsyncService::AsyncService(::grpc::CompletionQueue* cq) : " + "::grpc::AsynchronousService(cq, $prefix$$Service$_method_names, $MethodCount$) " "{}\n\n"); printer->Print(*vars, - "$Service$::Service::~Service() {\n" + "$ns$$Service$::Service::~Service() {\n" " delete service_;\n" "}\n\n"); for (int i = 0; i < service->method_count(); ++i) { @@ -629,7 +641,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, PrintSourceServerAsyncMethod(printer, service->method(i), vars); } printer->Print(*vars, - "::grpc::RpcService* $Service$::Service::service() {\n"); + "::grpc::RpcService* $ns$$Service$::Service::service() {\n"); printer->Indent(); printer->Print( "if (service_ != nullptr) {\n" @@ -648,52 +660,52 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, printer->Print( *vars, "service_->AddMethod(new ::grpc::RpcServiceMethod(\n" - " $Service$_method_names[$Idx$],\n" + " $prefix$$Service$_method_names[$Idx$],\n" " ::grpc::RpcMethod::NORMAL_RPC,\n" - " new ::grpc::RpcMethodHandler< $Service$::Service, $Request$, " + " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, $Request$, " "$Response$>(\n" - " std::function< ::grpc::Status($Service$::Service*, " + " std::function< ::grpc::Status($ns$$Service$::Service*, " "::grpc::ServerContext*, const $Request$*, $Response$*)>(" - "&$Service$::Service::$Method$), this),\n" + "&$ns$$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, "service_->AddMethod(new ::grpc::RpcServiceMethod(\n" - " $Service$_method_names[$Idx$],\n" + " $prefix$$Service$_method_names[$Idx$],\n" " ::grpc::RpcMethod::CLIENT_STREAMING,\n" " new ::grpc::ClientStreamingHandler< " - "$Service$::Service, $Request$, $Response$>(\n" - " std::function< ::grpc::Status($Service$::Service*, " + "$ns$$Service$::Service, $Request$, $Response$>(\n" + " std::function< ::grpc::Status($ns$$Service$::Service*, " "::grpc::ServerContext*, " "::grpc::ServerReader< $Request$>*, $Response$*)>(" - "&$Service$::Service::$Method$), this),\n" + "&$ns$$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, "service_->AddMethod(new ::grpc::RpcServiceMethod(\n" - " $Service$_method_names[$Idx$],\n" + " $prefix$$Service$_method_names[$Idx$],\n" " ::grpc::RpcMethod::SERVER_STREAMING,\n" " new ::grpc::ServerStreamingHandler< " - "$Service$::Service, $Request$, $Response$>(\n" - " std::function< ::grpc::Status($Service$::Service*, " + "$ns$$Service$::Service, $Request$, $Response$>(\n" + " std::function< ::grpc::Status($ns$$Service$::Service*, " "::grpc::ServerContext*, " "const $Request$*, ::grpc::ServerWriter< $Response$>*)>(" - "&$Service$::Service::$Method$), this),\n" + "&$ns$$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } else if (BidiStreaming(method)) { printer->Print( *vars, "service_->AddMethod(new ::grpc::RpcServiceMethod(\n" - " $Service$_method_names[$Idx$],\n" + " $prefix$$Service$_method_names[$Idx$],\n" " ::grpc::RpcMethod::BIDI_STREAMING,\n" " new ::grpc::BidiStreamingHandler< " - "$Service$::Service, $Request$, $Response$>(\n" - " std::function< ::grpc::Status($Service$::Service*, " + "$ns$$Service$::Service, $Request$, $Response$>(\n" + " std::function< ::grpc::Status($ns$$Service$::Service*, " "::grpc::ServerContext*, " "::grpc::ServerReaderWriter< $Response$, $Request$>*)>(" - "&$Service$::Service::$Method$), this),\n" + "&$ns$$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } } @@ -702,7 +714,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, printer->Print("}\n\n"); } -grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file) { +grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file, + const Parameters ¶ms) { grpc::string output; grpc::protobuf::io::StringOutputStream output_stream(&output); grpc::protobuf::io::Printer printer(&output_stream, '$'); @@ -713,6 +726,13 @@ grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file) { if (!file->package().empty()) { vars["Package"].append("."); } + if (!params.services_namespace.empty()) { + vars["ns"] = params.services_namespace + "::"; + vars["prefix"] = params.services_namespace; + } else { + vars["ns"] = ""; + vars["prefix"] = ""; + } for (int i = 0; i < file->service_count(); ++i) { PrintSourceService(&printer, file->service(i), &vars); diff --git a/src/compiler/cpp_generator.h b/src/compiler/cpp_generator.h index 2ecdb5c47e2..04ad71c0673 100644 --- a/src/compiler/cpp_generator.h +++ b/src/compiler/cpp_generator.h @@ -38,17 +38,26 @@ namespace grpc_cpp_generator { +// Contains all the parameters that are parsed from the command line. +struct Parameters { + // Puts the service into a namespace + grpc::string services_namespace; +}; + // Return the includes needed for generated header file. -grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file); +grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, + const Parameters ¶ms); // Return the includes needed for generated source file. -grpc::string GetSourceIncludes(); +grpc::string GetSourceIncludes(const Parameters ¶ms); // Return the services for generated header file. -grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file); +grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file, + const Parameters ¶ms); // Return the services for generated source file. -grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file); +grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file, + const Parameters ¶ms); } // namespace grpc_cpp_generator diff --git a/src/compiler/cpp_plugin.cc b/src/compiler/cpp_plugin.cc index 5b83aa85cf1..57f25a1f75f 100644 --- a/src/compiler/cpp_plugin.cc +++ b/src/compiler/cpp_plugin.cc @@ -58,18 +58,37 @@ class CppGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { return false; } + grpc_cpp_generator::Parameters generator_parameters; + + if (!parameter.empty()) { + std::vector parameters_list = + grpc_generator::tokenize(parameter, ","); + for (auto parameter_string = parameters_list.begin(); + parameter_string != parameters_list.end(); + parameter_string++) { + std::vector param = + grpc_generator::tokenize(*parameter_string, "="); + if (param[0] == "services_namespace") { + generator_parameters.services_namespace = param[1]; + } else { + *error = grpc::string("Unknown parameter: ") + *parameter_string; + return false; + } + } + } + grpc::string file_name = grpc_generator::StripProto(file->name()); // Generate .pb.h Insert(context, file_name + ".pb.h", "includes", - grpc_cpp_generator::GetHeaderIncludes(file)); + grpc_cpp_generator::GetHeaderIncludes(file, generator_parameters)); Insert(context, file_name + ".pb.h", "namespace_scope", - grpc_cpp_generator::GetHeaderServices(file)); + grpc_cpp_generator::GetHeaderServices(file, generator_parameters)); // Generate .pb.cc Insert(context, file_name + ".pb.cc", "includes", - grpc_cpp_generator::GetSourceIncludes()); + grpc_cpp_generator::GetSourceIncludes(generator_parameters)); Insert(context, file_name + ".pb.cc", "namespace_scope", - grpc_cpp_generator::GetSourceServices(file)); + grpc_cpp_generator::GetSourceServices(file, generator_parameters)); return true; } diff --git a/src/compiler/generator_helpers.h b/src/compiler/generator_helpers.h index 1e6727dd4c0..30857891c77 100644 --- a/src/compiler/generator_helpers.h +++ b/src/compiler/generator_helpers.h @@ -75,6 +75,26 @@ inline grpc::string StringReplace(grpc::string str, const grpc::string &from, return str; } +inline std::vector tokenize(const grpc::string &input, + const grpc::string &delimiters) { + std::vector tokens; + size_t pos, last_pos = 0; + + for (;;) { + bool done = false; + pos = input.find_first_of(delimiters, last_pos); + if (pos == grpc::string::npos) { + done = true; + pos = input.length(); + } + + tokens.push_back(input.substr(last_pos, pos - last_pos)); + if (done) return tokens; + + last_pos = pos + 1; + } +} + } // namespace grpc_generator #endif // GRPC_INTERNAL_COMPILER_GENERATOR_HELPERS_H diff --git a/src/core/surface/call.c b/src/core/surface/call.c index cfce9437940..dba63058b83 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1006,6 +1006,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, const grpc_op *op; grpc_ioreq *req; + GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); + if (nops == 0) { grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE); grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); diff --git a/src/core/surface/call.h b/src/core/surface/call.h index cb81cb52c25..06434f87ac8 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -119,4 +119,13 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); +extern int grpc_trace_batch; + +void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, + grpc_call *call, const grpc_op *ops, size_t nops, + void *tag); + +#define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \ + if (grpc_trace_batch) grpc_call_log_batch(sev, call, ops, nops, tag) + #endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c new file mode 100644 index 00000000000..a33583a12d1 --- /dev/null +++ b/src/core/surface/call_log_batch.c @@ -0,0 +1,121 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/call.h" + +#include "src/core/support/string.h" +#include + +int grpc_trace_batch = 0; + +static void add_metadata(gpr_strvec *b, const grpc_metadata *md, size_t count) { + size_t i; + for(i = 0; i < count; i++) { + gpr_strvec_add(b, gpr_strdup("\nkey=")); + gpr_strvec_add(b, gpr_strdup(md[i].key)); + + gpr_strvec_add(b, gpr_strdup(" value=")); + gpr_strvec_add(b, gpr_hexdump(md[i].value, md[i].value_length, + GPR_HEXDUMP_PLAINTEXT)); + } +} + +char *grpc_op_string(const grpc_op *op) { + char *tmp; + char *out; + + gpr_strvec b; + gpr_strvec_init(&b); + + switch (op->op) { + case GRPC_OP_SEND_INITIAL_METADATA: + gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA")); + add_metadata(&b, op->data.send_initial_metadata.metadata, + op->data.send_initial_metadata.count); + break; + case GRPC_OP_SEND_MESSAGE: + gpr_asprintf(&tmp, "SEND_MESSAGE ptr=%p", op->data.send_message); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + gpr_strvec_add(&b, gpr_strdup("SEND_CLOSE_FROM_CLIENT")); + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + gpr_asprintf(&tmp, "SEND_STATUS_FROM_SERVER status=%d details=%s", + op->data.send_status_from_server.status, + op->data.send_status_from_server.status_details); + gpr_strvec_add(&b, tmp); + add_metadata(&b, op->data.send_status_from_server.trailing_metadata, + op->data.send_status_from_server.trailing_metadata_count); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + gpr_asprintf(&tmp, "RECV_INITIAL_METADATA ptr=%p", + op->data.recv_initial_metadata); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_RECV_MESSAGE: + gpr_asprintf(&tmp, "RECV_MESSAGE ptr=%p", op->data.recv_message); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + gpr_asprintf(&tmp, + "RECV_STATUS_ON_CLIENT metadata=%p status=%p details=%p", + op->data.recv_status_on_client.trailing_metadata, + op->data.recv_status_on_client.status, + op->data.recv_status_on_client.status_details); + gpr_strvec_add(&b, tmp); + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + gpr_asprintf(&tmp, "RECV_CLOSE_ON_SERVER cancelled=%p", + op->data.recv_close_on_server.cancelled); + gpr_strvec_add(&b, tmp); + } + out = gpr_strvec_flatten(&b, NULL); + gpr_strvec_destroy(&b); + + return out; +} + +void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, + grpc_call *call, const grpc_op *ops, size_t nops, + void *tag) { + char *tmp; + size_t i; + gpr_log(file, line, severity, + "grpc_call_start_batch(%p, %p, %d, 0x%x)", call, ops, nops, tag); + for(i = 0; i < nops; i++) { + tmp = grpc_op_string(&ops[i]); + gpr_log(file, line, severity, "ops[%d]: %s", i, tmp); + gpr_free(tmp); + } +} diff --git a/src/core/surface/init.c b/src/core/surface/init.c index e48c4202e58..d4f0eb40e8e 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -36,6 +36,7 @@ #include "src/core/debug/trace.h" #include "src/core/statistics/census_interface.h" #include "src/core/channel/channel_stack.h" +#include "src/core/surface/call.h" #include "src/core/surface/init.h" #include "src/core/surface/surface_trace.h" #include "src/core/transport/chttp2_transport.h" @@ -57,6 +58,7 @@ void grpc_init(void) { grpc_register_tracer("channel", &grpc_trace_channel); grpc_register_tracer("surface", &grpc_surface_trace); grpc_register_tracer("http", &grpc_http_trace); + grpc_register_tracer("batch", &grpc_trace_batch); grpc_security_pre_init(); grpc_tracer_init("GRPC_TRACE"); grpc_iomgr_init(); @@ -82,4 +84,3 @@ int grpc_is_initialized(void) { gpr_mu_unlock(&g_init_mu); return r; } - diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/thread_pool.cc index 2c0f4da812b..80c96111b1f 100644 --- a/src/cpp/server/thread_pool.cc +++ b/src/cpp/server/thread_pool.cc @@ -35,28 +35,29 @@ namespace grpc { +void ThreadPool::ThreadFunc() { + for (;;) { + // Wait until work is available or we are shutting down. + std::unique_lock lock(mu_); + if (!shutdown_ && callbacks_.empty()) { + cv_.wait(lock); + } + // Drain callbacks before considering shutdown to ensure all work + // gets completed. + if (!callbacks_.empty()) { + auto cb = callbacks_.front(); + callbacks_.pop(); + lock.unlock(); + cb(); + } else if (shutdown_) { + return; + } + } +} + ThreadPool::ThreadPool(int num_threads) : shutdown_(false) { for (int i = 0; i < num_threads; i++) { - threads_.push_back(std::thread([this]() { - for (;;) { - // Wait until work is available or we are shutting down. - auto have_work = [this]() { return shutdown_ || !callbacks_.empty(); }; - std::unique_lock lock(mu_); - if (!have_work()) { - cv_.wait(lock, have_work); - } - // Drain callbacks before considering shutdown to ensure all work - // gets completed. - if (!callbacks_.empty()) { - auto cb = callbacks_.front(); - callbacks_.pop(); - lock.unlock(); - cb(); - } else if (shutdown_) { - return; - } - } - })); + threads_.push_back(std::thread(&ThreadPool::ThreadFunc, this)); } } diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h index 6225d82a0b6..41e2009ff16 100644 --- a/src/cpp/server/thread_pool.h +++ b/src/cpp/server/thread_pool.h @@ -58,6 +58,8 @@ class ThreadPool GRPC_FINAL : public ThreadPoolInterface { bool shutdown_; std::queue> callbacks_; std::vector threads_; + + void ThreadFunc(); }; } // namespace grpc diff --git a/src/php/ext/grpc/byte_buffer.c b/src/php/ext/grpc/byte_buffer.c index 1ced1bf3f03..9f122d6da67 100644 --- a/src/php/ext/grpc/byte_buffer.c +++ b/src/php/ext/grpc/byte_buffer.c @@ -57,6 +57,11 @@ grpc_byte_buffer *string_to_byte_buffer(char *string, size_t length) { void byte_buffer_to_string(grpc_byte_buffer *buffer, char **out_string, size_t *out_length) { + if (buffer == NULL) { + *out_string = NULL; + *out_length = 0; + return; + } size_t length = grpc_byte_buffer_length(buffer); char *string = ecalloc(length + 1, sizeof(char)); size_t offset = 0; diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index d0e324e2ccd..ba1b2a407d3 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -448,8 +448,12 @@ PHP_METHOD(Call, start_batch) { break; case GRPC_OP_RECV_MESSAGE: byte_buffer_to_string(message, &message_str, &message_len); - add_property_stringl(result, "message", message_str, message_len, - false); + if (message_str == NULL) { + add_property_null(result, "message"); + } else { + add_property_stringl(result, "message", message_str, message_len, + false); + } break; case GRPC_OP_RECV_STATUS_ON_CLIENT: MAKE_STD_ZVAL(recv_status); @@ -478,9 +482,20 @@ cleanup: RETURN_DESTROY_ZVAL(result); } +/** + * Cancel the call. This will cause the call to end with STATUS_CANCELLED if it + * has not already ended with another status. + */ +PHP_METHOD(Call, cancel) { + wrapped_grpc_call *call = + (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC); + grpc_call_cancel(call->wrapped); +} + static zend_function_entry call_methods[] = { PHP_ME(Call, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) - PHP_ME(Call, start_batch, NULL, ZEND_ACC_PUBLIC) PHP_FE_END}; + PHP_ME(Call, start_batch, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Call, cancel, NULL, ZEND_ACC_PUBLIC) PHP_FE_END}; void grpc_init_call(TSRMLS_D) { zend_class_entry ce; diff --git a/src/php/lib/Grpc/BidiStreamingSurfaceActiveCall.php b/src/php/lib/Grpc/AbstractCall.php old mode 100755 new mode 100644 similarity index 64% rename from src/php/lib/Grpc/BidiStreamingSurfaceActiveCall.php rename to src/php/lib/Grpc/AbstractCall.php index 0459f21e279..b813d16470d --- a/src/php/lib/Grpc/BidiStreamingSurfaceActiveCall.php +++ b/src/php/lib/Grpc/AbstractCall.php @@ -32,44 +32,48 @@ * */ namespace Grpc; + require_once realpath(dirname(__FILE__) . '/../autoload.php'); -/** - * Represents an active call that allows for sending and recieving messages in - * streams in any order. - */ -class BidiStreamingSurfaceActiveCall extends AbstractSurfaceActiveCall { +abstract class AbstractCall { + + protected $call; + protected $deserialize; + protected $metadata; /** - * Reads the next value from the server. - * @return The next value from the server, or null if there is none + * Create a new Call wrapper object. + * @param Channel $channel The channel to communicate on + * @param string $method The method to call on the remote server */ - public function read() { - return $this->_read(); + public function __construct(Channel $channel, $method, $deserialize) { + $this->call = new Call($channel, $method, Timeval::inf_future()); + $this->deserialize = $deserialize; } /** - * Writes a single message to the server. This cannot be called after - * writesDone is called. - * @param $value The message to send + * @return The metadata sent by the server. */ - public function write($value) { - $this->_write($value); + public function getMetadata() { + return $this->metadata; } /** - * Indicate that no more writes will be sent + * Cancels the call */ - public function writesDone() { - $this->_writesDone(); + public function cancel() { + $this->call->cancel(); } /** - * Wait for the server to send the status, and return it. - * @return object The status object, with integer $code and string $details - * members + * Deserialize a response value to an object. + * @param string $value The binary value to deserialize + * @return The deserialized value */ - public function getStatus() { - return $this->_getStatus(); + protected function deserializeResponse($value) { + if ($value === null) { + return null; + } + return call_user_func($this->deserialize, $value); } -} +} \ No newline at end of file diff --git a/src/php/lib/Grpc/AbstractSurfaceActiveCall.php b/src/php/lib/Grpc/AbstractSurfaceActiveCall.php deleted file mode 100755 index 9d0af090ceb..00000000000 --- a/src/php/lib/Grpc/AbstractSurfaceActiveCall.php +++ /dev/null @@ -1,98 +0,0 @@ -active_call = new ActiveCall($channel, $method, $metadata, $flags); - $this->deserialize = $deserialize; - } - - /** - * @return The metadata sent by the server - */ - public function getMetadata() { - return $this->metadata(); - } - - /** - * Cancels the call - */ - public function cancel() { - $this->active_call->cancel(); - } - - protected function _read() { - $response = $this->active_call->read(); - if ($response === null) { - return null; - } - return call_user_func($this->deserialize, $response); - } - - protected function _write($value) { - return $this->active_call->write($value->serialize()); - } - - protected function _writesDone() { - $this->active_call->writesDone(); - } - - protected function _getStatus() { - return $this->active_call->getStatus(); - } -} diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index fde055a3b32..9bc17111106 100755 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -69,11 +69,9 @@ class BaseStub { $argument, callable $deserialize, $metadata = array()) { - return new SimpleSurfaceActiveCall($this->channel, - $method, - $deserialize, - $argument, - $metadata); + $call = new UnaryCall($this->channel, $method, $deserialize); + $call->start($argument, $metadata); + return $call; } /** @@ -91,11 +89,9 @@ class BaseStub { $arguments, callable $deserialize, $metadata = array()) { - return new ClientStreamingSurfaceActiveCall($this->channel, - $method, - $deserialize, - $arguments, - $metadata); + $call = new ClientStreamingCall($this->channel, $method, $deserialize); + $call->start($arguments, $metadata); + return $call; } /** @@ -112,11 +108,9 @@ class BaseStub { $argument, callable $deserialize, $metadata = array()) { - return new ServerStreamingSurfaceActiveCall($this->channel, - $method, - $deserialize, - $argument, - $metadata); + $call = new ServerStreamingCall($this->channel, $method, $deserialize); + $call->start($argument, $metadata); + return $call; } /** @@ -130,9 +124,8 @@ class BaseStub { public function _bidiRequest($method, callable $deserialize, $metadata = array()) { - return new BidiStreamingSurfaceActiveCall($this->channel, - $method, - $deserialize, - $metadata); + $call = new BidiStreamingCall($this->channel, $method, $deserialize); + $call->start($metadata); + return $call; } } diff --git a/src/php/lib/Grpc/ActiveCall.php b/src/php/lib/Grpc/BidiStreamingCall.php old mode 100755 new mode 100644 similarity index 68% rename from src/php/lib/Grpc/ActiveCall.php rename to src/php/lib/Grpc/BidiStreamingCall.php index af4dca50d76..0d3dd629f25 --- a/src/php/lib/Grpc/ActiveCall.php +++ b/src/php/lib/Grpc/BidiStreamingCall.php @@ -35,49 +35,28 @@ namespace Grpc; require_once realpath(dirname(__FILE__) . '/../autoload.php'); /** - * Represents an active call that allows sending and recieving binary data + * Represents an active call that allows for sending and recieving messages in + * streams in any order. */ -class ActiveCall { - private $call; - private $metadata; - +class BidiStreamingCall extends AbstractCall { /** - * Create a new active call. - * @param Channel $channel The channel to communicate on - * @param string $method The method to call on the remote server + * Start the call * @param array $metadata Metadata to send with the call, if applicable */ - public function __construct(Channel $channel, - $method, - $metadata = array()) { - $this->call = new Call($channel, $method, Timeval::inf_future()); - - $event = $this->call->start_batch([OP_SEND_INITIAL_METADATA => $metadata]); - + public function start($metadata) { + $event = $this->call->start_batch([ + OP_SEND_INITIAL_METADATA => $metadata, + OP_RECV_INITIAL_METADATA => true]); $this->metadata = $event->metadata; } /** - * @return The metadata sent by the server. - */ - public function getMetadata() { - return $this->metadata; - } - - /** - * Cancels the call - */ - public function cancel() { - $this->call->cancel(); - } - - /** - * Read a single message from the server. - * @return The next message from the server, or null if there is none. + * Reads the next value from the server. + * @return The next value from the server, or null if there is none */ public function read() { $read_event = $this->call->start_batch([OP_RECV_MESSAGE => true]); - return $read_event->data; + return $this->deserializeResponse($read_event->message); } /** @@ -86,7 +65,7 @@ class ActiveCall { * @param ByteBuffer $data The data to write */ public function write($data) { - $this->call->start_batch([OP_SEND_MESSAGE => $data]); + $this->call->start_batch([OP_SEND_MESSAGE => $data->serialize()]); } /** @@ -102,7 +81,9 @@ class ActiveCall { * and array $metadata members */ public function getStatus() { - $status_event = $this->call->start_batch([RECV_STATUS_ON_CLIENT => true]); - return $status_event->data; + $status_event = $this->call->start_batch([ + OP_RECV_STATUS_ON_CLIENT => true + ]); + return $status_event->status; } -} +} \ No newline at end of file diff --git a/src/php/lib/Grpc/ClientStreamingSurfaceActiveCall.php b/src/php/lib/Grpc/ClientStreamingCall.php old mode 100755 new mode 100644 similarity index 72% rename from src/php/lib/Grpc/ClientStreamingSurfaceActiveCall.php rename to src/php/lib/Grpc/ClientStreamingCall.php index d33f09fbe4e..4b3abcbdec5 --- a/src/php/lib/Grpc/ClientStreamingSurfaceActiveCall.php +++ b/src/php/lib/Grpc/ClientStreamingCall.php @@ -38,25 +38,21 @@ require_once realpath(dirname(__FILE__) . '/../autoload.php'); * Represents an active call that sends a stream of messages and then gets a * single response. */ -class ClientStreamingSurfaceActiveCall extends AbstractSurfaceActiveCall { +class ClientStreamingCall extends AbstractCall { /** - * Create a new simple (single request/single response) active call. - * @param Channel $channel The channel to communicate on - * @param string $method The method to call on the remote server - * @param callable $deserialize The function to deserialize a value + * Start the call. * @param Traversable $arg_iter The iterator of arguments to send * @param array $metadata Metadata to send with the call, if applicable */ - public function __construct(Channel $channel, - $method, - callable $deserialize, - $arg_iter, - $metadata = array()) { - parent::__construct($channel, $method, $deserialize, $metadata, 0); + public function start($arg_iter, $metadata = array()) { + $event = $this->call->start_batch([ + OP_SEND_INITIAL_METADATA => $metadata, + OP_RECV_INITIAL_METADATA => true]); + $this->metadata = $event->metadata; foreach($arg_iter as $arg) { - $this->_write($arg); + $this->call->start_batch([OP_SEND_MESSAGE => $arg->serialize()]); } - $this->_writesDone(); + $this->call->start_batch([OP_SEND_CLOSE_FROM_CLIENT => true]); } /** @@ -64,8 +60,9 @@ class ClientStreamingSurfaceActiveCall extends AbstractSurfaceActiveCall { * @return [response data, status] */ public function wait() { - $response = $this->_read(); - $status = $this->_getStatus(); - return array($response, $status); + $event = $this->call->start_batch([ + OP_RECV_MESSAGE => true, + OP_RECV_STATUS_ON_CLIENT => true]); + return array($this->deserializeResponse($event->message), $event->status); } -} +} \ No newline at end of file diff --git a/src/php/lib/Grpc/ServerStreamingSurfaceActiveCall.php b/src/php/lib/Grpc/ServerStreamingCall.php old mode 100755 new mode 100644 similarity index 67% rename from src/php/lib/Grpc/ServerStreamingSurfaceActiveCall.php rename to src/php/lib/Grpc/ServerStreamingCall.php index fd08e86e513..7458f28bcbf --- a/src/php/lib/Grpc/ServerStreamingSurfaceActiveCall.php +++ b/src/php/lib/Grpc/ServerStreamingCall.php @@ -39,36 +39,41 @@ require_once realpath(dirname(__FILE__) . '/../autoload.php'); * Represents an active call that sends a single message and then gets a stream * of reponses */ -class ServerStreamingSurfaceActiveCall extends AbstractSurfaceActiveCall { +class ServerStreamingCall extends AbstractCall { /** - * Create a new simple (single request/single response) active call. - * @param Channel $channel The channel to communicate on - * @param string $method The method to call on the remote server - * @param callable $deserialize The function to deserialize a value + * Start the call * @param $arg The argument to send * @param array $metadata Metadata to send with the call, if applicable */ - public function __construct(Channel $channel, - $method, - callable $deserialize, - $arg, - $metadata = array()) { - parent::__construct($channel, $method, $deserialize, $metadata, - \Grpc\WRITE_BUFFER_HINT); - $this->_write($arg); - $this->_writesDone(); + public function start($arg, $metadata = array()) { + $event = $this->call->start_batch([ + OP_SEND_INITIAL_METADATA => $metadata, + OP_RECV_INITIAL_METADATA => true, + OP_SEND_MESSAGE => $arg->serialize(), + OP_SEND_CLOSE_FROM_CLIENT => true]); + $this->metadata = $event->metadata; } /** * @return An iterator of response values */ public function responses() { - while(($response = $this->_read()) !== null) { - yield $response; + $response = $this->call->start_batch([OP_RECV_MESSAGE => true])->message; + while($response !== null) { + yield $this->deserializeResponse($response); + $response = $this->call->start_batch([OP_RECV_MESSAGE => true])->message; } } + /** + * Wait for the server to send the status, and return it. + * @return object The status object, with integer $code, string $details, + * and array $metadata members + */ public function getStatus() { - return $this->_getStatus(); + $status_event = $this->call->start_batch([ + OP_RECV_STATUS_ON_CLIENT => true + ]); + return $status_event->status; } -} +} \ No newline at end of file diff --git a/src/php/lib/Grpc/SimpleSurfaceActiveCall.php b/src/php/lib/Grpc/UnaryCall.php old mode 100755 new mode 100644 similarity index 70% rename from src/php/lib/Grpc/SimpleSurfaceActiveCall.php rename to src/php/lib/Grpc/UnaryCall.php index ba82f5704f5..bbf9cfb5883 --- a/src/php/lib/Grpc/SimpleSurfaceActiveCall.php +++ b/src/php/lib/Grpc/UnaryCall.php @@ -39,24 +39,19 @@ require_once realpath(dirname(__FILE__) . '/../autoload.php'); * Represents an active call that sends a single message and then gets a single * response. */ -class SimpleSurfaceActiveCall extends AbstractSurfaceActiveCall { +class UnaryCall extends AbstractCall { /** - * Create a new simple (single request/single response) active call. - * @param Channel $channel The channel to communicate on - * @param string $method The method to call on the remote server - * @param callable $deserialize The function to deserialize a value + * Start the call * @param $arg The argument to send * @param array $metadata Metadata to send with the call, if applicable */ - public function __construct(Channel $channel, - $method, - callable $deserialize, - $arg, - $metadata = array()) { - parent::__construct($channel, $method, $deserialize, $metadata, - \Grpc\WRITE_BUFFER_HINT); - $this->_write($arg); - $this->_writesDone(); + public function start($arg, $metadata = array()) { + $event = $this->call->start_batch([ + OP_SEND_INITIAL_METADATA => $metadata, + OP_RECV_INITIAL_METADATA => true, + OP_SEND_MESSAGE => $arg->serialize(), + OP_SEND_CLOSE_FROM_CLIENT => true]); + $this->metadata = $event->metadata; } /** @@ -64,8 +59,9 @@ class SimpleSurfaceActiveCall extends AbstractSurfaceActiveCall { * @return [response data, status] */ public function wait() { - $response = $this->_read(); - $status = $this->_getStatus(); - return array($response, $status); + $event = $this->call->start_batch([ + OP_RECV_MESSAGE => true, + OP_RECV_STATUS_ON_CLIENT => true]); + return array($this->deserializeResponse($event->message), $event->status); } -} +} \ No newline at end of file diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index 82ca4381690..7ee089e2415 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -132,8 +132,6 @@ function serverStreaming($stub) { } $call = $stub->StreamingOutputCall($request); - hardAssert($call->getStatus()->code === Grpc\STATUS_OK, - 'Call did not complete successfully'); $i = 0; foreach($call->responses() as $value) { hardAssert($i < 4, 'Too many responses'); @@ -142,7 +140,10 @@ function serverStreaming($stub) { 'Payload ' . $i . ' had the wrong type'); hardAssert(strlen($payload->getBody()) === $sizes[$i], 'Response ' . $i . ' had the wrong length'); + $i += 1; } + hardAssert($call->getStatus()->code === Grpc\STATUS_OK, + 'Call did not complete successfully'); } /** @@ -240,4 +241,6 @@ switch($args['test_case']) { break; case 'cancel_after_first_response': cancelAfterFirstResponse($stub); + default: + exit(1); } diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index a132ef45413..3cf6ddf2623 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -224,6 +224,24 @@ class CSharpLanguage(object): def __str__(self): return 'csharp' +class Build(object): + + def test_specs(self, config, travis): + return [] + + def make_targets(self): + return ['all'] + + def build_steps(self): + return [] + + def supports_multi_config(self): + return True + + def __str__(self): + return self.make_target + + # different configurations we can run under _CONFIGS = { 'dbg': SimpleConfig('dbg'), @@ -248,7 +266,8 @@ _LANGUAGES = { 'php': PhpLanguage(), 'python': PythonLanguage(), 'ruby': RubyLanguage(), - 'csharp': CSharpLanguage() + 'csharp': CSharpLanguage(), + 'build': Build(), } # parse command line diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj index 754c89985f1..a88eb34ab88 100644 --- a/vsprojects/vs2013/grpc.vcxproj +++ b/vsprojects/vs2013/grpc.vcxproj @@ -358,6 +358,8 @@ + + diff --git a/vsprojects/vs2013/grpc.vcxproj.filters b/vsprojects/vs2013/grpc.vcxproj.filters index 463a770fb8c..20dbe8c444f 100644 --- a/vsprojects/vs2013/grpc.vcxproj.filters +++ b/vsprojects/vs2013/grpc.vcxproj.filters @@ -253,6 +253,9 @@ src\core\surface + + src\core\surface + src\core\surface diff --git a/vsprojects/vs2013/grpc_shared.vcxproj b/vsprojects/vs2013/grpc_shared.vcxproj index 927d2051a2c..b673cc7bded 100644 --- a/vsprojects/vs2013/grpc_shared.vcxproj +++ b/vsprojects/vs2013/grpc_shared.vcxproj @@ -362,6 +362,8 @@ + + diff --git a/vsprojects/vs2013/grpc_shared.vcxproj.filters b/vsprojects/vs2013/grpc_shared.vcxproj.filters index 463a770fb8c..20dbe8c444f 100644 --- a/vsprojects/vs2013/grpc_shared.vcxproj.filters +++ b/vsprojects/vs2013/grpc_shared.vcxproj.filters @@ -253,6 +253,9 @@ src\core\surface + + src\core\surface + src\core\surface diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj b/vsprojects/vs2013/grpc_unsecure.vcxproj index e1c1bc890b3..98c14c2fdb4 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj @@ -302,6 +302,8 @@ + + diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj.filters b/vsprojects/vs2013/grpc_unsecure.vcxproj.filters index fe966237cb2..4b758d61132 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj.filters @@ -193,6 +193,9 @@ src\core\surface + + src\core\surface + src\core\surface