From 982a6f2b1c8ca5ec6d361776fc76472ef6728253 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 3 Mar 2017 02:19:31 -0800 Subject: [PATCH] C++ code changes in response to grpc_completion_queue_create() API change --- .../grpc++/impl/codegen/client_unary_call.h | 2 +- .../grpc++/impl/codegen/completion_queue.h | 19 ++++++++++++------ include/grpc++/impl/codegen/core_codegen.h | 5 ++++- .../impl/codegen/core_codegen_interface.h | 4 +++- include/grpc++/impl/codegen/sync_stream.h | 12 ++++++++--- include/grpc/impl/codegen/grpc_types.h | 7 +++++-- src/cpp/common/core_codegen.cc | 4 +++- src/cpp/server/server_cc.cc | 19 ++++++++++++++---- test/cpp/grpclb/grpclb_test.cc | 20 ++++++++++++------- test/cpp/microbenchmarks/bm_call_create.cc | 3 ++- test/cpp/microbenchmarks/bm_cq.cc | 20 +++++++++++++++---- 11 files changed, 84 insertions(+), 31 deletions(-) diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 201e52ae073..bf18a7d6449 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -52,7 +52,7 @@ template Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result) { - CompletionQueue cq; + CompletionQueue cq(GRPC_CQ_PLUCK, DEFAULT_POLLING); Call call(channel->CreateCall(method, context, &cq)); CallOpSet, diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 03cecdc21c6..a123af2cf69 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -52,6 +52,7 @@ #include #include #include +#include #include struct grpc_completion_queue; @@ -102,10 +103,7 @@ class CompletionQueue : private GrpcLibraryCodegen { public: /// Default constructor. Implicitly creates a \a grpc_completion_queue /// instance. - CompletionQueue() { - cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); - InitialAvalanching(); // reserve this for the future shutdown - } + CompletionQueue() : CompletionQueue(GRPC_CQ_NEXT, DEFAULT_POLLING) {} /// Wrap \a take, taking ownership of the instance. /// @@ -149,8 +147,9 @@ class CompletionQueue : private GrpcLibraryCodegen { /// /// \return true if read a regular event, false if the queue is shutting down. bool Next(void** tag, bool* ok) { - return (AsyncNextInternal(tag, ok, g_core_codegen_interface->gpr_inf_future( - GPR_CLOCK_REALTIME)) != SHUTDOWN); + return (AsyncNextInternal(tag, ok, + g_core_codegen_interface->gpr_inf_future( + GPR_CLOCK_REALTIME)) != SHUTDOWN); } /// Request the shutdown of the queue. @@ -218,6 +217,14 @@ class CompletionQueue : private GrpcLibraryCodegen { const InputMessage& request, OutputMessage* result); + /// Private constructor of CompletionQueue only visible to friend classes + CompletionQueue(grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type) { + cq_ = g_core_codegen_interface->grpc_completion_queue_create( + completion_type, polling_type, nullptr); + InitialAvalanching(); // reserve this for the future shutdown + } + NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index 754bf14b259..4c8567f7702 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -38,6 +38,7 @@ #include #include +#include #include namespace grpc { @@ -45,7 +46,9 @@ namespace grpc { /// Implementation of the core codegen interface. class CoreCodegen : public CoreCodegenInterface { private: - grpc_completion_queue* grpc_completion_queue_create(void* reserved) override; + grpc_completion_queue* grpc_completion_queue_create( + grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type, void* reserved) override; void grpc_completion_queue_destroy(grpc_completion_queue* cq) override; grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index 45ea0403031..8f2b4a3b76e 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -36,6 +36,7 @@ #include #include +#include #include #include #include @@ -60,7 +61,8 @@ class CoreCodegenInterface { int line) = 0; virtual grpc_completion_queue* grpc_completion_queue_create( - void* reserved) = 0; + grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type, void* reserved) = 0; virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0; virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 4d9b074e95f..d3a6c7ff899 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -137,7 +137,9 @@ class ClientReader final : public ClientReaderInterface { template ClientReader(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(GRPC_CQ_PLUCK, DEFAULT_POLLING), + call_(channel->CreateCall(method, context, &cq_)) { CallOpSet ops; @@ -209,7 +211,9 @@ class ClientWriter : public ClientWriterInterface { template ClientWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, R* response) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(GRPC_CQ_PLUCK, DEFAULT_POLLING), + call_(channel->CreateCall(method, context, &cq_)) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -292,7 +296,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { /// Blocking create a stream. ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(GRPC_CQ_PLUCK, DEFAULT_POLLING), + call_(channel->CreateCall(method, context, &cq_)) { CallOpSet ops; ops.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index e5c731304cd..061f59213ce 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -352,8 +352,11 @@ typedef enum grpc_completion_type { typedef struct grpc_event { /** The type of the completion. */ grpc_completion_type type; - /** non-zero if the operation was successful, 0 upon failure. - Only GRPC_OP_COMPLETE can succeed or fail. */ + /** If the grpc_completion_type is GRPC_OP_COMPLETE, this field indicates + whether the operation was successful or not; 0 in case of failure and + non-zero in case of success. + If grpc_completion_type is GRPC_QUEUE_SHUTDOWN or GRPC_QUEUE_TIMEOUT, this + field is guaranteed to be 0 */ int success; /** The tag passed to grpc_call_start_batch etc to start this operation. Only GRPC_OP_COMPLETE has a tag. */ diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index 36e4c893540..81b32938b85 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -55,8 +55,10 @@ struct grpc_byte_buffer; namespace grpc { grpc_completion_queue* CoreCodegen::grpc_completion_queue_create( + grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, void* reserved) { - return ::grpc_completion_queue_create(reserved); + return ::grpc_completion_queue_create(completion_type, polling_type, + reserved); } void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) { diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 9e11a8a9e07..1b7ca82be08 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -124,6 +124,14 @@ class ShutdownTag : public CompletionQueueTag { bool FinalizeResult(void** tag, bool* status) { return false; } }; +class DummyTag : public CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool* status) { + *status = true; + return true; + } +}; + class Server::SyncRequest final : public CompletionQueueTag { public: SyncRequest(RpcServiceMethod* method, void* tag) @@ -145,7 +153,10 @@ class Server::SyncRequest final : public CompletionQueueTag { grpc_metadata_array_destroy(&request_metadata_); } - void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } + void SetupRequest() { + // TODO: sreek - Double check if this should be GRPC_CQ_PLUCK + cq_ = grpc_completion_queue_create(GRPC_CQ_PLUCK, DEFAULT_POLLING, nullptr); + } void TeardownRequest() { grpc_completion_queue_destroy(cq_); @@ -213,10 +224,10 @@ class Server::SyncRequest final : public CompletionQueueTag { MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; - void* ignored_tag; - bool ignored_ok; + DummyTag ignored_tag; cq_.Shutdown(); - GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false); + /* Ensure the cq_ is shutdown (else this will hang indefinitely) */ + GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); } private: diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index 89ed9249adc..542f396d109 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -354,8 +354,9 @@ static void start_backend_server(server_fixture *sf) { } GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); const string expected_token = - strlen(sf->lb_token_prefix) == 0 ? "" : sf->lb_token_prefix + - std::to_string(sf->port); + strlen(sf->lb_token_prefix) == 0 + ? "" + : sf->lb_token_prefix + std::to_string(sf->port); GPR_ASSERT(contains_metadata(&request_metadata_recv, "lb-token", expected_token.c_str())); @@ -593,7 +594,7 @@ static void setup_client(const server_fixture *lb_server, grpc_channel_args_copy_and_add(NULL, &expected_target_arg, 1); gpr_free(expected_target_names); - cf->cq = grpc_completion_queue_create(NULL); + cf->cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); cf->server_uri = lb_uri; grpc_channel_credentials *fake_creds = grpc_fake_transport_security_credentials_create(); @@ -616,7 +617,7 @@ static void teardown_client(client_fixture *cf) { static void setup_server(const char *host, server_fixture *sf) { int assigned_port; - sf->cq = grpc_completion_queue_create(NULL); + sf->cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); const char *colon_idx = strchr(host, ':'); if (colon_idx) { const char *port_str = colon_idx + 1; @@ -643,10 +644,15 @@ static void teardown_server(server_fixture *sf) { if (!sf->server) return; gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport); - grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - sf->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + + grpc_completion_queue *shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); + grpc_server_shutdown_and_notify(sf->server, shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); + grpc_completion_queue_destroy(shutdown_cq); grpc_server_destroy(sf->server); gpr_thd_join(sf->tid); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 76d50302769..ecfa20aa905 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -65,7 +65,8 @@ static struct Init { static void BM_InsecureChannelWithDefaults(benchmark::State &state) { grpc_channel *channel = grpc_insecure_channel_create("localhost:12345", NULL, NULL); - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, DEFAULT_POLLING, NULL); grpc_slice method = grpc_slice_from_static_string("/foo/bar"); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index c017474bf4a..c7bd8743a57 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -66,7 +66,10 @@ BENCHMARK(BM_CreateDestroyCpp); static void BM_CreateDestroyCore(benchmark::State& state) { while (state.KeepRunning()) { - grpc_completion_queue_destroy(grpc_completion_queue_create(NULL)); + // TODO: sreek Make this a templatized benchmark and pass completion type + // and polling type as parameters + grpc_completion_queue_destroy( + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL)); } } BENCHMARK(BM_CreateDestroyCore); @@ -98,7 +101,10 @@ static void BM_Pass1Cpp(benchmark::State& state) { BENCHMARK(BM_Pass1Cpp); static void BM_Pass1Core(benchmark::State& state) { - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); + // TODO: sreek Make this templatized benchmark and pass polling_type as a + // param + grpc_completion_queue* cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_cq_completion completion; @@ -114,7 +120,10 @@ static void BM_Pass1Core(benchmark::State& state) { BENCHMARK(BM_Pass1Core); static void BM_Pluck1Core(benchmark::State& state) { - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); + // TODO: sreek Make this templatized benchmark and pass polling_type as a + // param + grpc_completion_queue* cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, DEFAULT_POLLING, NULL); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_cq_completion completion; @@ -130,7 +139,10 @@ static void BM_Pluck1Core(benchmark::State& state) { BENCHMARK(BM_Pluck1Core); static void BM_EmptyCore(benchmark::State& state) { - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); + // TODO: sreek Make this a templatized benchmark and pass polling_type as a + // param + grpc_completion_queue* cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_completion_queue_next(cq, deadline, NULL);