From f9e6adf998ed36479ccbb8eb3cdc58b02cc161dd Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 6 May 2015 11:45:59 -0700 Subject: [PATCH] Completion queue binding for new requests API change Move completion queue binding for new requests to the new request request time, not server instantiation time. --- Makefile | 2 +- build.json | 2 +- include/grpc++/async_generic_service.h | 6 +- include/grpc++/completion_queue.h | 7 ++ include/grpc++/impl/service_type.h | 37 +++--- include/grpc++/server.h | 8 +- include/grpc++/server_builder.h | 7 ++ include/grpc/grpc.h | 18 ++- src/compiler/cpp_generator.cc | 105 ++++++++++-------- src/core/surface/server.c | 67 +++++------ src/core/surface/server.h | 3 +- src/core/surface/server_create.c | 5 +- src/cpp/server/async_generic_service.cc | 10 +- src/cpp/server/server.cc | 71 +++++++----- src/cpp/server/server_builder.cc | 9 ++ test/core/end2end/dualstack_socket_test.c | 11 +- .../end2end/fixtures/chttp2_fake_security.c | 4 +- test/core/end2end/fixtures/chttp2_fullstack.c | 3 +- .../end2end/fixtures/chttp2_fullstack_uds.c | 3 +- .../fixtures/chttp2_simple_ssl_fullstack.c | 4 +- .../chttp2_simple_ssl_with_oauth2_fullstack.c | 4 +- .../end2end/fixtures/chttp2_socket_pair.c | 4 +- .../chttp2_socket_pair_one_byte_at_a_time.c | 4 +- test/core/end2end/tests/cancel_after_accept.c | 7 +- .../cancel_after_accept_and_writes_closed.c | 7 +- .../end2end/tests/census_simple_request.c | 8 +- test/core/end2end/tests/disappearing_server.c | 8 +- ..._server_shutdown_finishes_inflight_calls.c | 8 +- .../early_server_shutdown_finishes_tags.c | 8 +- .../end2end/tests/graceful_server_shutdown.c | 8 +- .../core/end2end/tests/invoke_large_request.c | 8 +- .../end2end/tests/max_concurrent_streams.c | 24 ++-- test/core/end2end/tests/max_message_length.c | 8 +- test/core/end2end/tests/ping_pong_streaming.c | 8 +- test/core/end2end/tests/registered_call.c | 8 +- ...esponse_with_binary_metadata_and_payload.c | 8 +- ...quest_response_with_metadata_and_payload.c | 8 +- .../tests/request_response_with_payload.c | 8 +- .../tests/request_with_large_metadata.c | 8 +- .../core/end2end/tests/request_with_payload.c | 8 +- .../end2end/tests/simple_delayed_request.c | 8 +- test/core/end2end/tests/simple_request.c | 8 +- ...equest_with_high_initial_sequence_number.c | 8 +- test/core/fling/server.c | 7 +- test/cpp/end2end/async_end2end_test.cc | 53 ++++----- test/cpp/end2end/generic_end2end_test.cc | 21 ++-- test/cpp/qps/server_async.cc | 22 ++-- 47 files changed, 366 insertions(+), 305 deletions(-) diff --git a/Makefile b/Makefile index 244a211652c..5f95aabb926 100644 --- a/Makefile +++ b/Makefile @@ -305,7 +305,7 @@ E = @echo Q = @ endif -VERSION = 0.7.0.0 +VERSION = 0.8.0.0 CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES)) CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS) diff --git a/build.json b/build.json index 10fd72d99e4..217d84cdea7 100644 --- a/build.json +++ b/build.json @@ -6,7 +6,7 @@ "#": "The public version number of the library.", "version": { "major": 0, - "minor": 7, + "minor": 8, "micro": 0, "build": 0 } diff --git a/include/grpc++/async_generic_service.h b/include/grpc++/async_generic_service.h index 911d31cb1f4..b435c6e73d2 100644 --- a/include/grpc++/async_generic_service.h +++ b/include/grpc++/async_generic_service.h @@ -65,10 +65,8 @@ class AsyncGenericService GRPC_FINAL { void RequestCall(GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, - CompletionQueue* cq, void* tag); - - // The new rpc event should be obtained from this completion queue. - CompletionQueue* completion_queue(); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag); private: friend class Server; diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 5c2b1cce93d..e8429c8f417 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -58,6 +58,7 @@ class ServerReaderWriter; class CompletionQueue; class Server; +class ServerBuilder; class ServerContext; class CompletionQueueTag { @@ -137,6 +138,12 @@ class CompletionQueue : public GrpcLibrary { grpc_completion_queue* cq_; // owned }; +class ServerCompletionQueue : public CompletionQueue { + private: + friend class ServerBuilder; + ServerCompletionQueue() {} +}; + } // namespace grpc #endif // GRPCXX_COMPLETION_QUEUE_H diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index 7cd3ddad6b7..bc39bb82ac3 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -39,8 +39,10 @@ namespace grpc { class Call; +class CompletionQueue; class RpcService; class Server; +class ServerCompletionQueue; class ServerContext; class Status; @@ -70,52 +72,55 @@ class AsynchronousService { ServerContext* context, ::grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) = 0; + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) = 0; }; - AsynchronousService(CompletionQueue* cq, const char** method_names, - size_t method_count) - : cq_(cq), - dispatch_impl_(nullptr), + AsynchronousService(const char** method_names, size_t method_count) + : dispatch_impl_(nullptr), method_names_(method_names), method_count_(method_count), request_args_(nullptr) {} ~AsynchronousService() { delete[] request_args_; } - CompletionQueue* completion_queue() const { return cq_; } - protected: void RequestAsyncUnary(int index, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } void RequestClientStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } void RequestServerStreaming(int index, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } void RequestBidiStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, - stream, cq, tag); + stream, call_cq, notification_cq, tag); } private: friend class Server; - CompletionQueue* const cq_; DispatchImpl* dispatch_impl_; const char** const method_names_; size_t method_count_; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index b2b9044dcab..50a24163219 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -101,11 +101,15 @@ class Server GRPC_FINAL : public GrpcLibrary, void RequestAsyncCall(void* registered_method, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) GRPC_OVERRIDE; + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) GRPC_OVERRIDE; void RequestAsyncGenericCall(GenericServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag); + CompletionQueue* cq, + ServerCompletionQueue* notification_cq, + void* tag); const int max_message_size_; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 7155c7fd462..ecee475e3e4 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -46,6 +46,7 @@ class AsynchronousService; class CompletionQueue; class RpcService; class Server; +class ServerCompletionQueue; class ServerCredentials; class SynchronousService; class ThreadPoolInterface; @@ -82,6 +83,11 @@ class ServerBuilder { // Does not take ownership. void SetThreadPool(ThreadPoolInterface* thread_pool); + // Add a completion queue for handling asynchronous services + // Caller is required to keep this completion queue live until calling + // BuildAndStart() + std::unique_ptr AddCompletionQueue(); + // Return a running server which is ready for processing rpcs. std::unique_ptr BuildAndStart(); @@ -96,6 +102,7 @@ class ServerBuilder { std::vector services_; std::vector async_services_; std::vector ports_; + std::vector cqs_; std::shared_ptr creds_; AsyncGenericService* generic_service_; ThreadPoolInterface* thread_pool_; diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 9bb826f3237..be12356414c 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -460,7 +460,8 @@ void grpc_call_destroy(grpc_call *call); grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, - grpc_completion_queue *cq_bound_to_call, void *tag_new); + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag_new); /* Registers a method in the server. Methods to this (host, method) pair will not be reported by @@ -470,21 +471,26 @@ grpc_call_error grpc_server_request_call( Must be called before grpc_server_start. Returns NULL on failure. */ void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host, - grpc_completion_queue *new_call_cq); + const char *host); /* Request notification of a new pre-registered call */ grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *request_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *cq_bound_to_call, void *tag_new); + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag_new); /* Create a server. Additional configuration for each incoming channel can be specified with args. If no additional configuration is needed, args can be NULL. See grpc_channel_args for more. */ -grpc_server *grpc_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args); +grpc_server *grpc_server_create(const grpc_channel_args *args); + +/* Register a completion queue with the server. Must be done for any completion + queue that is passed to grpc_server_request_* call. Must be performed prior + to grpc_server_start. */ +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq); /* Add a HTTP2 over plaintext over tcp listener. Returns bound port number on success, 0 on failure. diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 735e7e58a82..46c842a7d64 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -162,6 +162,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, "class CompletionQueue;\n" "class ChannelInterface;\n" "class RpcService;\n" + "class ServerCompletionQueue;\n" "class ServerContext;\n"; if (HasUnaryCalls(file)) { temp.append( @@ -260,7 +261,7 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer, "std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>> " "$Method$(::grpc::ClientContext* context);\n"); printer->Print(*vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " + "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " "$Request$, $Response$>> " "Async$Method$(::grpc::ClientContext* context, " "::grpc::CompletionQueue* cq, void* tag);\n"); @@ -318,30 +319,37 @@ void PrintHeaderServerMethodAsync( (*vars)["Response"] = grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { - printer->Print(*vars, - "void Request$Method$(" - "::grpc::ServerContext* context, $Request$* request, " - "::grpc::ServerAsyncResponseWriter< $Response$>* response, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + printer->Print( + *vars, + "void Request$Method$(" + "::grpc::ServerContext* context, $Request$* request, " + "::grpc::ServerAsyncResponseWriter< $Response$>* response, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } else if (ClientOnlyStreaming(method)) { - printer->Print(*vars, - "void Request$Method$(" - "::grpc::ServerContext* context, " - "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + printer->Print( + *vars, + "void Request$Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } else if (ServerOnlyStreaming(method)) { - printer->Print(*vars, - "void Request$Method$(" - "::grpc::ServerContext* context, $Request$* request, " - "::grpc::ServerAsyncWriter< $Response$>* writer, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + printer->Print( + *vars, + "void Request$Method$(" + "::grpc::ServerContext* context, $Request$* request, " + "::grpc::ServerAsyncWriter< $Response$>* writer, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } else if (BidiStreaming(method)) { printer->Print( *vars, "void Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " - "::grpc::CompletionQueue* cq, void *tag);\n"); + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n"); } } @@ -403,7 +411,7 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer, " public:\n"); printer->Indent(); (*vars)["MethodCount"] = as_string(service->method_count()); - printer->Print("explicit AsyncService(::grpc::CompletionQueue* cq);\n"); + printer->Print("explicit AsyncService();\n"); printer->Print("~AsyncService() {};\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderServerMethodAsync(printer, service->method(i), vars); @@ -686,36 +694,43 @@ void PrintSourceServerAsyncMethod( (*vars)["Response"] = grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { - printer->Print(*vars, - "void $ns$$Service$::AsyncService::Request$Method$(" - "::grpc::ServerContext* context, " - "$Request$* request, " - "::grpc::ServerAsyncResponseWriter< $Response$>* response, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print( + *vars, + "void $ns$$Service$::AsyncService::Request$Method$(" + "::grpc::ServerContext* context, " + "$Request$* request, " + "::grpc::ServerAsyncResponseWriter< $Response$>* response, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); printer->Print(*vars, " AsynchronousService::RequestAsyncUnary($Idx$, context, " - "request, response, cq, tag);\n"); + "request, response, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } else if (ClientOnlyStreaming(method)) { - printer->Print(*vars, - "void $ns$$Service$::AsyncService::Request$Method$(" - "::grpc::ServerContext* context, " - "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print( + *vars, + "void $ns$$Service$::AsyncService::Request$Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); printer->Print(*vars, " AsynchronousService::RequestClientStreaming($Idx$, " - "context, reader, cq, tag);\n"); + "context, reader, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } else if (ServerOnlyStreaming(method)) { - printer->Print(*vars, - "void $ns$$Service$::AsyncService::Request$Method$(" - "::grpc::ServerContext* context, " - "$Request$* request, " - "::grpc::ServerAsyncWriter< $Response$>* writer, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print(*vars, - " AsynchronousService::RequestServerStreaming($Idx$, " - "context, request, writer, cq, tag);\n"); + printer->Print( + *vars, + "void $ns$$Service$::AsyncService::Request$Method$(" + "::grpc::ServerContext* context, " + "$Request$* request, " + "::grpc::ServerAsyncWriter< $Response$>* writer, " + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); + printer->Print( + *vars, + " AsynchronousService::RequestServerStreaming($Idx$, " + "context, request, writer, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } else if (BidiStreaming(method)) { printer->Print( @@ -723,10 +738,11 @@ void PrintSourceServerAsyncMethod( "void $ns$$Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " - "::grpc::CompletionQueue* cq, void *tag) {\n"); + "::grpc::CompletionQueue* new_call_cq, " + "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n"); printer->Print(*vars, " AsynchronousService::RequestBidiStreaming($Idx$, " - "context, stream, cq, tag);\n"); + "context, stream, new_call_cq, notification_cq, tag);\n"); printer->Print("}\n\n"); } } @@ -788,9 +804,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, (*vars)["MethodCount"] = as_string(service->method_count()); printer->Print(*vars, - "$ns$$Service$::AsyncService::AsyncService(::grpc::" - "CompletionQueue* cq) : " - "::grpc::AsynchronousService(cq, " + "$ns$$Service$::AsyncService::AsyncService() : " + "::grpc::AsynchronousService(" "$prefix$$Service$_method_names, $MethodCount$) " "{}\n\n"); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 01644b44718..96c1b7c3eb2 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -74,16 +74,15 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct { requested_call_type type; void *tag; + grpc_completion_queue *cq_bound_to_call; + grpc_completion_queue *cq_for_notification; + grpc_call **call; union { struct { - grpc_completion_queue *cq_bind; - grpc_call **call; grpc_call_details *details; grpc_metadata_array *initial_metadata; } batch; struct { - grpc_completion_queue *cq_bind; - grpc_call **call; registered_method *registered_method; gpr_timespec *deadline; grpc_metadata_array *initial_metadata; @@ -103,7 +102,6 @@ struct registered_method { char *host; call_data *pending; requested_call_array requested; - grpc_completion_queue *cq; registered_method *next; }; @@ -130,7 +128,6 @@ struct grpc_server { size_t channel_filter_count; const grpc_channel_filter **channel_filters; grpc_channel_args *channel_args; - grpc_completion_queue *unregistered_cq; grpc_completion_queue **cqs; grpc_pollset **pollsets; @@ -602,7 +599,8 @@ static const grpc_channel_filter server_surface_filter = { destroy_channel_elem, "server", }; -static void addcq(grpc_server *server, grpc_completion_queue *cq) { +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq) { size_t i, n; for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; @@ -614,8 +612,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) { server->cqs[n] = cq; } -grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, - grpc_channel_filter **filters, +grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args) { size_t i; @@ -626,12 +623,10 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); memset(server, 0, sizeof(grpc_server)); - if (cq) addcq(server, cq); gpr_mu_init(&server->mu); gpr_cv_init(&server->cv); - server->unregistered_cq = cq; /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = @@ -667,8 +662,7 @@ static int streq(const char *a, const char *b) { } void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host, - grpc_completion_queue *cq_new_rpc) { + const char *host) { registered_method *m; if (!method) { gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); @@ -681,13 +675,11 @@ void *grpc_server_register_method(grpc_server *server, const char *method, return NULL; } } - addcq(server, cq_new_rpc); m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; - m->cq = cq_new_rpc; server->registered_methods = m; return m; } @@ -1012,17 +1004,18 @@ static grpc_call_error queue_call_request(grpc_server *server, } } -grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, - grpc_call_details *details, - grpc_metadata_array *initial_metadata, - grpc_completion_queue *cq_bind, - void *tag) { +grpc_call_error grpc_server_request_call( + grpc_server *server, grpc_call **call, grpc_call_details *details, + grpc_metadata_array *initial_metadata, + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; - grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE); rc.type = BATCH_CALL; rc.tag = tag; - rc.data.batch.cq_bind = cq_bind; - rc.data.batch.call = call; + rc.cq_bound_to_call = cq_bound_to_call; + rc.cq_for_notification = cq_for_notification; + rc.call = call; rc.data.batch.details = details; rc.data.batch.initial_metadata = initial_metadata; return queue_call_request(server, &rc); @@ -1031,14 +1024,16 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *cq_bind, void *tag) { + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; registered_method *registered_method = rm; - grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE); rc.type = REGISTERED_CALL; rc.tag = tag; - rc.data.registered.cq_bind = cq_bind; - rc.data.registered.call = call; + rc.cq_bound_to_call = cq_bound_to_call; + rc.cq_for_notification = cq_for_notification; + rc.call = call; rc.data.registered.registered_method = registered_method; rc.data.registered.deadline = deadline; rc.data.registered.initial_metadata = initial_metadata; @@ -1076,6 +1071,9 @@ static void begin_call(grpc_server *server, call_data *calld, fill in the metadata array passed by the client, we need to perform an ioreq op, that should complete immediately. */ + grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call); + *rc->call = calld->call; + calld->cq_new = rc->cq_for_notification; switch (rc->type) { case BATCH_CALL: cpstr(&rc->data.batch.details->host, @@ -1083,18 +1081,13 @@ static void begin_call(grpc_server *server, call_data *calld, cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); rc->data.batch.details->deadline = calld->deadline; - grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind); - *rc->data.batch.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; r++; - calld->cq_new = server->unregistered_cq; publish = publish_registered_or_batch; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; - grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind); - *rc->data.registered.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.registered.initial_metadata; r++; @@ -1103,7 +1096,6 @@ static void begin_call(grpc_server *server, call_data *calld, r->data.recv_message = rc->data.registered.optional_payload; r++; } - calld->cq_new = rc->data.registered.registered_method->cq; publish = publish_registered_or_batch; break; } @@ -1114,20 +1106,17 @@ static void begin_call(grpc_server *server, call_data *calld, } static void fail_call(grpc_server *server, requested_call *rc) { + *rc->call = NULL; switch (rc->type) { case BATCH_CALL: - *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, - GRPC_OP_ERROR); break; case REGISTERED_CALL: - *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL, - do_nothing, NULL, GRPC_OP_ERROR); break; } + grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); } static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 2cfa38fa436..c6331033e01 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -39,8 +39,7 @@ #include "src/core/transport/transport.h" /* Create a server */ -grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, - grpc_channel_filter **filters, +grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args); diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index f629c7c72de..b7390675adb 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -35,7 +35,6 @@ #include "src/core/surface/completion_queue.h" #include "src/core/surface/server.h" -grpc_server *grpc_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args) { - return grpc_server_create_from_filters(cq, NULL, 0, args); +grpc_server *grpc_server_create(const grpc_channel_args *args) { + return grpc_server_create_from_filters(NULL, 0, args); } diff --git a/src/cpp/server/async_generic_service.cc b/src/cpp/server/async_generic_service.cc index 07cb9337152..2e99afcb5f1 100644 --- a/src/cpp/server/async_generic_service.cc +++ b/src/cpp/server/async_generic_service.cc @@ -39,12 +39,10 @@ namespace grpc { void AsyncGenericService::RequestCall( GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, - CompletionQueue* cq, void* tag) { - server_->RequestAsyncGenericCall(ctx, reader_writer, cq, tag); -} - -CompletionQueue* AsyncGenericService::completion_queue() { - return &server_->cq_; + CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, + void* tag) { + server_->RequestAsyncGenericCall(ctx, reader_writer, call_cq, notification_cq, + tag); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 08c956601c3..e9c4f4eaafe 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -78,7 +78,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } - void Request(grpc_server* server) { + void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(!in_flight_); in_flight_ = true; cq_ = grpc_completion_queue_create(); @@ -86,7 +86,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_server_request_registered_call( server, tag_, &call_, &deadline_, &request_metadata_, has_request_payload_ ? &request_payload_ : nullptr, cq_, - this)); + notify_cq, this)); } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { @@ -179,16 +179,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -grpc_server* CreateServer(grpc_completion_queue* cq, int max_message_size) { +static grpc_server* CreateServer(int max_message_size) { if (max_message_size > 0) { grpc_arg arg; arg.type = GRPC_ARG_INTEGER; arg.key = const_cast(GRPC_ARG_MAX_MESSAGE_LENGTH); arg.value.integer = max_message_size; grpc_channel_args args = {1, &arg}; - return grpc_server_create(cq, &args); + return grpc_server_create(&args); } else { - return grpc_server_create(cq, nullptr); + return grpc_server_create(nullptr); } } @@ -199,9 +199,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, shutdown_(false), num_running_cb_(0), sync_methods_(new std::list), - server_(CreateServer(cq_.cq(), max_message_size)), + server_(CreateServer(max_message_size)), thread_pool_(thread_pool), - thread_pool_owned_(thread_pool_owned) {} + thread_pool_owned_(thread_pool_owned) { + grpc_server_register_completion_queue(server_, cq_.cq()); +} Server::~Server() { { @@ -221,8 +223,7 @@ Server::~Server() { bool Server::RegisterService(RpcService* service) { for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod* method = service->GetMethod(i); - void* tag = - grpc_server_register_method(server_, method->name(), nullptr, cq_.cq()); + void* tag = grpc_server_register_method(server_, method->name(), nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); @@ -240,9 +241,8 @@ bool Server::RegisterAsyncService(AsynchronousService* service) { service->dispatch_impl_ = this; service->request_args_ = new void*[service->method_count_]; for (size_t i = 0; i < service->method_count_; ++i) { - void* tag = - grpc_server_register_method(server_, service->method_names_[i], nullptr, - service->completion_queue()->cq()); + void* tag = grpc_server_register_method(server_, service->method_names_[i], + nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", service->method_names_[i]); @@ -273,7 +273,7 @@ bool Server::Start() { // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { - m->Request(server_); + m->Request(server_, cq_.cq()); } ScheduleCallback(); @@ -316,12 +316,13 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { public: AsyncRequest(Server* server, void* registered_method, ServerContext* ctx, grpc::protobuf::Message* request, - ServerAsyncStreamingInterface* stream, CompletionQueue* cq, - void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) : tag_(tag), request_(request), stream_(stream), - cq_(cq), + call_cq_(call_cq), + notification_cq_(notification_cq), ctx_(ctx), generic_ctx_(nullptr), server_(server), @@ -329,18 +330,22 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { payload_(nullptr) { memset(&array_, 0, sizeof(array_)); grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); grpc_server_request_registered_call( server->server_, registered_method, &call_, &call_details_.deadline, - &array_, request ? &payload_ : nullptr, cq->cq(), this); + &array_, request ? &payload_ : nullptr, call_cq->cq(), + notification_cq->cq(), this); } AsyncRequest(Server* server, GenericServerContext* ctx, - ServerAsyncStreamingInterface* stream, CompletionQueue* cq, - void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) : tag_(tag), request_(nullptr), stream_(stream), - cq_(cq), + call_cq_(call_cq), + notification_cq_(notification_cq), ctx_(nullptr), generic_ctx_(ctx), server_(server), @@ -348,8 +353,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { payload_(nullptr) { memset(&array_, 0, sizeof(array_)); grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); grpc_server_request_call(server->server_, &call_, &call_details_, &array_, - cq->cq(), this); + call_cq->cq(), notification_cq->cq(), this); } ~AsyncRequest() { @@ -392,8 +399,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { } } ctx->call_ = call_; - ctx->cq_ = cq_; - Call call(call_, server_, cq_, server_->max_message_size_); + ctx->cq_ = call_cq_; + Call call(call_, server_, call_cq_, server_->max_message_size_); if (orig_status && call_) { ctx->BeginCompletionOp(&call); } @@ -407,7 +414,8 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { void* const tag_; grpc::protobuf::Message* const request_; ServerAsyncStreamingInterface* const stream_; - CompletionQueue* const cq_; + CompletionQueue* const call_cq_; + ServerCompletionQueue* const notification_cq_; ServerContext* const ctx_; GenericServerContext* const generic_ctx_; Server* const server_; @@ -420,14 +428,19 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { void Server::RequestAsyncCall(void* registered_method, ServerContext* context, grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { - new AsyncRequest(this, registered_method, context, request, stream, cq, tag); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + new AsyncRequest(this, registered_method, context, request, stream, call_cq, + notification_cq, tag); } void Server::RequestAsyncGenericCall(GenericServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, void* tag) { - new AsyncRequest(this, context, stream, cq, tag); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { + new AsyncRequest(this, context, stream, call_cq, notification_cq, tag); } void Server::ScheduleCallback() { @@ -446,7 +459,7 @@ void Server::RunRpc() { ScheduleCallback(); if (ok) { SyncRequest::CallData cd(this, mrd); - mrd->Request(server_); + mrd->Request(server_, cq_.cq()); cd.Run(); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index e48d1eeb426..4bcbd829521 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -44,6 +44,12 @@ namespace grpc { ServerBuilder::ServerBuilder() : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} +std::unique_ptr ServerBuilder::AddCompletionQueue() { + ServerCompletionQueue* cq = new ServerCompletionQueue(); + cqs_.push_back(cq); + return std::unique_ptr(cq); +} + void ServerBuilder::RegisterService(SynchronousService* service) { services_.push_back(service->service()); } @@ -88,6 +94,9 @@ std::unique_ptr ServerBuilder::BuildAndStart() { } std::unique_ptr server( new Server(thread_pool_, thread_pool_owned, max_message_size_)); + for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { + grpc_server_register_completion_queue(server->server_, (*cq)->cq()); + } for (auto service = services_.begin(); service != services_.end(); service++) { if (!server->RegisterService(*service)) { diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 5e278ca66c9..ad970845024 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -99,7 +99,8 @@ void test_connect(const char *server_host, const char *client_host, int port, /* Create server. */ server_cq = grpc_completion_queue_create(); - server = grpc_server_create(server_cq, NULL); + server = grpc_server_create(NULL); + grpc_server_register_completion_queue(server, server_cq); GPR_ASSERT((got_port = grpc_server_add_http2_port(server, server_hostport)) > 0); if (port == 0) { @@ -155,10 +156,10 @@ void test_connect(const char *server_host, const char *client_host, int port, if (expect_ok) { /* Check for a successful request. */ - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(server, &s, - &call_details, - &request_metadata_recv, - server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(server, &s, &call_details, + &request_metadata_recv, server_cq, + server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/fixtures/chttp2_fake_security.c b/test/core/end2end/fixtures/chttp2_fake_security.c index 929f1f50db5..c94ee94d53e 100644 --- a/test/core/end2end/fixtures/chttp2_fake_security.c +++ b/test/core/end2end/fixtures/chttp2_fake_security.c @@ -82,8 +82,8 @@ static void chttp2_init_server_secure_fullstack( if (f->server) { grpc_server_destroy(f->server); } - f->server = - grpc_server_create(f->server_cq, server_args); + f->server = grpc_server_create(server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds)); grpc_server_credentials_release(server_creds); grpc_server_start(f->server); diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c index d7de5e54348..f92b40efebf 100644 --- a/test/core/end2end/fixtures/chttp2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_fullstack.c @@ -83,7 +83,8 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f, if (f->server) { grpc_server_destroy(f->server); } - f->server = grpc_server_create(f->server_cq, server_args); + f->server = grpc_server_create(server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr)); grpc_server_start(f->server); } diff --git a/test/core/end2end/fixtures/chttp2_fullstack_uds.c b/test/core/end2end/fixtures/chttp2_fullstack_uds.c index 53803b0f1d8..876782df846 100644 --- a/test/core/end2end/fixtures/chttp2_fullstack_uds.c +++ b/test/core/end2end/fixtures/chttp2_fullstack_uds.c @@ -88,7 +88,8 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f, if (f->server) { grpc_server_destroy(f->server); } - f->server = grpc_server_create(f->server_cq, server_args); + f->server = grpc_server_create(server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr)); grpc_server_start(f->server); } diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c index 9c4086d79d4..36ac4e46a39 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c @@ -85,8 +85,8 @@ static void chttp2_init_server_secure_fullstack( if (f->server) { grpc_server_destroy(f->server); } - f->server = - grpc_server_create(f->server_cq, server_args); + f->server = grpc_server_create(server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds)); grpc_server_credentials_release(server_creds); grpc_server_start(f->server); diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c index e9e1c5f838a..4bfd923e838 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c @@ -83,8 +83,8 @@ static void chttp2_init_server_secure_fullstack( if (f->server) { grpc_server_destroy(f->server); } - f->server = - grpc_server_create(f->server_cq, server_args); + f->server = grpc_server_create(server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr, server_creds)); grpc_server_credentials_release(server_creds); grpc_server_start(f->server); diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index d19ceb178bc..43ebf7eed58 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -117,8 +117,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_endpoint_pair *sfd = f->fixture_data; GPR_ASSERT(!f->server); - f->server = - grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); + f->server = grpc_server_create_from_filters(NULL, 0, server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); grpc_server_start(f->server); grpc_create_chttp2_transport(server_setup_transport, f, server_args, sfd->server, NULL, 0, grpc_mdctx_create(), 0); diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index ddde585b839..385d5a4e811 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -117,8 +117,8 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_endpoint_pair *sfd = f->fixture_data; GPR_ASSERT(!f->server); - f->server = - grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); + f->server = grpc_server_create_from_filters(NULL, 0, server_args); + grpc_server_register_completion_queue(f->server, f->server_cq); grpc_server_start(f->server); grpc_create_chttp2_transport(server_setup_transport, f, server_args, sfd->server, NULL, 0, grpc_mdctx_create(), 0); diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index 21057969d9d..0adc437db06 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -161,9 +161,10 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( - f.server, &s, &call_details, - &request_metadata_recv, f.server_cq, tag(2))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(2))); cq_expect_completion(v_server, tag(2), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c index f8733ef4445..0b20a975598 100644 --- a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c +++ b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c @@ -163,9 +163,10 @@ static void test_cancel_after_accept_and_writes_closed( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( - f.server, &s, &call_details, - &request_metadata_recv, f.server_cq, tag(2))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(2))); cq_expect_completion(v_server, tag(2), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c index 67c769c08be..13bf31584d4 100644 --- a/test/core/end2end/tests/census_simple_request.c +++ b/test/core/end2end/tests/census_simple_request.c @@ -142,10 +142,10 @@ static void test_body(grpc_end2end_test_fixture f) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c index c8e22ce11ca..29c023c72a6 100644 --- a/test/core/end2end/tests/disappearing_server.c +++ b/test/core/end2end/tests/disappearing_server.c @@ -133,10 +133,10 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s, - &call_details, - &request_metadata_recv, - f->server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f->server, &s, &call_details, + &request_metadata_recv, f->server_cq, + f->server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c index 2c2d2e895b9..c2935516636 100644 --- a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c +++ b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c @@ -148,10 +148,10 @@ static void test_early_server_shutdown_finishes_inflight_calls( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c index 96978a8cb94..8801dae98a4 100644 --- a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c +++ b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c @@ -115,10 +115,10 @@ static void test_early_server_shutdown_finishes_tags( /* upon shutdown, the server should finish all requested calls indicating no new call */ - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); grpc_server_shutdown(f.server); cq_expect_completion(v_server, tag(101), GRPC_OP_ERROR); cq_verify(v_server); diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c index d084530a9c0..2a8cf098ebf 100644 --- a/test/core/end2end/tests/graceful_server_shutdown.c +++ b/test/core/end2end/tests/graceful_server_shutdown.c @@ -147,10 +147,10 @@ static void test_early_server_shutdown_finishes_inflight_calls( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index d9d9e934cb2..98bcf9ada9c 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -165,10 +165,10 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index 6e95a6c5f82..e25b115d33e 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -145,10 +145,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); @@ -254,10 +254,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { "foo.test.google.fr:1234", deadline); GPR_ASSERT(c2); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s1, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s1, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; @@ -342,10 +342,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { cq_expect_completion(v_client, tag(live_call + 1), GRPC_OP_OK); cq_verify(v_client); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s2, - &call_details, - &request_metadata_recv, - f.server_cq, tag(201))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s2, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(201))); cq_expect_completion(v_server, tag(201), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c index 6291f773b39..3f8112d3414 100644 --- a/test/core/end2end/tests/max_message_length.c +++ b/test/core/end2end/tests/max_message_length.c @@ -164,10 +164,10 @@ static void test_max_message_length(grpc_end2end_test_config config) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c index fe02f25875f..c125664115e 100644 --- a/test/core/end2end/tests/ping_pong_streaming.c +++ b/test/core/end2end/tests/ping_pong_streaming.c @@ -153,10 +153,10 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(100))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(100))); cq_expect_completion(v_server, tag(100), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/registered_call.c b/test/core/end2end/tests/registered_call.c index 05b7a1dad0c..04c3d5293cc 100644 --- a/test/core/end2end/tests/registered_call.c +++ b/test/core/end2end/tests/registered_call.c @@ -146,10 +146,10 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c index 0169d520597..281091cdf9c 100644 --- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c @@ -181,10 +181,10 @@ static void test_request_response_with_metadata_and_payload( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c index dc49242d39a..1590aa23fa9 100644 --- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c @@ -167,10 +167,10 @@ static void test_request_response_with_metadata_and_payload( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c index 92036590a77..b94b6761ebe 100644 --- a/test/core/end2end/tests/request_response_with_payload.c +++ b/test/core/end2end/tests/request_response_with_payload.c @@ -159,10 +159,10 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c index c5b4e0c57ee..bf8309914eb 100644 --- a/test/core/end2end/tests/request_with_large_metadata.c +++ b/test/core/end2end/tests/request_with_large_metadata.c @@ -163,10 +163,10 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index 63b7c5ee40b..5fe69e9109b 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -154,10 +154,10 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index 0dbb35d4546..e025fd1a1e3 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -141,10 +141,10 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, config.init_server(f, server_args); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s, - &call_details, - &request_metadata_recv, - f->server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f->server, &s, &call_details, + &request_metadata_recv, f->server_cq, + f->server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 4d4d48a2112..271bdc56ca4 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -147,10 +147,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c index 538291a5f24..3b5393f6606 100644 --- a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c +++ b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c @@ -147,10 +147,10 @@ static void simple_request_body(grpc_end2end_test_fixture f) { op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); - GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, - &call_details, - &request_metadata_recv, - f.server_cq, tag(101))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/fling/server.c b/test/core/fling/server.c index 63c7bd7f884..8eab534177e 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -89,7 +89,7 @@ typedef struct { static void request_call(void) { grpc_metadata_array_init(&request_metadata_recv); grpc_server_request_call(server, &call, &call_details, &request_metadata_recv, - cq, tag(FLING_SERVER_NEW_REQUEST)); + cq, cq, tag(FLING_SERVER_NEW_REQUEST)); } static void handle_unary_method(void) { @@ -206,13 +206,14 @@ int main(int argc, char **argv) { test_server1_cert}; grpc_server_credentials *ssl_creds = grpc_ssl_server_credentials_create(NULL, &pem_key_cert_pair, 1); - server = grpc_server_create(cq, NULL); + server = grpc_server_create(NULL); GPR_ASSERT(grpc_server_add_secure_http2_port(server, addr, ssl_creds)); grpc_server_credentials_release(ssl_creds); } else { - server = grpc_server_create(cq, NULL); + server = grpc_server_create(NULL); GPR_ASSERT(grpc_server_add_http2_port(server, addr)); } + grpc_server_register_completion_queue(server, cq); grpc_server_start(server); gpr_free(addr_buf); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 6c0dfadbb91..d7c190dade8 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -91,7 +91,7 @@ void verify_timed_ok( class AsyncEnd2endTest : public ::testing::Test { protected: - AsyncEnd2endTest() : service_(&srv_cq_) {} + AsyncEnd2endTest() {} void SetUp() GRPC_OVERRIDE { int port = grpc_pick_unused_port_or_die(); @@ -100,6 +100,7 @@ class AsyncEnd2endTest : public ::testing::Test { ServerBuilder builder; builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials()); builder.RegisterAsyncService(&service_); + srv_cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); } @@ -108,10 +109,10 @@ class AsyncEnd2endTest : public ::testing::Test { void* ignored_tag; bool ignored_ok; cli_cq_.Shutdown(); - srv_cq_.Shutdown(); + srv_cq_->Shutdown(); while (cli_cq_.Next(&ignored_tag, &ignored_ok)) ; - while (srv_cq_.Next(&ignored_tag, &ignored_ok)) + while (srv_cq_->Next(&ignored_tag, &ignored_ok)) ; } @@ -121,9 +122,9 @@ class AsyncEnd2endTest : public ::testing::Test { stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel)); } - void server_ok(int i) { verify_ok(&srv_cq_, i, true); } + void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); } void client_ok(int i) { verify_ok(&cli_cq_, i, true); } - void server_fail(int i) { verify_ok(&srv_cq_, i, false); } + void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); } void client_fail(int i) { verify_ok(&cli_cq_, i, false); } void SendRpc(int num_rpcs) { @@ -142,8 +143,8 @@ class AsyncEnd2endTest : public ::testing::Test { std::unique_ptr > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, + srv_cq_.get(), srv_cq_.get(), tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); @@ -162,7 +163,7 @@ class AsyncEnd2endTest : public ::testing::Test { } CompletionQueue cli_cq_; - CompletionQueue srv_cq_; + std::unique_ptr srv_cq_; std::unique_ptr stub_; std::unique_ptr server_; grpc::cpp::test::util::TestService::AsyncService service_; @@ -200,19 +201,19 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) { std::chrono::system_clock::time_point time_now( std::chrono::system_clock::now()), time_limit(std::chrono::system_clock::now() + std::chrono::seconds(5)); - verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); + verify_timed_ok(srv_cq_.get(), -1, true, time_now, CompletionQueue::TIMEOUT); verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), + srv_cq_.get(), tag(2)); - verify_timed_ok(&srv_cq_, 2, true, time_limit); + verify_timed_ok(srv_cq_.get(), 2, true, time_limit); EXPECT_EQ(send_request.message(), recv_request.message()); verify_timed_ok(&cli_cq_, 1, true, time_limit); send_response.set_message(recv_request.message()); response_writer.Finish(send_response, Status::OK, tag(3)); - verify_timed_ok(&srv_cq_, 3, true); + verify_timed_ok(srv_cq_.get(), 3, true); response_reader->Finish(&recv_response, &recv_status, tag(4)); verify_timed_ok(&cli_cq_, 4, true); @@ -238,7 +239,8 @@ TEST_F(AsyncEnd2endTest, SimpleClientStreaming) { std::unique_ptr > cli_stream( stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1))); - service_.RequestRequestStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); + service_.RequestRequestStream(&srv_ctx, &srv_stream, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); client_ok(1); @@ -291,8 +293,8 @@ TEST_F(AsyncEnd2endTest, SimpleServerStreaming) { std::unique_ptr > cli_stream( stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, &srv_cq_, - tag(2)); + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + srv_cq_.get(), srv_cq_.get(), tag(2)); server_ok(2); client_ok(1); @@ -342,7 +344,8 @@ TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) { std::unique_ptr > cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq_, tag(1))); - service_.RequestBidiStream(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); + service_.RequestBidiStream(&srv_ctx, &srv_stream, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); client_ok(1); @@ -400,8 +403,8 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { std::unique_ptr > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); @@ -442,8 +445,8 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { std::unique_ptr > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); srv_ctx.AddInitialMetadata(meta1.first, meta1.second); @@ -490,8 +493,8 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { std::unique_ptr > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); response_writer.SendInitialMetadata(tag(3)); @@ -551,8 +554,8 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { std::unique_ptr > response_reader( stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); - service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, - tag(2)); + service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), + srv_cq_.get(), tag(2)); server_ok(2); EXPECT_EQ(send_request.message(), recv_request.message()); auto client_initial_metadata = srv_ctx.client_metadata(); diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index 103f613f70e..80e43fd8544 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -109,6 +109,7 @@ class GenericEnd2endTest : public ::testing::Test { ServerBuilder builder; builder.AddListeningPort(server_address_.str(), InsecureServerCredentials()); builder.RegisterAsyncGenericService(&generic_service_); + srv_cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); } @@ -117,10 +118,10 @@ class GenericEnd2endTest : public ::testing::Test { void* ignored_tag; bool ignored_ok; cli_cq_.Shutdown(); - srv_cq_.Shutdown(); + srv_cq_->Shutdown(); while (cli_cq_.Next(&ignored_tag, &ignored_ok)) ; - while (srv_cq_.Next(&ignored_tag, &ignored_ok)) + while (srv_cq_->Next(&ignored_tag, &ignored_ok)) ; } @@ -130,9 +131,9 @@ class GenericEnd2endTest : public ::testing::Test { generic_stub_.reset(new GenericStub(channel)); } - void server_ok(int i) { verify_ok(&srv_cq_, i, true); } + void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); } void client_ok(int i) { verify_ok(&cli_cq_, i, true); } - void server_fail(int i) { verify_ok(&srv_cq_, i, false); } + void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); } void client_fail(int i) { verify_ok(&cli_cq_, i, false); } void SendRpc(int num_rpcs) { @@ -160,9 +161,10 @@ class GenericEnd2endTest : public ::testing::Test { call->WritesDone(tag(3)); client_ok(3); - generic_service_.RequestCall(&srv_ctx, &stream, &srv_cq_, tag(4)); + generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), + srv_cq_.get(), tag(4)); - verify_ok(generic_service_.completion_queue(), 4, true); + verify_ok(srv_cq_.get(), 4, true); EXPECT_EQ(server_address_.str(), srv_ctx.host()); EXPECT_EQ(kMethodName, srv_ctx.method()); ByteBuffer recv_buffer; @@ -193,7 +195,7 @@ class GenericEnd2endTest : public ::testing::Test { } CompletionQueue cli_cq_; - CompletionQueue srv_cq_; + std::unique_ptr srv_cq_; std::unique_ptr stub_; std::unique_ptr generic_stub_; std::unique_ptr server_; @@ -230,9 +232,10 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) { generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1)); client_ok(1); - generic_service_.RequestCall(&srv_ctx, &srv_stream, &srv_cq_, tag(2)); + generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(), + srv_cq_.get(), tag(2)); - verify_ok(generic_service_.completion_queue(), 2, true); + verify_ok(srv_cq_.get(), 2, true); EXPECT_EQ(server_address_.str(), srv_ctx.host()); EXPECT_EQ(kMethodName, srv_ctx.method()); diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index b19c443c823..6cb3192908d 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -63,9 +63,7 @@ namespace testing { class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest(const ServerConfig& config, int port) - : srv_cq_(), async_service_(&srv_cq_), server_(nullptr), - shutdown_(false) { + AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) { char* server_address = NULL; gpr_join_host_port(&server_address, "::", port); @@ -74,15 +72,17 @@ class AsyncQpsServerTest : public Server { gpr_free(server_address); builder.RegisterAsyncService(&async_service_); + srv_cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); using namespace std::placeholders; - request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall, - &async_service_, _1, _2, _3, &srv_cq_, _4); + request_unary_ = + std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, + _1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4); request_streaming_ = - std::bind(&TestService::AsyncService::RequestStreamingCall, - &async_service_, _1, _2, &srv_cq_, _3); + std::bind(&TestService::AsyncService::RequestStreamingCall, + &async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3); for (int i = 0; i < 100; i++) { contexts_.push_front( new ServerRpcContextUnaryImpl( @@ -96,7 +96,7 @@ class AsyncQpsServerTest : public Server { // Wait until work is available or we are shutting down bool ok; void* got_tag; - while (srv_cq_.Next(&got_tag, &ok)) { + while (srv_cq_->Next(&got_tag, &ok)) { ServerRpcContext* ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke if (ctx->RunNextState(ok) == false) { @@ -116,7 +116,7 @@ class AsyncQpsServerTest : public Server { { std::lock_guard g(shutdown_mutex_); shutdown_ = true; - srv_cq_.Shutdown(); + srv_cq_->Shutdown(); } for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); @@ -290,10 +290,10 @@ class AsyncQpsServerTest : public Server { } return Status::OK; } - CompletionQueue srv_cq_; - TestService::AsyncService async_service_; std::vector threads_; std::unique_ptr server_; + std::unique_ptr srv_cq_; + TestService::AsyncService async_service_; std::function*, void*)> request_unary_;