Merge github.com:grpc/grpc into bye-bye-completion-queue-pie

pull/1472/head
Craig Tiller 10 years ago
commit 54478f8562
  1. 43
      Makefile
  2. 18
      build.json
  3. 6
      include/grpc++/async_generic_service.h
  4. 7
      include/grpc++/completion_queue.h
  5. 37
      include/grpc++/impl/service_type.h
  6. 8
      include/grpc++/server.h
  7. 7
      include/grpc++/server_builder.h
  8. 18
      include/grpc/grpc.h
  9. 103
      src/compiler/cpp_generator.cc
  10. 65
      src/core/surface/server.c
  11. 3
      src/core/surface/server.h
  12. 5
      src/core/surface/server_create.c
  13. 10
      src/cpp/server/async_generic_service.cc
  14. 68
      src/cpp/server/server.cc
  15. 9
      src/cpp/server/server_builder.cc
  16. 6
      src/csharp/ext/grpc_csharp_ext.c
  17. 14
      src/node/examples/math.proto
  18. 24
      src/node/examples/route_guide.proto
  19. 8
      src/node/examples/stock.proto
  20. 6
      src/node/ext/server.cc
  21. 2
      src/node/interop/empty.proto
  22. 44
      src/node/interop/interop_client.js
  23. 23
      src/node/interop/interop_server.js
  24. 36
      src/node/interop/messages.proto
  25. 3
      src/node/interop/test.proto
  26. 4
      src/node/package.json
  27. 2
      src/node/src/common.js
  28. 39
      src/node/test/echo_service.proto
  29. 30
      src/node/test/surface_test.js
  30. 6
      src/node/test/test_service.proto
  31. 10
      src/php/ext/grpc/server.c
  32. 2
      src/python/README.md
  33. 7
      src/python/src/grpc/_adapter/_server.c
  34. 4
      src/ruby/ext/grpc/rb_server.c
  35. 11
      test/core/end2end/dualstack_socket_test.c
  36. 4
      test/core/end2end/fixtures/chttp2_fake_security.c
  37. 3
      test/core/end2end/fixtures/chttp2_fullstack.c
  38. 3
      test/core/end2end/fixtures/chttp2_fullstack_uds_posix.c
  39. 4
      test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
  40. 4
      test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
  41. 4
      test/core/end2end/fixtures/chttp2_socket_pair.c
  42. 4
      test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
  43. 7
      test/core/end2end/tests/cancel_after_accept.c
  44. 7
      test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
  45. 8
      test/core/end2end/tests/census_simple_request.c
  46. 8
      test/core/end2end/tests/disappearing_server.c
  47. 8
      test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
  48. 8
      test/core/end2end/tests/early_server_shutdown_finishes_tags.c
  49. 8
      test/core/end2end/tests/graceful_server_shutdown.c
  50. 8
      test/core/end2end/tests/invoke_large_request.c
  51. 24
      test/core/end2end/tests/max_concurrent_streams.c
  52. 8
      test/core/end2end/tests/max_message_length.c
  53. 8
      test/core/end2end/tests/ping_pong_streaming.c
  54. 8
      test/core/end2end/tests/registered_call.c
  55. 8
      test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
  56. 8
      test/core/end2end/tests/request_response_with_metadata_and_payload.c
  57. 8
      test/core/end2end/tests/request_response_with_payload.c
  58. 5
      test/core/end2end/tests/request_response_with_payload_and_call_creds.c
  59. 6
      test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
  60. 8
      test/core/end2end/tests/request_with_large_metadata.c
  61. 8
      test/core/end2end/tests/request_with_payload.c
  62. 8
      test/core/end2end/tests/simple_delayed_request.c
  63. 8
      test/core/end2end/tests/simple_request.c
  64. 8
      test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c
  65. 7
      test/core/fling/server.c
  66. 53
      test/cpp/end2end/async_end2end_test.cc
  67. 21
      test/cpp/end2end/generic_end2end_test.cc
  68. 79
      test/cpp/qps/async_streaming_ping_pong_test.cc
  69. 22
      test/cpp/qps/server_async.cc
  70. 62
      tools/gce_setup/cloud_prod_test.sh
  71. 63
      tools/gce_setup/grpc_docker.sh
  72. 67
      tools/gce_setup/interop_test.sh

File diff suppressed because one or more lines are too long

@ -1807,6 +1807,24 @@
"gpr" "gpr"
] ]
}, },
{
"name": "async_streaming_ping_pong_test",
"build": "test",
"run": false,
"language": "c++",
"src": [
"test/cpp/qps/async_streaming_ping_pong_test.cc"
],
"deps": [
"qps",
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc",
"gpr_test_util",
"gpr"
]
},
{ {
"name": "async_unary_ping_pong_test", "name": "async_unary_ping_pong_test",
"build": "test", "build": "test",

@ -65,10 +65,8 @@ class AsyncGenericService GRPC_FINAL {
void RequestCall(GenericServerContext* ctx, void RequestCall(GenericServerContext* ctx,
GenericServerAsyncReaderWriter* reader_writer, GenericServerAsyncReaderWriter* reader_writer,
CompletionQueue* cq, void* tag); CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag);
// The new rpc event should be obtained from this completion queue.
CompletionQueue* completion_queue();
private: private:
friend class Server; friend class Server;

@ -58,6 +58,7 @@ class ServerReaderWriter;
class CompletionQueue; class CompletionQueue;
class Server; class Server;
class ServerBuilder;
class ServerContext; class ServerContext;
class CompletionQueueTag { class CompletionQueueTag {
@ -137,6 +138,12 @@ class CompletionQueue : public GrpcLibrary {
grpc_completion_queue* cq_; // owned grpc_completion_queue* cq_; // owned
}; };
class ServerCompletionQueue : public CompletionQueue {
private:
friend class ServerBuilder;
ServerCompletionQueue() {}
};
} // namespace grpc } // namespace grpc
#endif // GRPCXX_COMPLETION_QUEUE_H #endif // GRPCXX_COMPLETION_QUEUE_H

@ -39,8 +39,10 @@
namespace grpc { namespace grpc {
class Call; class Call;
class CompletionQueue;
class RpcService; class RpcService;
class Server; class Server;
class ServerCompletionQueue;
class ServerContext; class ServerContext;
class Status; class Status;
@ -70,52 +72,55 @@ class AsynchronousService {
ServerContext* context, ServerContext* context,
::grpc::protobuf::Message* request, ::grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) = 0; CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) = 0;
}; };
AsynchronousService(CompletionQueue* cq, const char** method_names, AsynchronousService(const char** method_names, size_t method_count)
size_t method_count) : dispatch_impl_(nullptr),
: cq_(cq),
dispatch_impl_(nullptr),
method_names_(method_names), method_names_(method_names),
method_count_(method_count), method_count_(method_count),
request_args_(nullptr) {} request_args_(nullptr) {}
~AsynchronousService() { delete[] request_args_; } ~AsynchronousService() { delete[] request_args_; }
CompletionQueue* completion_queue() const { return cq_; }
protected: protected:
void RequestAsyncUnary(int index, ServerContext* context, void RequestAsyncUnary(int index, ServerContext* context,
grpc::protobuf::Message* request, grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) { CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
stream, cq, tag); stream, call_cq, notification_cq, tag);
} }
void RequestClientStreaming(int index, ServerContext* context, void RequestClientStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) { CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
stream, cq, tag); stream, call_cq, notification_cq, tag);
} }
void RequestServerStreaming(int index, ServerContext* context, void RequestServerStreaming(int index, ServerContext* context,
grpc::protobuf::Message* request, grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) { CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, dispatch_impl_->RequestAsyncCall(request_args_[index], context, request,
stream, cq, tag); stream, call_cq, notification_cq, tag);
} }
void RequestBidiStreaming(int index, ServerContext* context, void RequestBidiStreaming(int index, ServerContext* context,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) { CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr,
stream, cq, tag); stream, call_cq, notification_cq, tag);
} }
private: private:
friend class Server; friend class Server;
CompletionQueue* const cq_;
DispatchImpl* dispatch_impl_; DispatchImpl* dispatch_impl_;
const char** const method_names_; const char** const method_names_;
size_t method_count_; size_t method_count_;

@ -101,11 +101,15 @@ class Server GRPC_FINAL : public GrpcLibrary,
void RequestAsyncCall(void* registered_method, ServerContext* context, void RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request, grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) GRPC_OVERRIDE; CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) GRPC_OVERRIDE;
void RequestAsyncGenericCall(GenericServerContext* context, void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag); CompletionQueue* cq,
ServerCompletionQueue* notification_cq,
void* tag);
const int max_message_size_; const int max_message_size_;

