Expand benchmark coverage to isolated grpc_call scenarios against a dummy filter stack

pull/10189/head
Craig Tiller 8 years ago
parent c290bc73f9
commit b7f35a658b
  1. 37
      src/core/lib/surface/channel.c
  2. 5
      src/core/lib/surface/channel.h
  3. 208
      test/cpp/microbenchmarks/bm_call_create.cc

@ -83,19 +83,10 @@ struct grpc_channel {
static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_args *input_args,
grpc_channel_stack_type channel_stack_type,
grpc_transport *optional_transport) {
grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder,
input_args);
grpc_channel_stack_builder_set_target(builder, target);
grpc_channel_stack_builder_set_transport(builder, optional_transport);
if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) {
grpc_channel_stack_builder_destroy(exec_ctx, builder);
return NULL;
}
grpc_channel *grpc_channel_create_with_builder(
grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder,
grpc_channel_stack_type channel_stack_type) {
char *target = gpr_strdup(grpc_channel_stack_builder_get_target(builder));
grpc_channel_args *args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder));
grpc_channel *channel;
@ -106,11 +97,12 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
gpr_log(GPR_ERROR, "channel stack builder failed: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
gpr_free(target);
goto done;
}
memset(channel, 0, sizeof(*channel));
channel->target = gpr_strdup(target);
channel->target = target;
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
@ -177,6 +169,23 @@ done:
return channel;
}
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_args *input_args,
grpc_channel_stack_type channel_stack_type,
grpc_transport *optional_transport) {
grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder,
input_args);
grpc_channel_stack_builder_set_target(builder, target);
grpc_channel_stack_builder_set_transport(builder, optional_transport);
if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) {
grpc_channel_stack_builder_destroy(exec_ctx, builder);
return NULL;
}
return grpc_channel_create_with_builder(exec_ctx, builder,
channel_stack_type);
}
char *grpc_channel_get_target(grpc_channel *channel) {
GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel));
return gpr_strdup(channel->target);

