Update C++ code

reviewable/pr9972/r1
Sree Kuchibhotla 8 years ago
parent e2119ac808
commit f2c32150ef
  1. 2
      include/grpc++/impl/codegen/client_unary_call.h
  2. 19
      include/grpc++/impl/codegen/completion_queue.h
  3. 7
      include/grpc++/impl/codegen/core_codegen.h
  4. 7
      include/grpc++/impl/codegen/core_codegen_interface.h
  5. 8
      include/grpc++/impl/codegen/sync_stream.h
  6. 11
      src/cpp/common/core_codegen.cc
  7. 5
      src/cpp/server/server_cc.cc
  8. 13
      test/cpp/grpclb/grpclb_test.cc
  9. 3
      test/cpp/microbenchmarks/bm_call_create.cc
  10. 12
      test/cpp/microbenchmarks/bm_cq.cc

@ -52,7 +52,7 @@ template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const InputMessage& request,
OutputMessage* result) {
CompletionQueue cq(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING);
CompletionQueue cq(true); // Pluckable completion queue
Call call(channel->CreateCall(method, context, &cq));
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,

@ -103,7 +103,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
public:
/// Default constructor. Implicitly creates a \a grpc_completion_queue
/// instance.
CompletionQueue() : CompletionQueue(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING) {}
CompletionQueue() : CompletionQueue(false) {}
/// Wrap \a take, taking ownership of the instance.
///
@ -147,8 +147,9 @@ class CompletionQueue : private GrpcLibraryCodegen {
///
/// \return true if read a regular event, false if the queue is shutting down.
bool Next(void** tag, bool* ok) {
return (AsyncNextInternal(tag, ok, g_core_codegen_interface->gpr_inf_future(
GPR_CLOCK_REALTIME)) != SHUTDOWN);
return (AsyncNextInternal(tag, ok,
g_core_codegen_interface->gpr_inf_future(
GPR_CLOCK_REALTIME)) != SHUTDOWN);
}
/// Request the shutdown of the queue.
@ -217,10 +218,14 @@ class CompletionQueue : private GrpcLibraryCodegen {
OutputMessage* result);
/// Private constructor of CompletionQueue only visible to friend classes
CompletionQueue(grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type) {
cq_ = g_core_codegen_interface->grpc_completion_queue_create(
completion_type, polling_type, nullptr);
CompletionQueue(bool is_pluck) {
if (is_pluck) {
cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_pluck(
nullptr);
} else {
cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_next(
nullptr);
}
InitialAvalanching(); // reserve this for the future shutdown
}

@ -46,9 +46,10 @@ namespace grpc {
/// Implementation of the core codegen interface.
class CoreCodegen : public CoreCodegenInterface {
private:
grpc_completion_queue* grpc_completion_queue_create(
grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type, void* reserved) override;
grpc_completion_queue* grpc_completion_queue_create_for_next(
void* reserved) override;
grpc_completion_queue* grpc_completion_queue_create_for_pluck(
void* reserved) override;
void grpc_completion_queue_destroy(grpc_completion_queue* cq) override;
grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline,

@ -60,9 +60,10 @@ class CoreCodegenInterface {
virtual void assert_fail(const char* failed_assertion, const char* file,
int line) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create(
grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type, void* reserved) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create_for_next(
void* reserved) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create_for_pluck(
void* reserved) = 0;
virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0;
virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq,
void* tag,

@ -138,7 +138,7 @@ class ClientReader final : public ClientReaderInterface<R> {
ClientReader(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const W& request)
: context_(context),
cq_(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING),
cq_(true), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose>
@ -212,7 +212,7 @@ class ClientWriter : public ClientWriterInterface<W> {
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, R* response)
: context_(context),
cq_(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING),
cq_(true), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
@ -297,7 +297,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context),
cq_(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING),
cq_(true), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_,
@ -512,7 +512,7 @@ class ServerReaderWriterBody final {
Call* const call_;
ServerContext* const ctx_;
};
}
} // namespace internal
// class to represent the user API for a bidirectional streaming call
template <class W, class R>

@ -54,11 +54,14 @@ struct grpc_byte_buffer;
namespace grpc {
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_next(
void* reserved) {
return ::grpc_completion_queue_create(completion_type, polling_type,
reserved);
return ::grpc_completion_queue_create_for_next(reserved);
}
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_pluck(
void* reserved) {
return ::grpc_completion_queue_create_for_pluck(reserved);
}
void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) {

@ -153,10 +153,7 @@ class Server::SyncRequest final : public CompletionQueueTag {
grpc_metadata_array_destroy(&request_metadata_);
}
void SetupRequest() {
cq_ = grpc_completion_queue_create(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
nullptr);
}
void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);

@ -354,8 +354,9 @@ static void start_backend_server(server_fixture *sf) {
}
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
const string expected_token =
strlen(sf->lb_token_prefix) == 0 ? "" : sf->lb_token_prefix +
std::to_string(sf->port);
strlen(sf->lb_token_prefix) == 0
? ""
: sf->lb_token_prefix + std::to_string(sf->port);
GPR_ASSERT(contains_metadata(&request_metadata_recv, "lb-token",
expected_token.c_str()));
@ -593,8 +594,7 @@ static void setup_client(const server_fixture *lb_server,
grpc_channel_args_copy_and_add(NULL, &expected_target_arg, 1);
gpr_free(expected_target_names);
cf->cq =
grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
cf->cq = grpc_completion_queue_create_for_next(NULL);
cf->server_uri = lb_uri;
grpc_channel_credentials *fake_creds =
grpc_fake_transport_security_credentials_create();
@ -617,8 +617,7 @@ static void teardown_client(client_fixture *cf) {
static void setup_server(const char *host, server_fixture *sf) {
int assigned_port;
sf->cq =
grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
sf->cq = grpc_completion_queue_create_for_next(NULL);
const char *colon_idx = strchr(host, ':');
if (colon_idx) {
const char *port_str = colon_idx + 1;
@ -647,7 +646,7 @@ static void teardown_server(server_fixture *sf) {
gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport);
grpc_completion_queue *shutdown_cq =
grpc_completion_queue_create(GRPC_CQ_PLUCK, GRPC_CQ_NON_POLLING, NULL);
grpc_completion_queue_create_for_pluck(NULL);
grpc_server_shutdown_and_notify(sf->server, shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),

@ -114,8 +114,7 @@ template <class Fixture>
static void BM_CallCreateDestroy(benchmark::State &state) {
TrackCounters track_counters;
Fixture fixture;
grpc_completion_queue *cq =
grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
grpc_completion_queue *cq = grpc_completion_queue_create_for_next(NULL);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
void *method_hdl =
grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL);

@ -64,8 +64,7 @@ static void BM_CreateDestroyCore(benchmark::State& state) {
while (state.KeepRunning()) {
// TODO: sreek Templatize this benchmark and pass completion type and
// polling type as parameters
grpc_completion_queue_destroy(grpc_completion_queue_create(
GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL));
grpc_completion_queue_destroy(grpc_completion_queue_create_for_next(NULL));
}
track_counters.Finish(state);
}
@ -102,8 +101,7 @@ BENCHMARK(BM_Pass1Cpp);
static void BM_Pass1Core(benchmark::State& state) {
TrackCounters track_counters;
// TODO: sreek Templatize this benchmark and pass polling_type as a param
grpc_completion_queue* cq =
grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(NULL);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
while (state.KeepRunning()) {
grpc_cq_completion completion;
@ -122,8 +120,7 @@ BENCHMARK(BM_Pass1Core);
static void BM_Pluck1Core(benchmark::State& state) {
TrackCounters track_counters;
// TODO: sreek Templatize this benchmark and pass polling_type as a param
grpc_completion_queue* cq = grpc_completion_queue_create(
GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, NULL);
grpc_completion_queue* cq = grpc_completion_queue_create_for_pluck(NULL);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
while (state.KeepRunning()) {
grpc_cq_completion completion;
@ -142,8 +139,7 @@ BENCHMARK(BM_Pluck1Core);
static void BM_EmptyCore(benchmark::State& state) {
TrackCounters track_counters;
// TODO: sreek Templatize this benchmark and pass polling_type as a param
grpc_completion_queue* cq =
grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(NULL);
gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
while (state.KeepRunning()) {
grpc_completion_queue_next(cq, deadline, NULL);

Loading…
Cancel
Save