@ -46,6 +46,7 @@ class AsynchronousService;
class CompletionQueue; class CompletionQueue;
class RpcService; class RpcService;
class Server; class Server;
class ServerCompletionQueue;
class ServerCredentials; class ServerCredentials;
class SynchronousService; class SynchronousService;
class ThreadPoolInterface; class ThreadPoolInterface;
@ -82,6 +83,11 @@ class ServerBuilder {
// Does not take ownership. // Does not take ownership.
void SetThreadPool(ThreadPoolInterface* thread_pool); void SetThreadPool(ThreadPoolInterface* thread_pool);
// Add a completion queue for handling asynchronous services
// Caller is required to keep this completion queue live until calling
// BuildAndStart()
std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
// Return a running server which is ready for processing rpcs. // Return a running server which is ready for processing rpcs.
std::unique_ptr<Server> BuildAndStart(); std::unique_ptr<Server> BuildAndStart();
@ -96,6 +102,7 @@ class ServerBuilder {
std::vector<RpcService*> services_; std::vector<RpcService*> services_;
std::vector<AsynchronousService*> async_services_; std::vector<AsynchronousService*> async_services_;
std::vector<Port> ports_; std::vector<Port> ports_;
std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_; std::shared_ptr<ServerCredentials> creds_;
AsyncGenericService* generic_service_; AsyncGenericService* generic_service_;
ThreadPoolInterface* thread_pool_; ThreadPoolInterface* thread_pool_;

@ -442,7 +442,8 @@ void grpc_call_destroy(grpc_call *call);
grpc_call_error grpc_server_request_call( grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata, grpc_metadata_array *request_metadata,
grpc_completion_queue *cq_bound_to_call, void *tag_new); grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag_new);
/* Registers a method in the server. /* Registers a method in the server.
Methods to this (host, method) pair will not be reported by Methods to this (host, method) pair will not be reported by
@ -452,21 +453,26 @@ grpc_call_error grpc_server_request_call(
Must be called before grpc_server_start. Must be called before grpc_server_start.
Returns NULL on failure. */ Returns NULL on failure. */
void *grpc_server_register_method(grpc_server *server, const char *method, void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host, const char *host);
grpc_completion_queue *new_call_cq);
/* Request notification of a new pre-registered call */ /* Request notification of a new pre-registered call */
grpc_call_error grpc_server_request_registered_call( grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call, grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *request_metadata, gpr_timespec *deadline, grpc_metadata_array *request_metadata,
grpc_byte_buffer **optional_payload, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bound_to_call, void *tag_new); grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag_new);
/* Create a server. Additional configuration for each incoming channel can /* Create a server. Additional configuration for each incoming channel can
be specified with args. If no additional configuration is needed, args can be specified with args. If no additional configuration is needed, args can
be NULL. See grpc_channel_args for more. */ be NULL. See grpc_channel_args for more. */
grpc_server *grpc_server_create(grpc_completion_queue *cq, grpc_server *grpc_server_create(const grpc_channel_args *args);
const grpc_channel_args *args);
/* Register a completion queue with the server. Must be done for any completion
queue that is passed to grpc_server_request_* call. Must be performed prior
to grpc_server_start. */
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq);
/* Add a HTTP2 over plaintext over tcp listener. /* Add a HTTP2 over plaintext over tcp listener.
Returns bound port number on success, 0 on failure. Returns bound port number on success, 0 on failure.

@ -120,6 +120,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
"class CompletionQueue;\n" "class CompletionQueue;\n"
"class ChannelInterface;\n" "class ChannelInterface;\n"
"class RpcService;\n" "class RpcService;\n"
"class ServerCompletionQueue;\n"
"class ServerContext;\n" "class ServerContext;\n"
"} // namespace grpc\n\n"; "} // namespace grpc\n\n";
@ -499,30 +500,37 @@ void PrintHeaderServerMethodAsync(
(*vars)["Response"] = (*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true); grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) { if (NoStreaming(method)) {
printer->Print(*vars, printer->Print(
"void Request$Method$(" *vars,
"::grpc::ServerContext* context, $Request$* request, " "void Request$Method$("
"::grpc::ServerAsyncResponseWriter< $Response$>* response, " "::grpc::ServerContext* context, $Request$* request, "
"::grpc::CompletionQueue* cq, void *tag);\n"); "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(
"void Request$Method$(" *vars,
"::grpc::ServerContext* context, " "void Request$Method$("
"::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " "::grpc::ServerContext* context, "
"::grpc::CompletionQueue* cq, void *tag);\n"); "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(
"void Request$Method$(" *vars,
"::grpc::ServerContext* context, $Request$* request, " "void Request$Method$("
"::grpc::ServerAsyncWriter< $Response$>* writer, " "::grpc::ServerContext* context, $Request$* request, "
"::grpc::CompletionQueue* cq, void *tag);\n"); "::grpc::ServerAsyncWriter< $Response$>* writer, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
} else if (BidiStreaming(method)) { } else if (BidiStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
"void Request$Method$(" "void Request$Method$("
"::grpc::ServerContext* context, " "::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* cq, void *tag);\n"); "::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
} }
} }
@ -603,7 +611,7 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
" public:\n"); " public:\n");
printer->Indent(); printer->Indent();
(*vars)["MethodCount"] = as_string(service->method_count()); (*vars)["MethodCount"] = as_string(service->method_count());
printer->Print("explicit AsyncService(::grpc::CompletionQueue* cq);\n"); printer->Print("explicit AsyncService();\n");
printer->Print("~AsyncService() {};\n"); printer->Print("~AsyncService() {};\n");
for (int i = 0; i < service->method_count(); ++i) { for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderServerMethodAsync(printer, service->method(i), vars); PrintHeaderServerMethodAsync(printer, service->method(i), vars);
@ -878,36 +886,43 @@ void PrintSourceServerAsyncMethod(
(*vars)["Response"] = (*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true); grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) { if (NoStreaming(method)) {
printer->Print(*vars, printer->Print(
"void $ns$$Service$::AsyncService::Request$Method$(" *vars,
"::grpc::ServerContext* context, " "void $ns$$Service$::AsyncService::Request$Method$("
"$Request$* request, " "::grpc::ServerContext* context, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, " "$Request$* request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars, printer->Print(*vars,
" AsynchronousService::RequestAsyncUnary($Idx$, context, " " AsynchronousService::RequestAsyncUnary($Idx$, context, "
"request, response, cq, tag);\n"); "request, response, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n"); printer->Print("}\n\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(
"void $ns$$Service$::AsyncService::Request$Method$(" *vars,
"::grpc::ServerContext* context, " "void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " "::grpc::ServerContext* context, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
"::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars, printer->Print(*vars,
" AsynchronousService::RequestClientStreaming($Idx$, " " AsynchronousService::RequestClientStreaming($Idx$, "
"context, reader, cq, tag);\n"); "context, reader, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n"); printer->Print("}\n\n");
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(
"void $ns$$Service$::AsyncService::Request$Method$(" *vars,
"::grpc::ServerContext* context, " "void $ns$$Service$::AsyncService::Request$Method$("
"$Request$* request, " "::grpc::ServerContext* context, "
"::grpc::ServerAsyncWriter< $Response$>* writer, " "$Request$* request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::ServerAsyncWriter< $Response$>* writer, "
printer->Print(*vars, "::grpc::CompletionQueue* new_call_cq, "
" AsynchronousService::RequestServerStreaming($Idx$, " "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
"context, request, writer, cq, tag);\n"); printer->Print(
*vars,
" AsynchronousService::RequestServerStreaming($Idx$, "
"context, request, writer, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n"); printer->Print("}\n\n");
} else if (BidiStreaming(method)) { } else if (BidiStreaming(method)) {
printer->Print( printer->Print(
@ -915,10 +930,11 @@ void PrintSourceServerAsyncMethod(
"void $ns$$Service$::AsyncService::Request$Method$(" "void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, " "::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* cq, void *tag) {\n"); "::grpc::CompletionQueue* new_call_cq, "
"::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
printer->Print(*vars, printer->Print(*vars,
" AsynchronousService::RequestBidiStreaming($Idx$, " " AsynchronousService::RequestBidiStreaming($Idx$, "
"context, stream, cq, tag);\n"); "context, stream, new_call_cq, notification_cq, tag);\n");
printer->Print("}\n\n"); printer->Print("}\n\n");
} }
} }
@ -980,9 +996,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
(*vars)["MethodCount"] = as_string(service->method_count()); (*vars)["MethodCount"] = as_string(service->method_count());
printer->Print(*vars, printer->Print(*vars,
"$ns$$Service$::AsyncService::AsyncService(::grpc::" "$ns$$Service$::AsyncService::AsyncService() : "
"CompletionQueue* cq) : " "::grpc::AsynchronousService("
"::grpc::AsynchronousService(cq, "
"$prefix$$Service$_method_names, $MethodCount$) " "$prefix$$Service$_method_names, $MethodCount$) "
"{}\n\n"); "{}\n\n");

@ -74,16 +74,15 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct { typedef struct {
requested_call_type type; requested_call_type type;
void *tag; void *tag;
grpc_completion_queue *cq_bound_to_call;
grpc_completion_queue *cq_for_notification;
grpc_call **call;
union { union {
struct { struct {
grpc_completion_queue *cq_bind;
grpc_call **call;
grpc_call_details *details; grpc_call_details *details;
grpc_metadata_array *initial_metadata; grpc_metadata_array *initial_metadata;
} batch; } batch;
struct { struct {
grpc_completion_queue *cq_bind;
grpc_call **call;
registered_method *registered_method; registered_method *registered_method;
gpr_timespec *deadline; gpr_timespec *deadline;
grpc_metadata_array *initial_metadata; grpc_metadata_array *initial_metadata;
@ -103,7 +102,6 @@ struct registered_method {
char *host; char *host;
call_data *pending; call_data *pending;
requested_call_array requested; requested_call_array requested;
grpc_completion_queue *cq;
registered_method *next; registered_method *next;
}; };
@ -130,7 +128,6 @@ struct grpc_server {
size_t channel_filter_count; size_t channel_filter_count;
const grpc_channel_filter **channel_filters; const grpc_channel_filter **channel_filters;
grpc_channel_args *channel_args; grpc_channel_args *channel_args;
grpc_completion_queue *unregistered_cq;
grpc_completion_queue **cqs; grpc_completion_queue **cqs;
grpc_pollset **pollsets; grpc_pollset **pollsets;
@ -600,7 +597,8 @@ static const grpc_channel_filter server_surface_filter = {
destroy_channel_elem, "server", destroy_channel_elem, "server",
}; };
static void addcq(grpc_server *server, grpc_completion_queue *cq) { void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq) {
size_t i, n; size_t i, n;
for (i = 0; i < server->cq_count; i++) { for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return; if (server->cqs[i] == cq) return;
@ -612,8 +610,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) {
server->cqs[n] = cq; server->cqs[n] = cq;
} }
grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
grpc_channel_filter **filters,
size_t filter_count, size_t filter_count,
const grpc_channel_args *args) { const grpc_channel_args *args) {
size_t i; size_t i;
@ -624,12 +621,10 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
memset(server, 0, sizeof(grpc_server)); memset(server, 0, sizeof(grpc_server));
if (cq) addcq(server, cq);
gpr_mu_init(&server->mu); gpr_mu_init(&server->mu);
gpr_cv_init(&server->cv); gpr_cv_init(&server->cv);
server->unregistered_cq = cq;
/* decremented by grpc_server_destroy */ /* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1); gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev = server->root_channel_data.next = server->root_channel_data.prev =
@ -665,8 +660,7 @@ static int streq(const char *a, const char *b) {
} }
void *grpc_server_register_method(grpc_server *server, const char *method, void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host, const char *host) {
grpc_completion_queue *cq_new_rpc) {
registered_method *m; registered_method *m;
if (!method) { if (!method) {
gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
@ -679,13 +673,11 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
return NULL; return NULL;
} }
} }
addcq(server, cq_new_rpc);
m = gpr_malloc(sizeof(registered_method)); m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m)); memset(m, 0, sizeof(*m));
m->method = gpr_strdup(method); m->method = gpr_strdup(method);
m->host = gpr_strdup(host); m->host = gpr_strdup(host);
m->next = server->registered_methods; m->next = server->registered_methods;
m->cq = cq_new_rpc;
server->registered_methods = m; server->registered_methods = m;
return m; return m;
} }
@ -1010,17 +1002,18 @@ static grpc_call_error queue_call_request(grpc_server *server,
} }
} }
grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_error grpc_server_request_call(
grpc_call_details *details, grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *initial_metadata, grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_bind, grpc_completion_queue *cq_bound_to_call,
void *tag) { grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc; requested_call rc;
grpc_cq_begin_op(server->unregistered_cq, NULL); grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = BATCH_CALL; rc.type = BATCH_CALL;
rc.tag = tag; rc.tag = tag;
rc.data.batch.cq_bind = cq_bind; rc.cq_bound_to_call = cq_bound_to_call;
rc.data.batch.call = call; rc.cq_for_notification = cq_for_notification;
rc.call = call;
rc.data.batch.details = details; rc.data.batch.details = details;
rc.data.batch.initial_metadata = initial_metadata; rc.data.batch.initial_metadata = initial_metadata;
return queue_call_request(server, &rc); return queue_call_request(server, &rc);
@ -1029,14 +1022,16 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_error grpc_server_request_registered_call( grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline, grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bind, void *tag) { grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc; requested_call rc;
registered_method *registered_method = rm; registered_method *registered_method = rm;
grpc_cq_begin_op(registered_method->cq, NULL); grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = REGISTERED_CALL; rc.type = REGISTERED_CALL;
rc.tag = tag; rc.tag = tag;
rc.data.registered.cq_bind = cq_bind; rc.cq_bound_to_call = cq_bound_to_call;
rc.data.registered.call = call; rc.cq_for_notification = cq_for_notification;
rc.call = call;
rc.data.registered.registered_method = registered_method; rc.data.registered.registered_method = registered_method;
rc.data.registered.deadline = deadline; rc.data.registered.deadline = deadline;
rc.data.registered.initial_metadata = initial_metadata; rc.data.registered.initial_metadata = initial_metadata;
@ -1073,6 +1068,9 @@ static void begin_call(grpc_server *server, call_data *calld,
fill in the metadata array passed by the client, we need to perform fill in the metadata array passed by the client, we need to perform
an ioreq op, that should complete immediately. */ an ioreq op, that should complete immediately. */
grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
*rc->call = calld->call;
calld->cq_new = rc->cq_for_notification;
switch (rc->type) { switch (rc->type) {
case BATCH_CALL: case BATCH_CALL:
cpstr(&rc->data.batch.details->host, cpstr(&rc->data.batch.details->host,
@ -1080,18 +1078,13 @@ static void begin_call(grpc_server *server, call_data *calld,
cpstr(&rc->data.batch.details->method, cpstr(&rc->data.batch.details->method,
&rc->data.batch.details->method_capacity, calld->path); &rc->data.batch.details->method_capacity, calld->path);
rc->data.batch.details->deadline = calld->deadline; rc->data.batch.details->deadline = calld->deadline;
grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
*rc->data.batch.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata; r->data.recv_metadata = rc->data.batch.initial_metadata;
r++; r++;
calld->cq_new = server->unregistered_cq;
publish = publish_registered_or_batch; publish = publish_registered_or_batch;
break; break;
case REGISTERED_CALL: case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline; *rc->data.registered.deadline = calld->deadline;
grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
*rc->data.registered.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.registered.initial_metadata; r->data.recv_metadata = rc->data.registered.initial_metadata;
r++; r++;
@ -1100,7 +1093,6 @@ static void begin_call(grpc_server *server, call_data *calld,
r->data.recv_message = rc->data.registered.optional_payload; r->data.recv_message = rc->data.registered.optional_payload;
r++; r++;
} }
calld->cq_new = rc->data.registered.registered_method->cq;
publish = publish_registered_or_batch; publish = publish_registered_or_batch;
break; break;
} }
@ -1111,19 +1103,16 @@ static void begin_call(grpc_server *server, call_data *calld,
} }
static void fail_call(grpc_server *server, requested_call *rc) { static void fail_call(grpc_server *server, requested_call *rc) {
*rc->call = NULL;
switch (rc->type) { switch (rc->type) {
case BATCH_CALL: case BATCH_CALL:
*rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0; rc->data.batch.initial_metadata->count = 0;
grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, 0);
break; break;
case REGISTERED_CALL: case REGISTERED_CALL:
*rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0; rc->data.registered.initial_metadata->count = 0;
grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL,
0);
break; break;
} }
grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
} }
static void publish_registered_or_batch(grpc_call *call, int success, static void publish_registered_or_batch(grpc_call *call, int success,

@ -39,8 +39,7 @@
#include "src/core/transport/transport.h" #include "src/core/transport/transport.h"
/* Create a server */ /* Create a server */
grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
grpc_channel_filter **filters,
size_t filter_count, size_t filter_count,
const grpc_channel_args *args); const grpc_channel_args *args);