@ -35,6 +35,7 @@
#define GRPC_CORE_LIB_SURFACE_CHANNEL_H
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/channel_stack_type.h"
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
@ -42,6 +43,10 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
grpc_channel_stack_type channel_stack_type,
grpc_transport *optional_transport);
grpc_channel *grpc_channel_create_with_builder(
grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder,
grpc_channel_stack_type channel_stack_type);
/** Create a call given a grpc_channel, in order to call \a method.
Progress is tied to activity on \a pollset_set. The returned call object is
meant to be used with \a grpc_call_start_batch_and_execute, which relies on

@ -53,6 +53,7 @@ extern "C" {
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/channel/message_size_filter.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/transport_impl.h"
}
@ -85,6 +86,9 @@ BENCHMARK(BM_Zalloc)
->Arg(6144)
->Arg(7168);
////////////////////////////////////////////////////////////////////////////////
// Benchmarks creating full stacks
class BaseChannelFixture {
public:
BaseChannelFixture(grpc_channel *channel) : channel_(channel) {}
@ -130,6 +134,9 @@ static void BM_CallCreateDestroy(benchmark::State &state) {
BENCHMARK_TEMPLATE(BM_CallCreateDestroy, InsecureChannel);
BENCHMARK_TEMPLATE(BM_CallCreateDestroy, LameChannel);
////////////////////////////////////////////////////////////////////////////////
// Benchmarks isolating individual filters
static void *tag(int i) {
return reinterpret_cast<void *>(static_cast<intptr_t>(i));
}
@ -460,4 +467,205 @@ typedef Fixture<&grpc_load_reporting_filter, CHECKS_NOT_LAST>
BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, LoadReportingFilter, SendEmptyMetadata);
////////////////////////////////////////////////////////////////////////////////
// Benchmarks isolating grpc_call
namespace isolated_call_filter {
static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
if (op->recv_initial_metadata) {
grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
GRPC_ERROR_NONE);
}
if (op->recv_message) {
grpc_closure_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_NONE);
}
grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_NONE);
}
static void StartTransportOp(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_transport_op *op) {
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}
static grpc_error *InitCallElem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
return GRPC_ERROR_NONE;
}
static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_polling_entity *pollent) {}
static void DestroyCallElem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
void *and_free_memory) {
gpr_free(and_free_memory);
}
grpc_error *InitChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
grpc_channel_element_args *args) {
return GRPC_ERROR_NONE;
}
void DestroyChannelElem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {}
char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
return gpr_strdup("peer");
}
void GetChannelInfo(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
const grpc_channel_info *channel_info) {}
static const grpc_channel_filter isolated_call_filter = {
StartTransportStreamOp,
StartTransportOp,
0,
InitCallElem,
SetPollsetOrPollsetSet,
DestroyCallElem,
0,
InitChannelElem,
DestroyChannelElem,
GetPeer,
GetChannelInfo,
"isolated_call_filter"};
}
class IsolatedCallFixture : public TrackCounters {
public:
IsolatedCallFixture() {
grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_name(builder, "dummy");
grpc_channel_stack_builder_set_target(builder, "dummy_target");
GPR_ASSERT(grpc_channel_stack_builder_append_filter(
builder, &isolated_call_filter::isolated_call_filter, NULL, NULL));
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
channel_ = grpc_channel_create_with_builder(&exec_ctx, builder,
GRPC_CLIENT_CHANNEL);
grpc_exec_ctx_finish(&exec_ctx);
}
cq_ = grpc_completion_queue_create(NULL);
}
void Finish(benchmark::State &state) {
grpc_completion_queue_destroy(cq_);
grpc_channel_destroy(channel_);
TrackCounters::Finish(state);
}
grpc_channel *channel() const { return channel_; }
grpc_completion_queue *cq() const { return cq_; }
private:
grpc_completion_queue *cq_;
grpc_channel *channel_;
};
static void BM_IsolatedCall_NoOp(benchmark::State &state) {
IsolatedCallFixture fixture;
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
void *method_hdl =
grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL);
while (state.KeepRunning()) {
grpc_call_destroy(grpc_channel_create_registered_call(
fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(),
method_hdl, deadline, NULL));
}
fixture.Finish(state);
}
BENCHMARK(BM_IsolatedCall_NoOp);
static void BM_IsolatedCall_Unary(benchmark::State &state) {
IsolatedCallFixture fixture;
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
void *method_hdl =
grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL);
grpc_slice slice = grpc_slice_from_static_string("hello world");
grpc_byte_buffer *send_message = grpc_raw_byte_buffer_create(&slice, 1);
grpc_byte_buffer *recv_message = NULL;
grpc_status_code status_code;
grpc_slice status_details = grpc_empty_slice();
grpc_metadata_array recv_initial_metadata;
grpc_metadata_array_init(&recv_initial_metadata);
grpc_metadata_array recv_trailing_metadata;
grpc_metadata_array_init(&recv_trailing_metadata);
grpc_op ops[6];
memset(ops, 0, sizeof(ops));
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ops[1].data.send_message.send_message = send_message;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata.recv_initial_metadata =
&recv_initial_metadata;
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message.recv_message = &recv_message;
ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[5].data.recv_status_on_client.status = &status_code;
ops[5].data.recv_status_on_client.status_details = &status_details;
ops[5].data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata;
while (state.KeepRunning()) {
grpc_call *call = grpc_channel_create_registered_call(
fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(),
method_hdl, deadline, NULL);
grpc_call_start_batch(call, ops, 6, tag(1), NULL);
grpc_completion_queue_next(fixture.cq(),
gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
grpc_call_destroy(call);
}
fixture.Finish(state);
grpc_metadata_array_destroy(&recv_initial_metadata);
grpc_metadata_array_destroy(&recv_trailing_metadata);
grpc_byte_buffer_destroy(send_message);
}
BENCHMARK(BM_IsolatedCall_Unary);
static void BM_IsolatedCall_StreamingSend(benchmark::State &state) {
IsolatedCallFixture fixture;
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
void *method_hdl =
grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL);
grpc_slice slice = grpc_slice_from_static_string("hello world");
grpc_byte_buffer *send_message = grpc_raw_byte_buffer_create(&slice, 1);
grpc_metadata_array recv_initial_metadata;
grpc_metadata_array_init(&recv_initial_metadata);
grpc_metadata_array recv_trailing_metadata;
grpc_metadata_array_init(&recv_trailing_metadata);
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata.recv_initial_metadata =
&recv_initial_metadata;
grpc_call *call = grpc_channel_create_registered_call(
fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(),
method_hdl, deadline, NULL);
grpc_call_start_batch(call, ops, 2, tag(1), NULL);
grpc_completion_queue_next(fixture.cq(), gpr_inf_future(GPR_CLOCK_MONOTONIC),
NULL);
memset(ops, 0, sizeof(ops));
ops[0].op = GRPC_OP_SEND_MESSAGE;
ops[0].data.send_message.send_message = send_message;
while (state.KeepRunning()) {
grpc_call_start_batch(call, ops, 1, tag(2), NULL);
grpc_completion_queue_next(fixture.cq(),
gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
}
grpc_call_destroy(call);
fixture.Finish(state);
grpc_metadata_array_destroy(&recv_initial_metadata);
grpc_metadata_array_destroy(&recv_trailing_metadata);
grpc_byte_buffer_destroy(send_message);
}
BENCHMARK(BM_IsolatedCall_StreamingSend);
BENCHMARK_MAIN();

Loading…
Cancel
Save