Add timing data

pull/7644/head
Craig Tiller 9 years ago
parent 9e1803ec0b
commit c69d27b67d
  39  src/core/ext/transport/chttp2/transport/chttp2_transport.c
   2  src/core/ext/transport/chttp2/transport/writing.c
  28  src/core/lib/iomgr/combiner.c
   8  src/core/lib/iomgr/workqueue_posix.c
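
Note: every hunk below applies the same discipline: bracket a hot-path function with GPR_TIMER_BEGIN/GPR_TIMER_END, repeat the END before every early return so the pairs stay balanced, and use GPR_TIMER_MARK for point events (such as which write-state branch ran). A minimal sketch of that pattern — example_op is hypothetical; the bracketing is the commit's:

#include "src/core/lib/profiling/timers.h"

static void example_op(int fast_path) {
  GPR_TIMER_BEGIN("example_op", 0);
  if (fast_path) {
    GPR_TIMER_MARK("example_op.fast_path", 0); /* point event: no END */
    GPR_TIMER_END("example_op", 0); /* re-balance before the early return */
    return;
  }
  /* ... slow-path work ... */
  GPR_TIMER_END("example_op", 0);
}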

@@ -494,6 +494,7 @@ static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data) {
GPR_TIMER_BEGIN("init_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@@ -539,6 +540,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream,
GRPC_ERROR_NONE);
GPR_TIMER_END("init_stream", 0);
return 0;
}
@@ -607,6 +610,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, void *and_free_memory) {
GPR_TIMER_BEGIN("destroy_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@@ -614,6 +618,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s);
grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->destroy_stream,
GRPC_ERROR_NONE);
GPR_TIMER_END("destroy_stream", 0);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -691,6 +696,7 @@ static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
bool covered_by_poller, const char *reason) {
GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0);
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
switch (t->executor.write_state) {
case GRPC_CHTTP2_WRITES_CORKED:
@@ -724,9 +730,11 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
}
break;
}
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_BEGIN("start_writing", 0);
GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
if (!t->closed &&
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
@@ -744,6 +752,7 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write"));
UNREF_TRANSPORT(exec_ctx, t, "writing");
}
GPR_TIMER_END("start_writing", 0);
}
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
@@ -788,6 +797,7 @@ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
GPR_TIMER_BEGIN("terminate_writing_with_lock", 0);
grpc_chttp2_transport *t = tp;
allow_endpoint_shutdown_locked(exec_ctx, t);
@@ -805,9 +815,11 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
case GRPC_CHTTP2_WRITE_SCHEDULED:
GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITING:
GPR_TIMER_MARK("state=writing", 0);
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing");
break;
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
@@ -815,6 +827,7 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
true);
break;
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
@@ -824,13 +837,16 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
}
UNREF_TRANSPORT(exec_ctx, t, "writing");
GPR_TIMER_END("terminate_writing_with_lock", 0);
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
void *transport_writing, grpc_error *error) {
GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0);
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->terminate_writing,
GRPC_ERROR_REF(error));
GPR_TIMER_END("grpc_chttp2_terminate_writing", 0);
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
@@ -1170,6 +1186,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
GPR_TIMER_BEGIN("perform_stream_op", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked,
@@ -1179,6 +1196,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GRPC_CHTTP2_STREAM_REF(&s->global, "perform_stream_op");
grpc_combiner_execute(exec_ctx, t->executor.combiner,
&op->transport_private.closure, GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op", 0);
}
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1315,6 +1333,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global) {
GPR_TIMER_BEGIN("check_read_ops", 0);
grpc_chttp2_stream_global *stream_global;
grpc_byte_stream *bs;
while (
@@ -1389,6 +1408,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
}
}
GPR_TIMER_END("check_read_ops", 0);
}
static void decrement_active_streams_locked(
@@ -1830,13 +1850,17 @@ static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
/* Control flow:
   reading_action_locked ->
     (parse_unlocked -> post_parse_locked)? ->
       post_reading_action_locked */
GPR_TIMER_BEGIN("reading_action", 0);
grpc_chttp2_transport *t = tp;
grpc_combiner_execute(exec_ctx, t->executor.combiner,
&t->reading_action_locked, GRPC_ERROR_REF(error));
GPR_TIMER_END("reading_action", 0);
}
static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
GPR_TIMER_BEGIN("reading_action_locked", 0);
grpc_chttp2_transport *t = tp;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
@@ -1853,6 +1877,8 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
} else {
post_reading_action_locked(exec_ctx, t, error);
}
GPR_TIMER_END("reading_action_locked", 0);
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -1905,13 +1931,14 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
GRPC_ERROR_UNREF(errors[i]);
}
GPR_TIMER_END("reading_action.parse", 0);
grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->post_parse_locked,
err);
GPR_TIMER_END("reading_action.parse", 0);
}
static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_TIMER_BEGIN("post_parse_locked", 0);
grpc_chttp2_transport *t = arg;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
@@ -1956,10 +1983,12 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
post_reading_action_locked(exec_ctx, t, error);
GPR_TIMER_END("post_parse_locked", 0);
}
static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_TIMER_BEGIN("post_reading_action_locked", 0);
grpc_chttp2_transport *t = arg;
bool keep_reading = false;
GRPC_ERROR_REF(error);
@@ -1992,6 +2021,8 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
GRPC_ERROR_UNREF(error);
GPR_TIMER_END("post_reading_action_locked", 0);
}
/*******************************************************************************
@@ -2112,6 +2143,7 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
gpr_slice *slice, size_t max_size_hint,
grpc_closure *on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
gpr_ref(&bs->refs);
@@ -2122,6 +2154,7 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
bs);
grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
&bs->next_action.closure, GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return 0;
}
@@ -2140,12 +2173,14 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked,
bs);
grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
&bs->destroy_action, GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_destroy", 0);
}
typedef struct {
@@ -2182,6 +2217,7 @@ static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, int from_parsing_thread) {
GPR_TIMER_BEGIN("grpc_chttp2_incoming_byte_stream_finished", 0);
if (from_parsing_thread) {
grpc_closure_init(&bs->finished_action,
incoming_byte_stream_finished_locked, bs);
@@ -2190,6 +2226,7 @@ void grpc_chttp2_incoming_byte_stream_finished(
} else {
incoming_byte_stream_finished_locked(exec_ctx, bs, error);
}
GPR_TIMER_END("grpc_chttp2_incoming_byte_stream_finished", 0);
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
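
Note: the reading_action hunks above spell out the read pipeline (reading_action_locked -> (parse_unlocked -> post_parse_locked)? -> post_reading_action_locked), and each stage now gets its own timer. A self-contained toy sketch of the span sequence one read would record (printf stand-ins, not the real macros from src/core/lib/profiling/timers.h):

#include <stdio.h>

/* Each stage runs as its own closure, so the spans come out sequential,
   not nested. */
static void span(const char *tag) {
  printf("BEGIN %s\n", tag);
  /* ... stage body would run here ... */
  printf("END   %s\n", tag);
}

int main(void) {
  span("reading_action");            /* enqueues onto the combiner */
  span("reading_action_locked");     /* runs under the combiner */
  span("reading_action.parse");      /* parsing_action, off the lock */
  span("post_parse_locked");
  span("post_reading_action_locked");
  return 0;
}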

@@ -332,6 +332,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_cleanup_writing(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0);
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
@@ -370,4 +371,5 @@ void grpc_chttp2_cleanup_writing(
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0);
}
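
Note: the combiner.c hunks below instrument gRPC's combiner, the MPSC-queue-plus-atomic-counter construct that serializes transport work. A hedged usage sketch of the API being timed — do_work and schedule_work are hypothetical; grpc_closure_init and grpc_combiner_execute are used exactly as in the hunks:

#include "src/core/lib/iomgr/combiner.h"

static void do_work(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
  /* body runs serialized: only one combiner closure executes at a time */
}

static void schedule_work(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
                          grpc_closure *cl) {
  grpc_closure_init(cl, do_work, NULL);
  /* fast path: an idle combiner runs cl inline on this thread;
     otherwise cl is pushed onto the lock's queue for the owner to drain */
  grpc_combiner_execute(exec_ctx, lock, cl, GRPC_ERROR_NONE);
}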

@@ -32,13 +32,15 @@
*/
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/workqueue.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
struct grpc_combiner {
grpc_workqueue *optional_workqueue;
gpr_mpscq queue;
@@ -79,15 +81,18 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_TIMER_BEGIN("combiner.continue_executing_mainline", 0);
grpc_combiner *lock = arg;
GPR_ASSERT(exec_ctx->active_combiner == NULL);
exec_ctx->active_combiner = lock;
if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
GPR_ASSERT(exec_ctx->active_combiner == lock);
exec_ctx->active_combiner = NULL;
GPR_TIMER_END("combiner.continue_executing_mainline", 0);
}
static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GPR_TIMER_BEGIN("combiner.execute_final", 0);
grpc_closure *c = lock->final_list.head;
grpc_closure_list_init(&lock->final_list);
lock->take_async_break_before_final_list = false;
@@ -98,10 +103,12 @@ static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GRPC_ERROR_UNREF(error);
c = next;
}
GPR_TIMER_END("combiner.execute_final", 0);
}
static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_TIMER_BEGIN("combiner.continue_executing_final", 0);
grpc_combiner *lock = arg;
GPR_ASSERT(exec_ctx->active_combiner == NULL);
exec_ctx->active_combiner = lock;
@@ -115,23 +122,28 @@ static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
}
GPR_ASSERT(exec_ctx->active_combiner == lock);
exec_ctx->active_combiner = NULL;
GPR_TIMER_END("combiner.continue_executing_final", 0);
}
static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GPR_TIMER_BEGIN("combiner.start_execute_final", 0);
GPR_ASSERT(exec_ctx->active_combiner == lock);
if (lock->take_async_break_before_final_list) {
grpc_closure_init(&lock->continue_finishing, continue_executing_final,
lock);
grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
GPR_TIMER_END("combiner.start_execute_final", 0);
return false;
} else {
execute_final(exec_ctx, lock);
GPR_TIMER_END("combiner.start_execute_final", 0);
return true;
}
}
static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GPR_TIMER_BEGIN("combiner.maybe_finish_one", 0);
gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
GPR_ASSERT(exec_ctx->active_combiner == lock);
if (n == NULL) {
@@ -141,18 +153,21 @@ static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
lock);
grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched"));
GPR_TIMER_END("combiner.maybe_finish_one", 0);
return false;
}
grpc_closure *cl = (grpc_closure *)n;
grpc_error *error = cl->error;
cl->cb(exec_ctx, cl->cb_arg, error);
GRPC_ERROR_UNREF(error);
GPR_TIMER_END("combiner.maybe_finish_one", 0);
return true;
}
static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock) =
maybe_finish_one;
GPR_TIMER_BEGIN("combiner.finish", 0);
do {
switch (gpr_atm_full_fetch_add(&lock->state, -2)) {
case 5: // we're down to one queued item: if it's the final list we
@@ -162,9 +177,11 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
}
break;
case 3: // had one count, one unorphaned --> unlocked unorphaned
GPR_TIMER_END("combiner.finish", 0);
return;
case 2: // and one count, one orphaned --> unlocked and orphaned
really_destroy(exec_ctx, lock);
GPR_TIMER_END("combiner.finish", 0);
return;
case 1:
case 0:
@@ -173,15 +190,19 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GPR_UNREACHABLE_CODE(return );
}
} while (executor(exec_ctx, lock));
GPR_TIMER_END("combiner.finish", 0);
}
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *cl, grpc_error *error) {
GPR_TIMER_BEGIN("combiner.execute", 0);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
GPR_ASSERT(last & 1); // ensure lock has not been destroyed
if (last == 1) {
exec_ctx->active_combiner = lock;
GPR_TIMER_BEGIN("combiner.execute_first_cb", 0);
cl->cb(exec_ctx, cl->cb_arg, error);
GPR_TIMER_END("combiner.execute_first_cb", 0);
GRPC_ERROR_UNREF(error);
finish(exec_ctx, lock);
GPR_ASSERT(exec_ctx->active_combiner == lock);
@@ -190,6 +211,7 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
cl->error = error;
gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
}
GPR_TIMER_END("combiner.execute", 0);
}
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
@@ -201,9 +223,12 @@ static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error,
bool force_async_break) {
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
grpc_combiner_execute(exec_ctx, lock,
grpc_closure_create(enqueue_finally, closure), error);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
@@ -214,6 +239,7 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
gpr_atm_full_fetch_add(&lock->state, 2);
}
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
void grpc_combiner_force_async_finally(grpc_combiner *lock) {
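
Note: the switch in finish() above keys off the value gpr_atm_full_fetch_add returns. A hypothetical walk-through of the encoding its case labels imply (state = 2 × queued actions + a low "not orphaned" bit; inferred from the comments, not stated in the commit):

#include <assert.h>

int main(void) {
  long state = 1; /* lock alive (low bit set), nothing queued */

  state += 2;     /* grpc_combiner_execute: caller becomes the executor */
  state += 2;     /* a second closure arrives while the first runs */
  assert(state == 5);

  /* finish(): fetch_add(-2) returns the OLD value each time */
  state -= 2;     /* observed 5: one item still queued, keep draining */
  state -= 2;     /* observed 3: "one count, one unorphaned" -> unlocked */
  assert(state == 1);

  /* observing 2 instead would mean orphaned -> really_destroy() */
  return 0;
}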

@@ -43,6 +43,7 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/iomgr/ev_posix.h"
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
@@ -121,6 +122,7 @@ static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
}
static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
GPR_TIMER_MARK("workqueue.wakeup", 0);
grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) {
drain(exec_ctx, workqueue);
@@ -128,6 +130,8 @@ static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
}
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GPR_TIMER_BEGIN("workqueue.on_readable", 0);
grpc_workqueue *workqueue = arg;
if (error != GRPC_ERROR_NONE) {
@@ -172,10 +176,13 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_ERROR_UNREF(clerr);
}
}
GPR_TIMER_END("workqueue.on_readable", 0);
}
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error) {
GPR_TIMER_BEGIN("workqueue.enqueue", 0);
gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2);
GPR_ASSERT(last & 1);
closure->error = error;
@@ -183,6 +190,7 @@ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
if (last == 1) {
wakeup(exec_ctx, workqueue);
}
GPR_TIMER_END("workqueue.enqueue", 0);
}
#endif /* GPR_POSIX_SOCKET */
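
Note: the timer macros cost nothing unless a profiler is compiled in; in a default build they expand to nothing, so this instrumentation adds no overhead to release binaries. A sketch of the fallback definitions, assuming the shape src/core/lib/profiling/timers.h gives them when no profiler (e.g. GRPC_BASIC_PROFILER) is defined:

/* Assumed no-profiler fallbacks: each macro swallows its arguments and
   emits no code. */
#define GPR_TIMER_MARK(tag, important) \
  do {                                 \
  } while (0)
#define GPR_TIMER_BEGIN(tag, important) \
  do {                                  \
  } while (0)
#define GPR_TIMER_END(tag, important) \
  do {                                \
  } while (0)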