@ -35,7 +35,6 @@
#include "src/core/surface/completion_queue.h" #include "src/core/surface/completion_queue.h"
#include "src/core/surface/server.h" #include "src/core/surface/server.h"
grpc_server *grpc_server_create(grpc_completion_queue *cq, grpc_server *grpc_server_create(const grpc_channel_args *args) {
const grpc_channel_args *args) { return grpc_server_create_from_filters(NULL, 0, args);
return grpc_server_create_from_filters(cq, NULL, 0, args);
} }

@ -39,12 +39,10 @@ namespace grpc {
void AsyncGenericService::RequestCall( void AsyncGenericService::RequestCall(
GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer,
CompletionQueue* cq, void* tag) { CompletionQueue* call_cq, ServerCompletionQueue* notification_cq,
server_->RequestAsyncGenericCall(ctx, reader_writer, cq, tag); void* tag) {
} server_->RequestAsyncGenericCall(ctx, reader_writer, call_cq, notification_cq,
tag);
CompletionQueue* AsyncGenericService::completion_queue() {
return &server_->cq_;
} }
} // namespace grpc } // namespace grpc

@ -78,7 +78,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd; return mrd;
} }
void Request(grpc_server* server) { void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(!in_flight_); GPR_ASSERT(!in_flight_);
in_flight_ = true; in_flight_ = true;
cq_ = grpc_completion_queue_create(); cq_ = grpc_completion_queue_create();
@ -86,7 +86,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_server_request_registered_call( grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_, server, tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_, has_request_payload_ ? &request_payload_ : nullptr, cq_,
this)); notify_cq, this));
} }
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
@ -179,16 +179,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_; grpc_completion_queue* cq_;
}; };
grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) { static grpc_server* CreateServer(int max_message_size) {
if (max_message_size > 0) { if (max_message_size > 0) {
grpc_arg arg; grpc_arg arg;
arg.type = GRPC_ARG_INTEGER; arg.type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
arg.value.integer = max_message_size; arg.value.integer = max_message_size;
grpc_channel_args args = {1, &arg}; grpc_channel_args args = {1, &arg};
return grpc_server_create(cq, &args); return grpc_server_create(&args);
} else { } else {
return grpc_server_create(cq, nullptr); return grpc_server_create(nullptr);
} }
} }
@ -199,9 +199,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
shutdown_(false), shutdown_(false),
num_running_cb_(0), num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>), sync_methods_(new std::list<SyncRequest>),
server_(CreateServer(cq_.cq(), max_message_size)), server_(CreateServer(max_message_size)),
thread_pool_(thread_pool), thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {} thread_pool_owned_(thread_pool_owned) {
grpc_server_register_completion_queue(server_, cq_.cq());
}
Server::~Server() { Server::~Server() {
{ {
@ -221,8 +223,7 @@ Server::~Server() {
bool Server::RegisterService(RpcService* service) { bool Server::RegisterService(RpcService* service) {
for (int i = 0; i < service->GetMethodCount(); ++i) { for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod* method = service->GetMethod(i); RpcServiceMethod* method = service->GetMethod(i);
void* tag = void* tag = grpc_server_register_method(server_, method->name(), nullptr);
grpc_server_register_method(server_, method->name(), nullptr, cq_.cq());
if (!tag) { if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name()); method->name());
@ -240,9 +241,8 @@ bool Server::RegisterAsyncService(AsynchronousService* service) {
service->dispatch_impl_ = this; service->dispatch_impl_ = this;
service->request_args_ = new void*[service->method_count_]; service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) { for (size_t i = 0; i < service->method_count_; ++i) {
void* tag = void* tag = grpc_server_register_method(server_, service->method_names_[i],
grpc_server_register_method(server_, service->method_names_[i], nullptr, nullptr);
service->completion_queue()->cq());
if (!tag) { if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
service->method_names_[i]); service->method_names_[i]);
@ -273,7 +273,7 @@ bool Server::Start() {
// Start processing rpcs. // Start processing rpcs.
if (!sync_methods_->empty()) { if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
m->Request(server_); m->Request(server_, cq_.cq());
} }
ScheduleCallback(); ScheduleCallback();
@ -316,12 +316,12 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
public: public:
AsyncRequest(Server* server, void* registered_method, ServerContext* ctx, AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
grpc::protobuf::Message* request, grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, CompletionQueue* cq, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
void* tag) ServerCompletionQueue* notification_cq, void* tag)
: tag_(tag), : tag_(tag),
request_(request), request_(request),
stream_(stream), stream_(stream),
cq_(cq), call_cq_(call_cq),
ctx_(ctx), ctx_(ctx),
generic_ctx_(nullptr), generic_ctx_(nullptr),
server_(server), server_(server),
@ -329,18 +329,21 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
payload_(nullptr) { payload_(nullptr) {
memset(&array_, 0, sizeof(array_)); memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_); grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
grpc_server_request_registered_call( grpc_server_request_registered_call(
server->server_, registered_method, &call_, &call_details_.deadline, server->server_, registered_method, &call_, &call_details_.deadline,
&array_, request ? &payload_ : nullptr, cq->cq(), this); &array_, request ? &payload_ : nullptr, call_cq->cq(),
notification_cq->cq(), this);
} }
AsyncRequest(Server* server, GenericServerContext* ctx, AsyncRequest(Server* server, GenericServerContext* ctx,
ServerAsyncStreamingInterface* stream, CompletionQueue* cq, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
void* tag) ServerCompletionQueue* notification_cq, void* tag)
: tag_(tag), : tag_(tag),
request_(nullptr), request_(nullptr),
stream_(stream), stream_(stream),
cq_(cq), call_cq_(call_cq),
ctx_(nullptr), ctx_(nullptr),
generic_ctx_(ctx), generic_ctx_(ctx),
server_(server), server_(server),
@ -348,8 +351,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
payload_(nullptr) { payload_(nullptr) {
memset(&array_, 0, sizeof(array_)); memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_); grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
grpc_server_request_call(server->server_, &call_, &call_details_, &array_, grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
cq->cq(), this); call_cq->cq(), notification_cq->cq(), this);
} }
~AsyncRequest() { ~AsyncRequest() {
@ -392,8 +397,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
} }
} }
ctx->call_ = call_; ctx->call_ = call_;
ctx->cq_ = cq_; ctx->cq_ = call_cq_;
Call call(call_, server_, cq_, server_->max_message_size_); Call call(call_, server_, call_cq_, server_->max_message_size_);
if (orig_status && call_) { if (orig_status && call_) {
ctx->BeginCompletionOp(&call); ctx->BeginCompletionOp(&call);
} }
@ -407,7 +412,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_; void* const tag_;
grpc::protobuf::Message* const request_; grpc::protobuf::Message* const request_;
ServerAsyncStreamingInterface* const stream_; ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const cq_; CompletionQueue* const call_cq_;
ServerContext* const ctx_; ServerContext* const ctx_;
GenericServerContext* const generic_ctx_; GenericServerContext* const generic_ctx_;
Server* const server_; Server* const server_;
@ -420,14 +425,19 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
void Server::RequestAsyncCall(void* registered_method, ServerContext* context, void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
grpc::protobuf::Message* request, grpc::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) { CompletionQueue* call_cq,
new AsyncRequest(this, registered_method, context, request, stream, cq, tag); ServerCompletionQueue* notification_cq,
void* tag) {
new AsyncRequest(this, registered_method, context, request, stream, call_cq,
notification_cq, tag);
} }
void Server::RequestAsyncGenericCall(GenericServerContext* context, void Server::RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) { CompletionQueue* call_cq,
new AsyncRequest(this, context, stream, cq, tag); ServerCompletionQueue* notification_cq,
void* tag) {
new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
} }
void Server::ScheduleCallback() { void Server::ScheduleCallback() {
@ -449,7 +459,7 @@ void Server::RunRpc() {
{ {
grpc::unique_lock<grpc::mutex> lock(mu_); grpc::unique_lock<grpc::mutex> lock(mu_);
if (!shutdown_) { if (!shutdown_) {
mrd->Request(server_); mrd->Request(server_, cq_.cq());
} }
} }
cd.Run(); cd.Run();

@ -44,6 +44,12 @@ namespace grpc {
ServerBuilder::ServerBuilder() ServerBuilder::ServerBuilder()
: max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
ServerCompletionQueue* cq = new ServerCompletionQueue();
cqs_.push_back(cq);
return std::unique_ptr<ServerCompletionQueue>(cq);
}
void ServerBuilder::RegisterService(SynchronousService* service) { void ServerBuilder::RegisterService(SynchronousService* service) {
services_.push_back(service->service()); services_.push_back(service->service());
} }
@ -88,6 +94,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
} }
std::unique_ptr<Server> server( std::unique_ptr<Server> server(
new Server(thread_pool_, thread_pool_owned, max_message_size_)); new Server(thread_pool_, thread_pool_owned, max_message_size_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq());
}
for (auto service = services_.begin(); service != services_.end(); for (auto service = services_.begin(); service != services_.end();
service++) { service++) {
if (!server->RegisterService(*service)) { if (!server->RegisterService(*service)) {

@ -663,7 +663,9 @@ grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) {
GPR_EXPORT grpc_server *GPR_CALLTYPE GPR_EXPORT grpc_server *GPR_CALLTYPE
grpcsharp_server_create(grpc_completion_queue *cq, grpcsharp_server_create(grpc_completion_queue *cq,
const grpc_channel_args *args) { const grpc_channel_args *args) {
return grpc_server_create(cq, args); grpc_server *server = grpc_server_create(args);
grpc_server_register_completion_queue(server, cq);
return server;
} }
GPR_EXPORT gpr_int32 GPR_CALLTYPE GPR_EXPORT gpr_int32 GPR_CALLTYPE
@ -699,7 +701,7 @@ grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
return grpc_server_request_call( return grpc_server_request_call(
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details), server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, ctx); &(ctx->server_rpc_new.request_metadata), cq, cq, ctx);
} }
/* Security */ /* Security */

@ -33,25 +33,25 @@ syntax = "proto3";
package math; package math;
message DivArgs { message DivArgs {
optional int64 dividend = 1; int64 dividend = 1;
optional int64 divisor = 2; int64 divisor = 2;
} }
message DivReply { message DivReply {
optional int64 quotient = 1; int64 quotient = 1;
optional int64 remainder = 2; int64 remainder = 2;
} }
message FibArgs { message FibArgs {
optional int64 limit = 1; int64 limit = 1;
} }
message Num { message Num {
optional int64 num = 1; int64 num = 1;
} }
message FibReply { message FibReply {
optional int64 count = 1; int64 count = 1;
} }
service Math { service Math {

@ -66,18 +66,18 @@ service RouteGuide {
// Latitudes should be in the range +/- 90 degrees and longitude should be in // Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive). // the range +/- 180 degrees (inclusive).
message Point { message Point {
optional int32 latitude = 1; int32 latitude = 1;
optional int32 longitude = 2; int32 longitude = 2;
} }
// A latitude-longitude rectangle, represented as two diagonally opposite // A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi". // points "lo" and "hi".
message Rectangle { message Rectangle {
// One corner of the rectangle. // One corner of the rectangle.
optional Point lo = 1; Point lo = 1;
// The other corner of the rectangle. // The other corner of the rectangle.
optional Point hi = 2; Point hi = 2;
} }
// A feature names something at a given point. // A feature names something at a given point.
@ -85,19 +85,19 @@ message Rectangle {
// If a feature could not be named, the name is empty. // If a feature could not be named, the name is empty.
message Feature { message Feature {
// The name of the feature. // The name of the feature.
optional string name = 1; string name = 1;
// The point where the feature is detected. // The point where the feature is detected.
optional Point location = 2; Point location = 2;
} }
// A RouteNote is a message sent while at a given point. // A RouteNote is a message sent while at a given point.
message RouteNote { message RouteNote {
// The location from which the message is sent. // The location from which the message is sent.
optional Point location = 1; Point location = 1;
// The message to be sent. // The message to be sent.
optional string message = 2; string message = 2;
} }
// A RouteSummary is received in response to a RecordRoute rpc. // A RouteSummary is received in response to a RecordRoute rpc.
@ -107,14 +107,14 @@ message RouteNote {
// the distance between each point. // the distance between each point.
message RouteSummary { message RouteSummary {
// The number of points received. // The number of points received.
optional int32 point_count = 1; int32 point_count = 1;
// The number of known features passed while traversing the route. // The number of known features passed while traversing the route.
optional int32 feature_count = 2; int32 feature_count = 2;
// The distance covered in metres. // The distance covered in metres.
optional int32 distance = 3; int32 distance = 3;
// The duration of the traversal in seconds. // The duration of the traversal in seconds.
optional int32 elapsed_time = 4; int32 elapsed_time = 4;
} }

@ -33,13 +33,13 @@ package examples;
// Protocol type definitions // Protocol type definitions
message StockRequest { message StockRequest {
optional string symbol = 1; string symbol = 1;
optional int32 num_trades_to_watch = 2 [default=0]; int32 num_trades_to_watch = 2;
} }
message StockReply { message StockReply {
optional float price = 1; float price = 1;
optional string symbol = 2; string symbol = 2;
} }

@ -161,7 +161,7 @@ NAN_METHOD(Server::New) {
grpc_server *wrapped_server; grpc_server *wrapped_server;
grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue(); grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue();
if (args[0]->IsUndefined()) { if (args[0]->IsUndefined()) {
wrapped_server = grpc_server_create(queue, NULL); wrapped_server = grpc_server_create(NULL);
} else if (args[0]->IsObject()) { } else if (args[0]->IsObject()) {
Handle<Object> args_hash(args[0]->ToObject()); Handle<Object> args_hash(args[0]->ToObject());
Handle<Array> keys(args_hash->GetOwnPropertyNames()); Handle<Array> keys(args_hash->GetOwnPropertyNames());
@ -190,11 +190,12 @@ NAN_METHOD(Server::New) {
return NanThrowTypeError("Arg values must be strings"); return NanThrowTypeError("Arg values must be strings");
} }
} }
wrapped_server = grpc_server_create(queue, &channel_args); wrapped_server = grpc_server_create(&channel_args);
free(channel_args.args); free(channel_args.args);
} else { } else {
return NanThrowTypeError("Server expects an object"); return NanThrowTypeError("Server expects an object");
} }
grpc_server_register_completion_queue(wrapped_server, queue);
Server *server = new Server(wrapped_server); Server *server = new Server(wrapped_server);
server->Wrap(args.This()); server->Wrap(args.This());
NanReturnValue(args.This()); NanReturnValue(args.This());
@ -212,6 +213,7 @@ NAN_METHOD(Server::RequestCall) {
grpc_call_error error = grpc_server_request_call( grpc_call_error error = grpc_server_request_call(
server->wrapped_server, &op->call, &op->details, &op->request_metadata, server->wrapped_server, &op->call, &op->details, &op->request_metadata,
CompletionQueueAsyncWorker::GetQueue(), CompletionQueueAsyncWorker::GetQueue(),
CompletionQueueAsyncWorker::GetQueue(),
new struct tag(new NanCallback(args[0].As<Function>()), ops.release(), new struct tag(new NanCallback(args[0].As<Function>()), ops.release(),
shared_ptr<Resources>(nullptr))); shared_ptr<Resources>(nullptr)));
if (error != GRPC_CALL_OK) { if (error != GRPC_CALL_OK) {

@ -28,7 +28,7 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto2"; syntax = "proto3";
package grpc.testing; package grpc.testing;

@ -86,7 +86,7 @@ function emptyUnary(client, done) {
*/ */
function largeUnary(client, done) { function largeUnary(client, done) {
var arg = { var arg = {
response_type: testProto.PayloadType.COMPRESSABLE, response_type: 'COMPRESSABLE',
response_size: 314159, response_size: 314159,
payload: { payload: {
body: zeroBuffer(271828) body: zeroBuffer(271828)
@ -94,9 +94,8 @@ function largeUnary(client, done) {
}; };
var call = client.unaryCall(arg, function(err, resp) { var call = client.unaryCall(arg, function(err, resp) {
assert.ifError(err); assert.ifError(err);
assert.strictEqual(resp.payload.type, testProto.PayloadType.COMPRESSABLE); assert.strictEqual(resp.payload.type, 'COMPRESSABLE');
assert.strictEqual(resp.payload.body.limit - resp.payload.body.offset, assert.strictEqual(resp.payload.body.length, 314159);
314159);
}); });
call.on('status', function(status) { call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK); assert.strictEqual(status.code, grpc.status.OK);
@ -138,7 +137,7 @@ function clientStreaming(client, done) {
*/ */
function serverStreaming(client, done) { function serverStreaming(client, done) {
var arg = { var arg = {
response_type: testProto.PayloadType.COMPRESSABLE, response_type: 'COMPRESSABLE',
response_parameters: [ response_parameters: [
{size: 31415}, {size: 31415},
{size: 9}, {size: 9},
@ -150,8 +149,8 @@ function serverStreaming(client, done) {
var resp_index = 0; var resp_index = 0;
call.on('data', function(value) { call.on('data', function(value) {
assert(resp_index < 4); assert(resp_index < 4);
assert.strictEqual(value.payload.type, testProto.PayloadType.COMPRESSABLE); assert.strictEqual(value.payload.type, 'COMPRESSABLE');
assert.strictEqual(value.payload.body.limit - value.payload.body.offset, assert.strictEqual(value.payload.body.length,
arg.response_parameters[resp_index].size); arg.response_parameters[resp_index].size);
resp_index += 1; resp_index += 1;
}); });
@ -182,23 +181,21 @@ function pingPong(client, done) {
}); });
var index = 0; var index = 0;
call.write({ call.write({
response_type: testProto.PayloadType.COMPRESSABLE, response_type: 'COMPRESSABLE',
response_parameters: [ response_parameters: [
{size: response_sizes[index]} {size: response_sizes[index]}
], ],
payload: {body: zeroBuffer(payload_sizes[index])} payload: {body: zeroBuffer(payload_sizes[index])}
}); });
call.on('data', function(response) { call.on('data', function(response) {
assert.strictEqual(response.payload.type, assert.strictEqual(response.payload.type, 'COMPRESSABLE');
testProto.PayloadType.COMPRESSABLE); assert.equal(response.payload.body.length, response_sizes[index]);
assert.equal(response.payload.body.limit - response.payload.body.offset,
response_sizes[index]);
index += 1; index += 1;
if (index === 4) { if (index === 4) {
call.end(); call.end();
} else { } else {
call.write({ call.write({
response_type: testProto.PayloadType.COMPRESSABLE, response_type: 'COMPRESSABLE',
response_parameters: [ response_parameters: [
{size: response_sizes[index]} {size: response_sizes[index]}
], ],
@ -251,7 +248,7 @@ function cancelAfterBegin(client, done) {
function cancelAfterFirstResponse(client, done) { function cancelAfterFirstResponse(client, done) {
var call = client.fullDuplexCall(); var call = client.fullDuplexCall();
call.write({ call.write({
response_type: testProto.PayloadType.COMPRESSABLE, response_type: 'COMPRESSABLE',
response_parameters: [ response_parameters: [
{size: 31415} {size: 31415}
], ],
@ -270,18 +267,19 @@ function cancelAfterFirstResponse(client, done) {
* Run one of the authentication tests. * Run one of the authentication tests.
* @param {string} expected_user The expected username in the response * @param {string} expected_user The expected username in the response
* @param {Client} client The client to test against * @param {Client} client The client to test against
* @param {?string} scope The scope to apply to the credentials
* @param {function} done Callback to call when the test is completed. Included * @param {function} done Callback to call when the test is completed. Included
* primarily for use with mocha * primarily for use with mocha
*/ */
function authTest(expected_user, client, done) { function authTest(expected_user, client, scope, done) {
(new GoogleAuth()).getApplicationDefault(function(err, credential) { (new GoogleAuth()).getApplicationDefault(function(err, credential) {
assert.ifError(err); assert.ifError(err);
if (credential.createScopedRequired()) { if (credential.createScopedRequired() && scope) {
credential = credential.createScoped(AUTH_SCOPE); credential = credential.createScoped(scope);
} }
client.updateMetadata = grpc.getGoogleAuthDelegate(credential); client.updateMetadata = grpc.getGoogleAuthDelegate(credential);
var arg = { var arg = {
response_type: testProto.PayloadType.COMPRESSABLE, response_type: 'COMPRESSABLE',
response_size: 314159, response_size: 314159,
payload: { payload: {
body: zeroBuffer(271828) body: zeroBuffer(271828)
@ -291,9 +289,8 @@ function authTest(expected_user, client, done) {
}; };
var call = client.unaryCall(arg, function(err, resp) { var call = client.unaryCall(arg, function(err, resp) {
assert.ifError(err); assert.ifError(err);
assert.strictEqual(resp.payload.type, testProto.PayloadType.COMPRESSABLE); assert.strictEqual(resp.payload.type, 'COMPRESSABLE');
assert.strictEqual(resp.payload.body.limit - resp.payload.body.offset, assert.strictEqual(resp.payload.body.length, 314159);
314159);
assert.strictEqual(resp.username, expected_user); assert.strictEqual(resp.username, expected_user);
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE); assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
}); });
@ -318,8 +315,9 @@ var test_cases = {
empty_stream: emptyStream, empty_stream: emptyStream,
cancel_after_begin: cancelAfterBegin, cancel_after_begin: cancelAfterBegin,
cancel_after_first_response: cancelAfterFirstResponse, cancel_after_first_response: cancelAfterFirstResponse,
compute_engine_creds: _.partial(authTest, COMPUTE_ENGINE_USER), compute_engine_creds: _.partial(authTest, COMPUTE_ENGINE_USER, null),
service_account_creds: _.partial(authTest, AUTH_USER) service_account_creds: _.partial(authTest, AUTH_USER, AUTH_SCOPE),
jwt_token_creds: _.partial(authTest, AUTH_USER, null)
}; };
/** /**

@ -72,10 +72,9 @@ function handleUnary(call, callback) {
var req = call.request; var req = call.request;
var zeros = zeroBuffer(req.response_size); var zeros = zeroBuffer(req.response_size);
var payload_type = req.response_type; var payload_type = req.response_type;
if (payload_type === testProto.PayloadType.RANDOM) { if (payload_type === 'RANDOM') {
payload_type = [ payload_type = ['COMPRESSABLE',
testProto.PayloadType.COMPRESSABLE, 'UNCOMPRESSABLE'][Math.random() < 0.5 ? 0 : 1];
testProto.PayloadType.UNCOMPRESSABLE][Math.random() < 0.5 ? 0 : 1];
} }
callback(null, {payload: {type: payload_type, body: zeros}}); callback(null, {payload: {type: payload_type, body: zeros}});
} }
@ -89,7 +88,7 @@ function handleUnary(call, callback) {
function handleStreamingInput(call, callback) { function handleStreamingInput(call, callback) {
var aggregate_size = 0; var aggregate_size = 0;
call.on('data', function(value) { call.on('data', function(value) {
aggregate_size += value.payload.body.limit - value.payload.body.offset; aggregate_size += value.payload.body.length;
}); });
call.on('end', function() { call.on('end', function() {
callback(null, {aggregated_payload_size: aggregate_size}); callback(null, {aggregated_payload_size: aggregate_size});
@ -103,10 +102,9 @@ function handleStreamingInput(call, callback) {
function handleStreamingOutput(call) { function handleStreamingOutput(call) {
var req = call.request; var req = call.request;
var payload_type = req.response_type; var payload_type = req.response_type;
if (payload_type === testProto.PayloadType.RANDOM) { if (payload_type === 'RANDOM') {
payload_type = [ payload_type = ['COMPRESSABLE',
testProto.PayloadType.COMPRESSABLE, 'UNCOMPRESSABLE'][Math.random() < 0.5 ? 0 : 1];
testProto.PayloadType.UNCOMPRESSABLE][Math.random() < 0.5 ? 0 : 1];
} }
_.each(req.response_parameters, function(resp_param) { _.each(req.response_parameters, function(resp_param) {
call.write({ call.write({
@ -127,10 +125,9 @@ function handleStreamingOutput(call) {
function handleFullDuplex(call) { function handleFullDuplex(call) {
call.on('data', function(value) { call.on('data', function(value) {
var payload_type = value.response_type; var payload_type = value.response_type;
if (payload_type === testProto.PayloadType.RANDOM) { if (payload_type === 'RANDOM') {
payload_type = [ payload_type = ['COMPRESSABLE',
testProto.PayloadType.COMPRESSABLE, 'UNCOMPRESSABLE'][Math.random() < 0.5 ? 0 : 1];
testProto.PayloadType.UNCOMPRESSABLE][Math.random() < 0.5 ? 0 : 1];
} }
_.each(value.response_parameters, function(resp_param) { _.each(value.response_parameters, function(resp_param) {
call.write({ call.write({

@ -30,7 +30,7 @@
// Message definitions to be used by integration test service definitions. // Message definitions to be used by integration test service definitions.
syntax = "proto2"; syntax = "proto3";
package grpc.testing; package grpc.testing;
@ -49,46 +49,46 @@ enum PayloadType {
// A block of data, to simply increase gRPC message size. // A block of data, to simply increase gRPC message size.
message Payload { message Payload {
// The type of data in body. // The type of data in body.
optional PayloadType type = 1 [default = COMPRESSABLE]; PayloadType type = 1;
// Primary contents of payload. // Primary contents of payload.
optional bytes body = 2; bytes body = 2;
} }
// Unary request. // Unary request.
message SimpleRequest { message SimpleRequest {
// Desired payload type in the response from the server. // Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats. // If response_type is RANDOM, server randomly chooses one from other formats.
optional PayloadType response_type = 1 [default = COMPRESSABLE]; PayloadType response_type = 1;
// Desired payload size in the response from the server. // Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression. // If response_type is COMPRESSABLE, this denotes the size before compression.
optional int32 response_size = 2; int32 response_size = 2;
// Optional input payload sent along with the request. // Optional input payload sent along with the request.
optional Payload payload = 3; Payload payload = 3;
// Whether SimpleResponse should include username. // Whether SimpleResponse should include username.
optional bool fill_username = 4; bool fill_username = 4;
// Whether SimpleResponse should include OAuth scope. // Whether SimpleResponse should include OAuth scope.
optional bool fill_oauth_scope = 5; bool fill_oauth_scope = 5;
} }
// Unary response, as configured by the request. // Unary response, as configured by the request.
message SimpleResponse { message SimpleResponse {
// Payload to increase message size. // Payload to increase message size.
optional Payload payload = 1; Payload payload = 1;
// The user the request came from, for verifying authentication was // The user the request came from, for verifying authentication was
// successful when the client expected it. // successful when the client expected it.
optional string username = 2; string username = 2;
// OAuth scope. // OAuth scope.
optional string oauth_scope = 3; string oauth_scope = 3;
} }
// Client-streaming request. // Client-streaming request.
message StreamingInputCallRequest { message StreamingInputCallRequest {
// Optional input payload sent along with the request. // Optional input payload sent along with the request.
optional Payload payload = 1; Payload payload = 1;
// Not expecting any payload from the response. // Not expecting any payload from the response.
} }
@ -96,18 +96,18 @@ message StreamingInputCallRequest {
// Client-streaming response. // Client-streaming response.
message StreamingInputCallResponse { message StreamingInputCallResponse {
// Aggregated size of payloads received from the client. // Aggregated size of payloads received from the client.
optional int32 aggregated_payload_size = 1; int32 aggregated_payload_size = 1;
} }
// Configuration for a particular response. // Configuration for a particular response.
message ResponseParameters { message ResponseParameters {
// Desired payload sizes in responses from the server. // Desired payload sizes in responses from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression. // If response_type is COMPRESSABLE, this denotes the size before compression.
optional int32 size = 1; int32 size = 1;
// Desired interval between consecutive responses in the response stream in // Desired interval between consecutive responses in the response stream in
// microseconds. // microseconds.
optional int32 interval_us = 2; int32 interval_us = 2;
} }
// Server-streaming request. // Server-streaming request.
@ -116,17 +116,17 @@ message StreamingOutputCallRequest {
// If response_type is RANDOM, the payload from each response in the stream // If response_type is RANDOM, the payload from each response in the stream
// might be of different types. This is to simulate a mixed type of payload // might be of different types. This is to simulate a mixed type of payload
// stream. // stream.
optional PayloadType response_type = 1 [default = COMPRESSABLE]; PayloadType response_type = 1;
// Configuration for each expected response message. // Configuration for each expected response message.
repeated ResponseParameters response_parameters = 2; repeated ResponseParameters response_parameters = 2;
// Optional input payload sent along with the request. // Optional input payload sent along with the request.
optional Payload payload = 3; Payload payload = 3;
} }
// Server-streaming response, as configured by the request and parameters. // Server-streaming response, as configured by the request and parameters.
message StreamingOutputCallResponse { message StreamingOutputCallResponse {
// Payload to increase response size. // Payload to increase response size.
optional Payload payload = 1; Payload payload = 1;
} }

@ -30,7 +30,8 @@
// An integration test service that covers all the method signature permutations // An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses. // of unary/streaming requests/responses.
syntax = "proto2";
syntax = "proto3";
import "empty.proto"; import "empty.proto";
import "messages.proto"; import "messages.proto";

@ -1,6 +1,6 @@
{ {
"name": "grpc", "name": "grpc",
"version": "0.7.0", "version": "0.8.0",
"author": "Google Inc.", "author": "Google Inc.",
"description": "gRPC Library for Node", "description": "gRPC Library for Node",
"homepage": "http://www.grpc.io/", "homepage": "http://www.grpc.io/",
@ -26,7 +26,7 @@
"dependencies": { "dependencies": {
"bindings": "^1.2.0", "bindings": "^1.2.0",
"nan": "^1.5.0", "nan": "^1.5.0",
"protobufjs": "^4.0.0-b2", "protobufjs": "dcodeIO/ProtoBuf.js",
"underscore": "^1.6.0", "underscore": "^1.6.0",
"underscore.string": "^3.0.0" "underscore.string": "^3.0.0"
}, },

@ -50,7 +50,7 @@ function deserializeCls(cls) {
* @return {cls} The resulting object * @return {cls} The resulting object
*/ */
return function deserialize(arg_buf) { return function deserialize(arg_buf) {
return cls.decode(arg_buf); return cls.decode(arg_buf).toRaw();
}; };
} }

@ -0,0 +1,39 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
message EchoMessage {
string value = 1;
int32 value2 = 2;
}
service EchoService {
rpc Echo (EchoMessage) returns (EchoMessage);
}

@ -99,6 +99,36 @@ describe('Surface server constructor', function() {
}, /math.Math/); }, /math.Math/);
}); });
}); });
describe('Echo service', function() {
var server;
var client;
before(function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/echo_service.proto');
var echo_service = test_proto.lookup('EchoService');
var Server = grpc.buildServer([echo_service]);
server = new Server({
'EchoService': {
echo: function(call, callback) {
callback(null, call.request);
}
}
});
var port = server.bind('localhost:0');
var Client = surface_client.makeProtobufClientConstructor(echo_service);
client = new Client('localhost:' + port);
server.listen();
});
after(function() {
server.shutdown();
});
it('should echo the recieved message directly', function(done) {
client.echo({value: 'test value', value2: 3}, function(error, response) {
assert.ifError(error);
assert.deepEqual(response, {value: 'test value', value2: 3});
done();
});
});
});
describe('Generic client and server', function() { describe('Generic client and server', function() {
function toString(val) { function toString(val) {
return val.toString(); return val.toString();

@ -27,14 +27,14 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto2"; syntax = "proto3";
message Request { message Request {
optional bool error = 1; bool error = 1;
} }
message Response { message Response {
optional int32 count = 1; int32 count = 1;
} }
service TestService { service TestService {

@ -114,12 +114,13 @@ PHP_METHOD(Server, __construct) {
} }
server->queue = grpc_completion_queue_create(); server->queue = grpc_completion_queue_create();
if (args_array == NULL) { if (args_array == NULL) {
server->wrapped = grpc_server_create(server->queue, NULL); server->wrapped = grpc_server_create(NULL);
} else { } else {
php_grpc_read_args_array(args_array, &args); php_grpc_read_args_array(args_array, &args);
server->wrapped = grpc_server_create(server->queue, &args); server->wrapped = grpc_server_create(&args);
efree(args.args); efree(args.args);
} }
grpc_server_register_completion_queue(server->wrapped, server->queue);
} }
/** /**
@ -141,8 +142,9 @@ PHP_METHOD(Server, requestCall) {
object_init(result); object_init(result);
grpc_call_details_init(&details); grpc_call_details_init(&details);
grpc_metadata_array_init(&metadata); grpc_metadata_array_init(&metadata);
error_code = grpc_server_request_call(server->wrapped, &call, &details, error_code =
&metadata, server->queue, NULL); grpc_server_request_call(server->wrapped, &call, &details, &metadata,
server->queue, server->queue, NULL);
if (error_code != GRPC_CALL_OK) { if (error_code != GRPC_CALL_OK) {
zend_throw_exception(spl_ce_LogicException, "request_call failed", zend_throw_exception(spl_ce_LogicException, "request_call failed",
(long)error_code TSRMLS_CC); (long)error_code TSRMLS_CC);

@ -7,7 +7,7 @@ The Python facility of gRPC.
Status Status
------- -------
Usable with limitations, Pre-Alpha Usable with limitations, Alpha
Prerequisites Prerequisites
----------------------- -----------------------

@ -51,8 +51,9 @@ static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) {
&completion_queue)) { &completion_queue)) {
return -1; return -1;
} }
self->c_server = grpc_server_create( self->c_server = grpc_server_create(NULL);
completion_queue->c_completion_queue, NULL); grpc_server_register_completion_queue(self->c_server,
completion_queue->c_completion_queue);
self->completion_queue = completion_queue; self->completion_queue = completion_queue;
Py_INCREF(completion_queue); Py_INCREF(completion_queue);
return 0; return 0;
@ -122,7 +123,7 @@ static const PyObject *pygrpc_server_service(Server *self, PyObject *tag) {
call_error = grpc_server_request_call( call_error = grpc_server_request_call(
self->c_server, &c_tag->call->c_call, &c_tag->call->call_details, self->c_server, &c_tag->call->c_call, &c_tag->call->call_details,
&c_tag->call->recv_metadata, self->completion_queue->c_completion_queue, &c_tag->call->recv_metadata, self->completion_queue->c_completion_queue,
c_tag); self->completion_queue->c_completion_queue, c_tag);
result = pygrpc_translate_call_error(call_error); result = pygrpc_translate_call_error(call_error);
if (result != NULL) { if (result != NULL) {

@ -123,7 +123,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
wrapper); wrapper);
grpc_rb_hash_convert_to_channel_args(channel_args, &args); grpc_rb_hash_convert_to_channel_args(channel_args, &args);
srv = grpc_server_create(cq, &args); srv = grpc_server_create(&args);
if (args.args != NULL) { if (args.args != NULL) {
xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */ xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
@ -131,6 +131,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
if (srv == NULL) { if (srv == NULL) {
rb_raise(rb_eRuntimeError, "could not create a gRPC server, not sure why"); rb_raise(rb_eRuntimeError, "could not create a gRPC server, not sure why");
} }
grpc_server_register_completion_queue(srv, cq);
wrapper->wrapped = srv; wrapper->wrapped = srv;
/* Add the cq as the server's mark object. This ensures the ruby cq can't be /* Add the cq as the server's mark object. This ensures the ruby cq can't be
@ -218,6 +219,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
err = grpc_server_request_call( err = grpc_server_request_call(
s->wrapped, &call, &st.details, &st.md_ary, s->wrapped, &call, &st.details, &st.md_ary,
grpc_rb_get_wrapped_completion_queue(cqueue), grpc_rb_get_wrapped_completion_queue(cqueue),
grpc_rb_get_wrapped_completion_queue(cqueue),
ROBJECT(tag_new)); ROBJECT(tag_new));
if (err != GRPC_CALL_OK) { if (err != GRPC_CALL_OK) {
grpc_request_call_stack_cleanup(&st); grpc_request_call_stack_cleanup(&st);

@ -94,7 +94,8 @@ void test_connect(const char *server_host, const char *client_host, int port,
/* Create server. */ /* Create server. */
server_cq = grpc_completion_queue_create(); server_cq = grpc_completion_queue_create();
server = grpc_server_create(server_cq, NULL); server = grpc_server_create(NULL);
grpc_server_register_completion_queue(server, server_cq);
GPR_ASSERT((got_port = grpc_server_add_http2_port(server, server_hostport)) > GPR_ASSERT((got_port = grpc_server_add_http2_port(server, server_hostport)) >
0); 0);
if (port == 0) { if (port == 0) {
@ -150,10 +151,10 @@ void test_connect(const char *server_host, const char *client_host, int port,
if (expect_ok) { if (expect_ok) {
/* Check for a successful request. */ /* Check for a successful request. */
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, server_cq,
server_cq, tag(101))); server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -82,8 +82,8 @@ static void chttp2_init_server_secure_fullstack(
if (f->server) { if (f->server) {
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
} }
f->server = f->server = grpc_server_create(server_args);
grpc_server_create(f->server_cq, server_args); grpc_server_register_completion_queue(f->server, f->server_cq);
GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds)); GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds));
grpc_server_credentials_release(server_creds); grpc_server_credentials_release(server_creds);
grpc_server_start(f->server); grpc_server_start(f->server);

@ -83,7 +83,8 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
if (f->server) { if (f->server) {
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
} }
f->server = grpc_server_create(f->server_cq, server_args); f->server = grpc_server_create(server_args);
grpc_server_register_completion_queue(f->server, f->server_cq);
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr)); GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr));
grpc_server_start(f->server); grpc_server_start(f->server);
} }

@ -88,7 +88,8 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
if (f->server) { if (f->server) {
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
} }
f->server = grpc_server_create(f->server_cq, server_args); f->server = grpc_server_create(server_args);
grpc_server_register_completion_queue(f->server, f->server_cq);
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr)); GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr));
grpc_server_start(f->server); grpc_server_start(f->server);
} }

@ -85,8 +85,8 @@ static void chttp2_init_server_secure_fullstack(
if (f->server) { if (f->server) {
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
} }
f->server = f->server = grpc_server_create(server_args);
grpc_server_create(f->server_cq, server_args); grpc_server_register_completion_queue(f->server, f->server_cq);
GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds)); GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds));
grpc_server_credentials_release(server_creds); grpc_server_credentials_release(server_creds);
grpc_server_start(f->server); grpc_server_start(f->server);

@ -83,8 +83,8 @@ static void chttp2_init_server_secure_fullstack(
if (f->server) { if (f->server) {
grpc_server_destroy(f->server); grpc_server_destroy(f->server);
} }
f->server = f->server = grpc_server_create(server_args);
grpc_server_create(f->server_cq, server_args); grpc_server_register_completion_queue(f->server, f->server_cq);
GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds)); GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds));
grpc_server_credentials_release(server_creds); grpc_server_credentials_release(server_creds);
grpc_server_start(f->server); grpc_server_start(f->server);

@ -117,8 +117,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *server_args) { grpc_channel_args *server_args) {
grpc_endpoint_pair *sfd = f->fixture_data; grpc_endpoint_pair *sfd = f->fixture_data;
GPR_ASSERT(!f->server); GPR_ASSERT(!f->server);
f->server = f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->server_cq);
grpc_server_start(f->server); grpc_server_start(f->server);
grpc_create_chttp2_transport(server_setup_transport, f, server_args, grpc_create_chttp2_transport(server_setup_transport, f, server_args,
sfd->server, NULL, 0, grpc_mdctx_create(), 0); sfd->server, NULL, 0, grpc_mdctx_create(), 0);

@ -117,8 +117,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *server_args) { grpc_channel_args *server_args) {
grpc_endpoint_pair *sfd = f->fixture_data; grpc_endpoint_pair *sfd = f->fixture_data;
GPR_ASSERT(!f->server); GPR_ASSERT(!f->server);
f->server = f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->server_cq);
grpc_server_start(f->server); grpc_server_start(f->server);
grpc_create_chttp2_transport(server_setup_transport, f, server_args, grpc_create_chttp2_transport(server_setup_transport, f, server_args,
sfd->server, NULL, 0, grpc_mdctx_create(), 0); sfd->server, NULL, 0, grpc_mdctx_create(), 0);

@ -156,9 +156,10 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( GPR_ASSERT(GRPC_CALL_OK ==
f.server, &s, &call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.server_cq, tag(2))); &request_metadata_recv, f.server_cq,
f.server_cq, tag(2)));
cq_expect_completion(v_server, tag(2), 1); cq_expect_completion(v_server, tag(2), 1);
cq_verify(v_server); cq_verify(v_server);

@ -158,9 +158,10 @@ static void test_cancel_after_accept_and_writes_closed(
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( GPR_ASSERT(GRPC_CALL_OK ==
f.server, &s, &call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.server_cq, tag(2))); &request_metadata_recv, f.server_cq,
f.server_cq, tag(2)));
cq_expect_completion(v_server, tag(2), 1); cq_expect_completion(v_server, tag(2), 1);
cq_verify(v_server); cq_verify(v_server);

@ -137,10 +137,10 @@ static void test_body(grpc_end2end_test_fixture f) {
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -128,10 +128,10 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f->server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f->server_cq,
f->server_cq, tag(101))); f->server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -143,10 +143,10 @@ static void test_early_server_shutdown_finishes_inflight_calls(
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -110,10 +110,10 @@ static void test_early_server_shutdown_finishes_tags(
/* upon shutdown, the server should finish all requested calls indicating /* upon shutdown, the server should finish all requested calls indicating
no new call */ no new call */
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
grpc_server_shutdown(f.server); grpc_server_shutdown(f.server);
cq_expect_completion(v_server, tag(101), 0); cq_expect_completion(v_server, tag(101), 0);
cq_verify(v_server); cq_verify(v_server);

@ -142,10 +142,10 @@ static void test_early_server_shutdown_finishes_inflight_calls(
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -160,10 +160,10 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -140,10 +140,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);
@ -249,10 +249,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
"foo.test.google.fr:1234", deadline); "foo.test.google.fr:1234", deadline);
GPR_ASSERT(c2); GPR_ASSERT(c2);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s1, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s1, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
op = ops; op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA; op->op = GRPC_OP_SEND_INITIAL_METADATA;
@ -335,10 +335,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
cq_expect_completion(v_client, tag(live_call + 1), 1); cq_expect_completion(v_client, tag(live_call + 1), 1);
cq_verify(v_client); cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s2, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s2, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(201))); f.server_cq, tag(201)));
cq_expect_completion(v_server, tag(201), 1); cq_expect_completion(v_server, tag(201), 1);
cq_verify(v_server); cq_verify(v_server);

@ -159,10 +159,10 @@ static void test_max_message_length(grpc_end2end_test_config config) {
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -148,10 +148,10 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(100))); f.server_cq, tag(100)));
cq_expect_completion(v_server, tag(100), 1); cq_expect_completion(v_server, tag(100), 1);
cq_verify(v_server); cq_verify(v_server);

@ -141,10 +141,10 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) {
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -176,10 +176,10 @@ static void test_request_response_with_metadata_and_payload(
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -162,10 +162,10 @@ static void test_request_response_with_metadata_and_payload(
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -154,10 +154,10 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -212,12 +212,13 @@ static void request_response_with_payload_and_call_creds(
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details, &call_details,
&request_metadata_recv, &request_metadata_recv,
f.server_cq, tag(101))); f.server_cq, f.server_cq,
tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);
/* Cannot set creds on the server call object. */ /* Cannot set creds on the server call object. */
GPR_ASSERT(grpc_call_set_credentials(s, NULL) != GRPC_CALL_OK); GPR_ASSERT(!grpc_call_set_credentials(s, NULL));
op = ops; op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA; op->op = GRPC_OP_SEND_INITIAL_METADATA;

@ -164,8 +164,14 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details, &call_details,
&request_metadata_recv, &request_metadata_recv,
<<<<<<< HEAD
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
=======
f.server_cq, f.server_cq,
tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
>>>>>>> a468c36601dd5997580129bbd66b5ebed02521f8
cq_verify(v_server); cq_verify(v_server);
op = ops; op = ops;

@ -158,10 +158,10 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -149,10 +149,10 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -136,10 +136,10 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
config.init_server(f, server_args); config.init_server(f, server_args);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f->server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f->server_cq,
f->server_cq, tag(101))); f->server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -142,10 +142,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -142,10 +142,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
op++; op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, GPR_ASSERT(GRPC_CALL_OK ==
&call_details, grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, &request_metadata_recv, f.server_cq,
f.server_cq, tag(101))); f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1); cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server); cq_verify(v_server);

@ -92,7 +92,7 @@ typedef struct {
static void request_call(void) { static void request_call(void) {
grpc_metadata_array_init(&request_metadata_recv); grpc_metadata_array_init(&request_metadata_recv);
grpc_server_request_call(server, &call, &call_details, &request_metadata_recv, grpc_server_request_call(server, &call, &call_details, &request_metadata_recv,
cq, tag(FLING_SERVER_NEW_REQUEST)); cq, cq, tag(FLING_SERVER_NEW_REQUEST));
} }
static void handle_unary_method(void) { static void handle_unary_method(void) {
@ -211,13 +211,14 @@ int main(int argc, char **argv) {
test_server1_cert}; test_server1_cert};
grpc_server_credentials *ssl_creds = grpc_server_credentials *ssl_creds =
grpc_ssl_server_credentials_create(NULL, &pem_key_cert_pair, 1); grpc_ssl_server_credentials_create(NULL, &pem_key_cert_pair, 1);
server = grpc_server_create(cq, NULL); server = grpc_server_create(NULL);
GPR_ASSERT(grpc_server_add_secure_http2_port(server, addr, ssl_creds)); GPR_ASSERT(grpc_server_add_secure_http2_port(server, addr, ssl_creds));
grpc_server_credentials_release(ssl_creds); grpc_server_credentials_release(ssl_creds);
} else { } else {
server = grpc_server_create(cq, NULL); server = grpc_server_create(NULL);
GPR_ASSERT(grpc_server_add_http2_port(server, addr)); GPR_ASSERT(grpc_server_add_http2_port(server, addr));
} }
grpc_server_register_completion_queue(server, cq);
grpc_server_start(server); grpc_server_start(server);
gpr_free(addr_buf); gpr_free(addr_buf);

@ -91,7 +91,7 @@ void verify_timed_ok(
class AsyncEnd2endTest : public ::testing::Test { class AsyncEnd2endTest : public ::testing::Test {
protected: protected:
AsyncEnd2endTest() : service_(&srv_cq_) {} AsyncEnd2endTest() {}
void SetUp() GRPC_OVERRIDE { void SetUp() GRPC_OVERRIDE {
int port = grpc_pick_unused_port_or_die(); int port = grpc_pick_unused_port_or_die();
@ -100,6 +100,7 @@ class AsyncEnd2endTest : public ::testing::Test {
ServerBuilder builder; ServerBuilder builder;
builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials()); builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials());
builder.RegisterAsyncService(&service_); builder.RegisterAsyncService(&service_);
srv_cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
} }
@ -108,10 +109,10 @@ class AsyncEnd2endTest : public ::testing::Test {
void* ignored_tag; void* ignored_tag;
bool ignored_ok; bool ignored_ok;
cli_cq_.Shutdown(); cli_cq_.Shutdown();
srv_cq_.Shutdown(); srv_cq_->Shutdown();
while (cli_cq_.Next(&ignored_tag, &ignored_ok)) while (cli_cq_.Next(&ignored_tag, &ignored_ok))
; ;
while (srv_cq_.Next(&ignored_tag, &ignored_ok)) while (srv_cq_->Next(&ignored_tag, &ignored_ok))
; ;
} }
@ -121,9 +122,9 @@ class AsyncEnd2endTest : public ::testing::Test {
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
} }
void server_ok(int i) { verify_ok(&srv_cq_, i, true); } void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
void client_ok(int i) { verify_ok(&cli_cq_, i, true); } void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
void server_fail(int i) { verify_ok(&srv_cq_, i, false); } void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
void client_fail(int i) { verify_ok(&cli_cq_, i, false); } void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
void SendRpc(int num_rpcs) { void SendRpc(int num_rpcs) {
@ -142,8 +143,8 @@ class AsyncEnd2endTest : public ::testing::Test {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer,
tag(2)); srv_cq_.get(), srv_cq_.get(), tag(2));
server_ok(2); server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
@ -161,7 +162,7 @@ class AsyncEnd2endTest : public ::testing::Test {
} }
CompletionQueue cli_cq_; CompletionQueue cli_cq_;
CompletionQueue srv_cq_; std::unique_ptr<ServerCompletionQueue> srv_cq_;
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<Server> server_; std::unique_ptr<Server> server_;
grpc::cpp::test::util::TestService::AsyncService service_; grpc::cpp::test::util::TestService::AsyncService service_;
@ -200,18 +201,18 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
std::chrono::system_clock::now()); std::chrono::system_clock::now());
std::chrono::system_clock::time_point time_limit( std::chrono::system_clock::time_point time_limit(
std::chrono::system_clock::now() + std::chrono::seconds(10)); std::chrono::system_clock::now() + std::chrono::seconds(10));
verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); verify_timed_ok(srv_cq_.get(), -1, true, time_now, CompletionQueue::TIMEOUT);
verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
tag(2)); srv_cq_.get(), tag(2));
verify_timed_ok(&srv_cq_, 2, true, time_limit); verify_timed_ok(srv_cq_.get(), 2, true, time_limit);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
verify_timed_ok(&srv_cq_, 3, true); verify_timed_ok(srv_cq_.get(), 3, true);
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
verify_timed_ok(&cli_cq_, 4, true); verify_timed_ok(&cli_cq_, 4, true);
@ -237,7 +238,8 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream( std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1))); stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1)));
service_.RequestRequestStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); service_.RequestRequestStream(&srv_ctx, &srv_stream, srv_cq_.get(),
srv_cq_.get(), tag(2));
server_ok(2); server_ok(2);
client_ok(1); client_ok(1);
@ -290,8 +292,8 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) {
std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream( std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, &srv_cq_, service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
tag(2)); srv_cq_.get(), srv_cq_.get(), tag(2));
server_ok(2); server_ok(2);
client_ok(1); client_ok(1);
@ -341,7 +343,8 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> > std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> >
cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq_, tag(1))); cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq_, tag(1)));
service_.RequestBidiStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); service_.RequestBidiStream(&srv_ctx, &srv_stream, srv_cq_.get(),
srv_cq_.get(), tag(2));
server_ok(2); server_ok(2);
client_ok(1); client_ok(1);
@ -399,8 +402,8 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
tag(2)); srv_cq_.get(), tag(2));
server_ok(2); server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata(); auto client_initial_metadata = srv_ctx.client_metadata();
@ -440,8 +443,8 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
tag(2)); srv_cq_.get(), tag(2));
server_ok(2); server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
@ -487,8 +490,8 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
tag(2)); srv_cq_.get(), tag(2));
server_ok(2); server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
@ -547,8 +550,8 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_)); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
tag(2)); srv_cq_.get(), tag(2));
server_ok(2); server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata(); auto client_initial_metadata = srv_ctx.client_metadata();

@ -109,6 +109,7 @@ class GenericEnd2endTest : public ::testing::Test {
ServerBuilder builder; ServerBuilder builder;
builder.AddListeningPort(server_address_.str(), InsecureServerCredentials()); builder.AddListeningPort(server_address_.str(), InsecureServerCredentials());
builder.RegisterAsyncGenericService(&generic_service_); builder.RegisterAsyncGenericService(&generic_service_);
srv_cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
} }
@ -117,10 +118,10 @@ class GenericEnd2endTest : public ::testing::Test {
void* ignored_tag; void* ignored_tag;
bool ignored_ok; bool ignored_ok;
cli_cq_.Shutdown(); cli_cq_.Shutdown();
srv_cq_.Shutdown(); srv_cq_->Shutdown();
while (cli_cq_.Next(&ignored_tag, &ignored_ok)) while (cli_cq_.Next(&ignored_tag, &ignored_ok))
; ;
while (srv_cq_.Next(&ignored_tag, &ignored_ok)) while (srv_cq_->Next(&ignored_tag, &ignored_ok))
; ;
} }
@ -130,9 +131,9 @@ class GenericEnd2endTest : public ::testing::Test {
generic_stub_.reset(new GenericStub(channel)); generic_stub_.reset(new GenericStub(channel));
} }
void server_ok(int i) { verify_ok(&srv_cq_, i, true); } void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
void client_ok(int i) { verify_ok(&cli_cq_, i, true); } void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
void server_fail(int i) { verify_ok(&srv_cq_, i, false); } void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
void client_fail(int i) { verify_ok(&cli_cq_, i, false); } void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
void SendRpc(int num_rpcs) { void SendRpc(int num_rpcs) {
@ -160,9 +161,10 @@ class GenericEnd2endTest : public ::testing::Test {
call->WritesDone(tag(3)); call->WritesDone(tag(3));
client_ok(3); client_ok(3);
generic_service_.RequestCall(&srv_ctx, &stream, &srv_cq_, tag(4)); generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
srv_cq_.get(), tag(4));
verify_ok(generic_service_.completion_queue(), 4, true); verify_ok(srv_cq_.get(), 4, true);
EXPECT_EQ(server_address_.str(), srv_ctx.host()); EXPECT_EQ(server_address_.str(), srv_ctx.host());
EXPECT_EQ(kMethodName, srv_ctx.method()); EXPECT_EQ(kMethodName, srv_ctx.method());
ByteBuffer recv_buffer; ByteBuffer recv_buffer;
@ -193,7 +195,7 @@ class GenericEnd2endTest : public ::testing::Test {
} }
CompletionQueue cli_cq_; CompletionQueue cli_cq_;
CompletionQueue srv_cq_; std::unique_ptr<ServerCompletionQueue> srv_cq_;
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<grpc::GenericStub> generic_stub_; std::unique_ptr<grpc::GenericStub> generic_stub_;
std::unique_ptr<Server> server_; std::unique_ptr<Server> server_;
@ -230,9 +232,10 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1)); generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));
client_ok(1); client_ok(1);
generic_service_.RequestCall(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
srv_cq_.get(), tag(2));
verify_ok(generic_service_.completion_queue(), 2, true); verify_ok(srv_cq_.get(), 2, true);
EXPECT_EQ(server_address_.str(), srv_ctx.host()); EXPECT_EQ(server_address_.str(), srv_ctx.host());
EXPECT_EQ(kMethodName, srv_ctx.method()); EXPECT_EQ(kMethodName, srv_ctx.method());

@ -0,0 +1,79 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc/support/log.h>
#include <signal.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
namespace grpc {
namespace testing {
static const int WARMUP = 5;
static const int BENCHMARK = 10;
static void RunAsyncStreamingPingPong() {
gpr_log(GPR_INFO, "Running Async Streaming Ping Pong");
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(STREAMING);
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
server_config.set_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
ReportQPS(result);
ReportLatency(result);
}
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
signal(SIGPIPE, SIG_IGN);
grpc::testing::RunAsyncStreamingPingPong();
return 0;
}

@ -63,9 +63,7 @@ namespace testing {
class AsyncQpsServerTest : public Server { class AsyncQpsServerTest : public Server {
public: public:
AsyncQpsServerTest(const ServerConfig& config, int port) AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) {
: srv_cq_(), async_service_(&srv_cq_), server_(nullptr),
shutdown_(false) {
char* server_address = NULL; char* server_address = NULL;
gpr_join_host_port(&server_address, "::", port); gpr_join_host_port(&server_address, "::", port);
@ -74,15 +72,17 @@ class AsyncQpsServerTest : public Server {
gpr_free(server_address); gpr_free(server_address);
builder.RegisterAsyncService(&async_service_); builder.RegisterAsyncService(&async_service_);
srv_cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
using namespace std::placeholders; using namespace std::placeholders;
request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall, request_unary_ =
&async_service_, _1, _2, _3, &srv_cq_, _4); std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_,
_1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4);
request_streaming_ = request_streaming_ =
std::bind(&TestService::AsyncService::RequestStreamingCall, std::bind(&TestService::AsyncService::RequestStreamingCall,
&async_service_, _1, _2, &srv_cq_, _3); &async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
contexts_.push_front( contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
@ -96,7 +96,7 @@ class AsyncQpsServerTest : public Server {
// Wait until work is available or we are shutting down // Wait until work is available or we are shutting down
bool ok; bool ok;
void* got_tag; void* got_tag;
while (srv_cq_.Next(&got_tag, &ok)) { while (srv_cq_->Next(&got_tag, &ok)) {
ServerRpcContext* ctx = detag(got_tag); ServerRpcContext* ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke // The tag is a pointer to an RPC context to invoke
if (ctx->RunNextState(ok) == false) { if (ctx->RunNextState(ok) == false) {
@ -116,7 +116,7 @@ class AsyncQpsServerTest : public Server {
{ {
std::lock_guard<std::mutex> g(shutdown_mutex_); std::lock_guard<std::mutex> g(shutdown_mutex_);
shutdown_ = true; shutdown_ = true;
srv_cq_.Shutdown(); srv_cq_->Shutdown();
} }
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join(); thr->join();
@ -290,10 +290,10 @@ class AsyncQpsServerTest : public Server {
} }
return Status::OK; return Status::OK;
} }
CompletionQueue srv_cq_;
TestService::AsyncService async_service_;
std::vector<std::thread> threads_; std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_; std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_;
TestService::AsyncService async_service_;
std::function<void(ServerContext*, SimpleRequest*, std::function<void(ServerContext*, SimpleRequest*,
grpc::ServerAsyncResponseWriter<SimpleResponse>*, void*)> grpc::ServerAsyncResponseWriter<SimpleResponse>*, void*)>
request_unary_; request_unary_;

@ -0,0 +1,62 @@
#!/bin/bash
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
thisfile=$(readlink -ne "${BASH_SOURCE[0]}")
test_case=$1
client_vm=$2
result=cloud_prod_result.$1
cur=$(date "+%Y-%m-%d-%H-%M-%S")
log_link=https://pantheon.corp.google.com/m/cloudstorage/b/stoked-keyword-656-output/o/prod_result/$test_case/$cur
main() {
source grpc_docker.sh
clients=(cxx java go ruby node csharp_mono python php)
for client in "${clients[@]}"
do
log_file_name=cloud_{$test_case}_{$client}.txt
if grpc_cloud_prod_test $test_case $client_vm $client > /tmp/$log_file_name 2>&1
then
echo " ['$test_case', '$client', 'prod', true, '<a href="$log_link/$log_file_name">log</a>']," >> /tmp/$result.txt
else
echo " ['$test_case', '$client', 'prod', false, '<a href="$log_link/$log_file_name">log</a>']," >> /tmp/$result.txt
fi
gsutil cp /tmp/$log_file_name gs://stoked-keyword-656-output/prod_result/$test_case/$cur/$log_file_name
rm /tmp/$log_file_name
done
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
cat pre.html /tmp/$result.txt post.html > /tmp/$result.html
gsutil cp /tmp/$result.html gs://stoked-keyword-656-output/prod_result/$test_case/$cur/$result.html
rm /tmp/$result.txt
rm /tmp/$result.html
fi
}
set -x
main "$@"

@ -384,6 +384,7 @@ grpc_interop_test_args() {
[[ -n $1 ]] && { # client_type [[ -n $1 ]] && { # client_type
case $1 in case $1 in
cxx|go|java|node|php|python|ruby|csharp_mono) cxx|go|java|node|php|python|ruby|csharp_mono)
grpc_client_platform='Docker'
grpc_gen_test_cmd="grpc_interop_gen_$1_cmd" grpc_gen_test_cmd="grpc_interop_gen_$1_cmd"
declare -F $grpc_gen_test_cmd >> /dev/null || { declare -F $grpc_gen_test_cmd >> /dev/null || {
echo "-f: test_func for $1 => $grpc_gen_test_cmd is not defined" 1>&2 echo "-f: test_func for $1 => $grpc_gen_test_cmd is not defined" 1>&2
@ -391,6 +392,11 @@ grpc_interop_test_args() {
} }
shift shift
;; ;;
csharp_dotnet)
grpc_client_platform='Windows'
grpc_gen_test_cmd="grpc_interop_gen_$1_cmd"
shift
;;
*) *)
echo "bad client_type: $1" 1>&2 echo "bad client_type: $1" 1>&2
return 1 return 1
@ -456,6 +462,7 @@ grpc_cloud_prod_test_args() {
[[ -n $1 ]] && { # client_type [[ -n $1 ]] && { # client_type
case $1 in case $1 in
cxx|go|java|node|php|python|ruby|csharp_mono) cxx|go|java|node|php|python|ruby|csharp_mono)
grpc_client_platform='Docker'
grpc_gen_test_cmd="grpc_cloud_prod_gen_$1_cmd" grpc_gen_test_cmd="grpc_cloud_prod_gen_$1_cmd"
declare -F $grpc_gen_test_cmd >> /dev/null || { declare -F $grpc_gen_test_cmd >> /dev/null || {
echo "-f: test_func for $1 => $grpc_gen_test_cmd is not defined" 1>&2 echo "-f: test_func for $1 => $grpc_gen_test_cmd is not defined" 1>&2
@ -463,6 +470,11 @@ grpc_cloud_prod_test_args() {
} }
shift shift
;; ;;
csharp_dotnet)
grpc_client_platform='Windows'
grpc_gen_test_cmd="grpc_cloud_prod_gen_$1_cmd"
shift
;;
*) *)
echo "bad client_type: $1" 1>&2 echo "bad client_type: $1" 1>&2
return 1 return 1
@ -851,12 +863,23 @@ grpc_launch_servers() {
test_runner() { test_runner() {
local project_opt="--project $grpc_project" local project_opt="--project $grpc_project"
local zone_opt="--zone $grpc_zone" local zone_opt="--zone $grpc_zone"
local ssh_cmd="bash -l -c \"$cmd\""
echo "will run:"
echo " $ssh_cmd"
echo "on $host"
[[ $dry_run == 1 ]] && return 0 # don't run the command on a dry run [[ $dry_run == 1 ]] && return 0 # don't run the command on a dry run
gcloud compute $project_opt ssh $zone_opt $host --command "$cmd" & if [ "$grpc_client_platform" != "Windows" ]
then
echo "will run:"
echo " $cmd"
echo "on $host"
gcloud compute $project_opt ssh $zone_opt $host --command "$cmd" &
else
# gcloud's auto-uploading of RSA keys doesn't work for Windows VMs.
# So we have a linux machine that is authorized to access the Windows
# machine through ssh and we use gcloud auth support to logon to the proxy.
echo "will run:"
echo " $cmd"
echo "on $host (through grpc-windows-proxy)"
gcloud compute $project_opt ssh $zone_opt stoked-keyword-656@grpc-windows-proxy --command "ssh $host '$cmd'" &
fi
#
PID=$! PID=$!
echo "pid is $PID" echo "pid is $PID"
for x in {0..5} for x in {0..5}
@ -924,7 +947,7 @@ grpc_interop_test() {
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# grpc_interop_test_args # grpc_interop_test_args
local test_case host grpc_gen_test_cmd grpc_server grpc_port local test_case host grpc_gen_test_cmd grpc_server grpc_port grpc_client_platform
# set the project zone and check that all necessary args are provided # set the project zone and check that all necessary args are provided
_grpc_set_project_and_zone -f grpc_interop_test_args "$@" || return 1 _grpc_set_project_and_zone -f grpc_interop_test_args "$@" || return 1
@ -966,7 +989,7 @@ grpc_cloud_prod_test() {
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# grpc_cloud_prod_test_args # grpc_cloud_prod_test_args
local test_case host grpc_gen_test_cmd local test_case host grpc_gen_test_cmd grpc_client_platform
# set the project zone and check that all necessary args are provided # set the project zone and check that all necessary args are provided
_grpc_set_project_and_zone -f grpc_cloud_prod_test_args "$@" || return 1 _grpc_set_project_and_zone -f grpc_cloud_prod_test_args "$@" || return 1
@ -1431,6 +1454,18 @@ grpc_interop_gen_csharp_mono_cmd() {
echo $the_cmd echo $the_cmd
} }
# constructs the csharp-dotnet interop test cmd.
#
# call-seq:
# flags= .... # generic flags to include the command
# cmd=$($grpc_gen_test_cmd $flags)
grpc_interop_gen_csharp_dotnet_cmd() {
local set_workdir="cd /cygdrive/c/github/grpc/src/csharp/Grpc.IntegrationTesting.Client/bin/Debug &&"
local test_script="./Grpc.IntegrationTesting.Client.exe --use_tls=true --use_test_ca=true";
local the_cmd="$set_workdir $test_script $@";
echo $the_cmd
}
# constructs the full dockerized csharp-mono gce=>prod interop test cmd. # constructs the full dockerized csharp-mono gce=>prod interop test cmd.
# #
# call-seq: # call-seq:
@ -1446,6 +1481,20 @@ grpc_cloud_prod_gen_csharp_mono_cmd() {
echo $the_cmd echo $the_cmd
} }
# constructs the csharp-dotnet gce=>prod interop test cmd.
#
# call-seq:
# flags= .... # generic flags to include the command
# cmd=$($grpc_gen_test_cmd $flags)
grpc_cloud_prod_gen_csharp_dotnet_cmd() {
local set_workdir="cd /cygdrive/c/github/grpc/src/csharp/Grpc.IntegrationTesting.Client/bin/Debug &&"
local test_script="./Grpc.IntegrationTesting.Client.exe --use_tls=true";
local set_certfile="SSL_CERT_FILE=/cacerts/roots.pem "
local gfe_flags=$(_grpc_prod_gfe_flags);
local the_cmd="$set_workdir $set_certfile $test_script $gfe_flags $@";
echo $the_cmd
}
# constructs the full dockerized csharp-mono service_account auth interop test cmd. # constructs the full dockerized csharp-mono service_account auth interop test cmd.
# #
# call-seq: # call-seq:

@ -0,0 +1,67 @@
#!/bin/bash
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
thisfile=$(readlink -ne "${BASH_SOURCE[0]}")
test_case=$1
client_vm=$2
server_vm=$3
result=interop_result.$1
cur=$(date "+%Y-%m-%d-%H-%M-%S")
log_link=https://pantheon.corp.google.com/m/cloudstorage/b/stoked-keyword-656-output/o/interop_result/$test_case/$cur
main() {
source grpc_docker.sh
clients=(cxx java go ruby node csharp_mono python php)
servers=(cxx java go ruby node python csharp_mono)
for client in "${clients[@]}"
do
for server in "${servers[@]}"
do
log_file_name=cloud_{$test_case}_{$client}_{$server}.txt
if grpc_interop_test $test_case $client_vm $client $server_vm $server> /tmp/$log_file_name 2>&1
then
echo " ['$test_case', '$client', '$server', true, '<a href="$log_link/$log_file_name">log</a>']," >> /tmp/$result.txt
else
echo " ['$test_case', '$client', '$server', false, '<a href="$log_link/$log_file_name">log</a>']," >> /tmp/$result.txt
fi
gsutil cp /tmp/$log_file_name gs://stoked-keyword-656-output/interop_result/$test_case/$cur/$log_file_name
rm /tmp/$log_file_name
done
done
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
cat pre.html /tmp/$result.txt post.html > /tmp/$result.html
gsutil cp /tmp/$result.html gs://stoked-keyword-656-output/interop_result/$test_case/$cur/$result.html
rm /tmp/$result.txt
rm /tmp/$result.html
fi
}
set -x
main "$@"
Loading…
Cancel
Save