Latency traces

commit 86253ca1da (pull/3767/head)
parent 9bb0161251
Craig Tiller, 10 years ago
14 changed files:

  1. src/core/iomgr/exec_ctx.c (4 changed lines)
  2. src/core/iomgr/fd_posix.c (2 changed lines)
  3. src/core/iomgr/pollset_multipoller_with_epoll.c (7 changed lines)
  4. src/core/iomgr/pollset_posix.c (7 changed lines)
  5. src/core/iomgr/tcp_posix.c (4 changed lines)
  6. src/core/profiling/basic_timers.c (32 changed lines)
  7. src/core/profiling/timers.h (59 changed lines)
  8. src/core/surface/call.c (7 changed lines)
  9. src/core/surface/completion_queue.c (11 changed lines)
  10. src/core/transport/chttp2_transport.c (4 changed lines)
  11. src/cpp/client/channel.cc (3 changed lines)
  12. src/cpp/server/server.cc (4 changed lines)
  13. test/cpp/qps/client_sync.cc (7 changed lines)
  14. tools/profile_analyzer/profile_analyzer.py (233 changed lines)

--- a/src/core/iomgr/exec_ctx.c
+++ b/src/core/iomgr/exec_ctx.c
@@ -35,8 +35,11 @@
 #include <grpc/support/log.h>
+#include "src/core/profiling/timers.h"
 int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
   int did_something = 0;
+  GRPC_TIMER_BEGIN(GRPC_PTAG_EXEC_CTX_FLUSH, 0);
   while (!grpc_closure_list_empty(exec_ctx->closure_list)) {
     grpc_closure *c = exec_ctx->closure_list.head;
     exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
@@ -47,6 +50,7 @@ int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
       c = next;
     }
   }
+  GRPC_TIMER_END(GRPC_PTAG_EXEC_CTX_FLUSH, 0);
   return did_something;
 }

--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -45,6 +45,8 @@
 #include <grpc/support/log.h>
 #include <grpc/support/useful.h>
+#include "src/core/profiling/timers.h"
 enum descriptor_state {
   NOT_READY = 0,
   READY = 1

--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -41,10 +41,11 @@
 #include <sys/epoll.h>
 #include <unistd.h>
-#include "src/core/iomgr/fd_posix.h"
-#include "src/core/support/block_annotate.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/support/block_annotate.h"
+#include "src/core/profiling/timers.h"
 typedef struct wakeup_fd_hdl {
   grpc_wakeup_fd wakeup_fd;
@@ -182,9 +183,11 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
   /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
      even going into the blocking annotation if possible */
+  GRPC_TIMER_BEGIN(GRPC_PTAG_POLL, 0);
   GRPC_SCHEDULING_START_BLOCKING_REGION;
   poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
   GRPC_SCHEDULING_END_BLOCKING_REGION;
+  GRPC_TIMER_END(GRPC_PTAG_POLL, 0);
   if (poll_rv < 0) {
     if (errno != EINTR) {

--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -195,6 +195,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   /* pollset->mu already held */
   int added_worker = 0;
   int locked = 1;
+  GRPC_TIMER_BEGIN(GRPC_PTAG_POLLSET_WORK, 0);
   /* this must happen before we (potentially) drop pollset->mu */
   worker->next = worker->prev = NULL;
   /* TODO(ctiller): pool these */
@@ -223,8 +224,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
         added_worker = 1;
         gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
         gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker);
+        GRPC_TIMER_BEGIN(GRPC_PTAG_POLLSET_WORK, 0);
         pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, deadline,
                                                now);
+        GRPC_TIMER_END(GRPC_PTAG_POLLSET_WORK, 0);
         locked = 0;
         gpr_tls_set(&g_current_thread_poller, 0);
         gpr_tls_set(&g_current_thread_worker, 0);
@@ -261,6 +264,7 @@ done:
       gpr_mu_lock(&pollset->mu);
     }
   }
+  GRPC_TIMER_END(GRPC_PTAG_POLLSET_WORK, 0);
 }
 void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -492,10 +496,11 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
      even going into the blocking annotation if possible */
   /* poll fd count (argument 2) is shortened by one if we have no events
      to poll on - such that it only includes the kicker */
+  GRPC_TIMER_BEGIN(GRPC_PTAG_POLL, 0);
   GRPC_SCHEDULING_START_BLOCKING_REGION;
   r = grpc_poll_function(pfd, nfds, timeout);
   GRPC_SCHEDULING_END_BLOCKING_REGION;
-  GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
+  GRPC_TIMER_END(GRPC_PTAG_POLL, 0);
   if (fd) {
     grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN,

--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -199,7 +199,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
   msg.msg_controllen = 0;
   msg.msg_flags = 0;
-  GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0);
+  GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 1);
   do {
     read_bytes = recvmsg(tcp->fd, &msg, 0);
   } while (read_bytes < 0 && errno == EINTR);
@@ -316,7 +316,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
   msg.msg_controllen = 0;
   msg.msg_flags = 0;
-  GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0);
+  GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 1);
   do {
     /* TODO(klempner): Cork if this is a partial write */
     sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);

--- a/src/core/profiling/basic_timers.c
+++ b/src/core/profiling/basic_timers.c
@@ -47,16 +47,16 @@
 typedef enum {
   BEGIN = '{',
   END = '}',
-  MARK = '.',
-  IMPORTANT = '!'
+  MARK = '.'
 } marker_type;
 typedef struct grpc_timer_entry {
   gpr_timespec tm;
   const char *tagstr;
-  marker_type type;
   const char *file;
   int line;
+  char type;
+  gpr_uint8 important;
 } grpc_timer_entry;
 #define MAX_COUNT (1024 * 1024 / sizeof(grpc_timer_entry))
@@ -81,10 +81,10 @@ static void log_report() {
     grpc_timer_entry *entry = &(g_log[i]);
     fprintf(output_file,
             "{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": "
-            "\"%s\", \"file\": \"%s\", \"line\": %d}\n",
+            "\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n",
             entry->tm.tv_sec, entry->tm.tv_nsec,
             (void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr,
-            entry->file, entry->line);
+            entry->file, entry->line, entry->important);
   }
   /* Now clear out the log */
@@ -92,7 +92,7 @@ static void log_report() {
 }
 static void grpc_timers_log_add(int tag, const char *tagstr, marker_type type,
-                                void *id, const char *file, int line) {
+                                int important, const char *file, int line) {
   grpc_timer_entry *entry;
   /* TODO (vpai) : Improve concurrency */
@@ -107,34 +107,28 @@ static void grpc_timers_log_add(int tag, const char *tagstr, marker_type type,
   entry->type = type;
   entry->file = file;
   entry->line = line;
+  entry->important = important != 0;
 }
 /* Latency profiler API implementation. */
-void grpc_timer_add_mark(int tag, const char *tagstr, void *id,
+void grpc_timer_add_mark(int tag, const char *tagstr, int important,
                          const char *file, int line) {
   if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
-    grpc_timers_log_add(tag, tagstr, MARK, id, file, line);
+    grpc_timers_log_add(tag, tagstr, MARK, important, file, line);
   }
 }
-void grpc_timer_add_important_mark(int tag, const char *tagstr, void *id,
-                                   const char *file, int line) {
-  if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
-    grpc_timers_log_add(tag, tagstr, IMPORTANT, id, file, line);
-  }
-}
-void grpc_timer_begin(int tag, const char *tagstr, void *id, const char *file,
+void grpc_timer_begin(int tag, const char *tagstr, int important, const char *file,
                       int line) {
   if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
-    grpc_timers_log_add(tag, tagstr, BEGIN, id, file, line);
+    grpc_timers_log_add(tag, tagstr, BEGIN, important, file, line);
   }
 }
-void grpc_timer_end(int tag, const char *tagstr, void *id, const char *file,
+void grpc_timer_end(int tag, const char *tagstr, int important, const char *file,
                     int line) {
   if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
-    grpc_timers_log_add(tag, tagstr, END, id, file, line);
+    grpc_timers_log_add(tag, tagstr, END, important, file, line);
  }
 }

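Each record log_report writes above is one JSON object per line, now carrying the
imp flag. A minimal sketch of reading such a trace back (assuming the basic
profiler's output lands in latency_trace.txt, the file the analyzer below reads;
the field names simply mirror the fprintf format string):

  import json

  # One record per line: t (timestamp), thd (thread id), type ('{', '}' or '.'),
  # tag, file, line, imp (0/1 importance flag).
  with open('latency_trace.txt') as f:
    for raw in f:
      rec = json.loads(raw)
      print rec['thd'], rec['type'], rec['tag'], rec['imp']
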
--- a/src/core/profiling/timers.h
+++ b/src/core/profiling/timers.h
@@ -41,13 +41,11 @@ extern "C" {
 void grpc_timers_global_init(void);
 void grpc_timers_global_destroy(void);
-void grpc_timer_add_mark(int tag, const char *tagstr, void *id,
+void grpc_timer_add_mark(int tag, const char *tagstr, int important,
                          const char *file, int line);
-void grpc_timer_add_important_mark(int tag, const char *tagstr, void *id,
-                                   const char *file, int line);
-void grpc_timer_begin(int tag, const char *tagstr, void *id, const char *file,
+void grpc_timer_begin(int tag, const char *tagstr, int important, const char *file,
                       int line);
-void grpc_timer_end(int tag, const char *tagstr, void *id, const char *file,
+void grpc_timer_end(int tag, const char *tagstr, int important, const char *file,
                     int line);
 enum grpc_profiling_tags {
@@ -60,21 +58,36 @@ enum grpc_profiling_tags {
   /* Re. sockets. */
   GRPC_PTAG_HANDLE_READ = 200 + GRPC_PTAG_IGNORE_THRESHOLD,
-  GRPC_PTAG_SENDMSG = 201 + GRPC_PTAG_IGNORE_THRESHOLD,
-  GRPC_PTAG_RECVMSG = 202 + GRPC_PTAG_IGNORE_THRESHOLD,
-  GRPC_PTAG_POLL_FINISHED = 203 + GRPC_PTAG_IGNORE_THRESHOLD,
+  GRPC_PTAG_SENDMSG = 201,
+  GRPC_PTAG_RECVMSG = 202,
+  GRPC_PTAG_POLL = 203,
   GRPC_PTAG_TCP_CB_WRITE = 204 + GRPC_PTAG_IGNORE_THRESHOLD,
   GRPC_PTAG_TCP_WRITE = 205 + GRPC_PTAG_IGNORE_THRESHOLD,
-  GRPC_PTAG_CALL_ON_DONE_RECV = 206,
+  GRPC_PTAG_BECOME_READABLE = 207,
   /* C++ */
   GRPC_PTAG_CPP_CALL_CREATED = 300 + GRPC_PTAG_IGNORE_THRESHOLD,
   GRPC_PTAG_CPP_PERFORM_OPS = 301 + GRPC_PTAG_IGNORE_THRESHOLD,
+  GRPC_PTAG_CLIENT_UNARY_CALL = 302,
+  GRPC_PTAG_SERVER_CALL = 303,
+  GRPC_PTAG_SERVER_CALLBACK = 304,
   /* Transports */
   GRPC_PTAG_HTTP2_RECV_DATA = 400,
   GRPC_PTAG_HTTP2_UNLOCK = 401 + GRPC_PTAG_IGNORE_THRESHOLD,
   GRPC_PTAG_HTTP2_UNLOCK_CLEANUP = 402 + GRPC_PTAG_IGNORE_THRESHOLD,
+  /* Completion queue */
+  GRPC_PTAG_CQ_NEXT = 501,
+  GRPC_PTAG_CQ_PLUCK = 502,
+  GRPC_PTAG_POLLSET_WORK = 503,
+  GRPC_PTAG_EXEC_CTX_FLUSH = 504,
+  /* Surface */
+  GRPC_PTAG_CALL_START_BATCH = 600,
+  GRPC_PTAG_CALL_ON_DONE_RECV = 601,
+  GRPC_PTAG_CALL_UNLOCK = 602,
   /* > 1024 Unassigned reserved. For any miscellaneous use.
    * Use addition to generate tags from this base or take advantage of the 10
    * zero'd bits for OR-ing. */
@@ -83,19 +96,15 @@ enum grpc_profiling_tags {
 #if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER))
 /* No profiling. No-op all the things. */
-#define GRPC_TIMER_MARK(tag, id) \
+#define GRPC_TIMER_MARK(tag, important) \
   do {                                 \
   } while (0)
-#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \
-  do {                                     \
-  } while (0)
-#define GRPC_TIMER_BEGIN(tag, id) \
+#define GRPC_TIMER_BEGIN(tag, important) \
   do {                                  \
   } while (0)
-#define GRPC_TIMER_END(tag, id) \
+#define GRPC_TIMER_END(tag, important) \
   do {                                \
   } while (0)
@@ -106,27 +115,21 @@ enum grpc_profiling_tags {
 #endif
 /* Generic profiling interface. */
-#define GRPC_TIMER_MARK(tag, id) \
+#define GRPC_TIMER_MARK(tag, important) \
   if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
-    grpc_timer_add_mark(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, \
+    grpc_timer_add_mark(tag, #tag, important, __FILE__, \
                         __LINE__); \
  }
-#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \
-  if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
-    grpc_timer_add_important_mark(tag, #tag, ((void *)(gpr_intptr)(id)), \
-                                  __FILE__, __LINE__); \
-  }
-#define GRPC_TIMER_BEGIN(tag, id) \
+#define GRPC_TIMER_BEGIN(tag, important) \
   if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
-    grpc_timer_begin(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, \
+    grpc_timer_begin(tag, #tag, important, __FILE__, \
                      __LINE__); \
  }
-#define GRPC_TIMER_END(tag, id) \
+#define GRPC_TIMER_END(tag, important) \
   if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
-    grpc_timer_end(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
+    grpc_timer_end(tag, #tag, important, __FILE__, __LINE__); \
  }
 #ifdef GRPC_STAP_PROFILER

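Note the pattern in the retagged enum: the GRPC_TIMER_* macros above only record
tags that compare below GRPC_PTAG_IGNORE_THRESHOLD, so dropping the
"+ GRPC_PTAG_IGNORE_THRESHOLD" suffix from SENDMSG, RECVMSG and POLL is what
switches those probes on. A rough sketch of the guard (the threshold's numeric
value is defined elsewhere in timers.h; 1000000 is only a stand-in here):

  IGNORE_THRESHOLD = 1000000  # stand-in value; the real constant lives in timers.h

  def recorded(tag):
    # Mirrors the `if (tag < GRPC_PTAG_IGNORE_THRESHOLD)` guard in the macros.
    return tag < IGNORE_THRESHOLD

  print recorded(201)                     # SENDMSG: now recorded
  print recorded(204 + IGNORE_THRESHOLD)  # TCP_CB_WRITE: still ignored
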
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -607,6 +607,8 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) {
   const size_t MAX_RECV_PEEK_AHEAD = 65536;
   size_t buffered_bytes;
+  GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_UNLOCK, 0);
   memset(&op, 0, sizeof(op));
   op.cancel_with_status = call->cancel_with_status;
@@ -677,6 +679,8 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) {
     unlock(exec_ctx, call);
     GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completing");
   }
+  GRPC_TIMER_END(GRPC_PTAG_CALL_UNLOCK, 0);
 }
 static void get_final_status(grpc_call *call, grpc_ioreq_data out) {
@@ -1589,6 +1593,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
   grpc_call_error error;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_START_BATCH, 0);
   GRPC_API_TRACE(
       "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)",
       5, (call, ops, (unsigned long)nops, tag, reserved));
@@ -1826,6 +1832,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
                      finish_func, tag);
 done:
   grpc_exec_ctx_finish(&exec_ctx);
+  GRPC_TIMER_END(GRPC_PTAG_CALL_START_BATCH, 0);
   return error;
 }

--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -42,6 +42,7 @@
 #include "src/core/surface/call.h"
 #include "src/core/surface/event_string.h"
 #include "src/core/surface/surface_trace.h"
+#include "src/core/profiling/timers.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/atm.h>
 #include <grpc/support/log.h>
@@ -184,6 +185,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
   gpr_timespec now;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  GRPC_TIMER_BEGIN(GRPC_PTAG_CQ_NEXT, 0);
   GRPC_API_TRACE(
       "grpc_completion_queue_next("
       "cc=%p, "
@@ -230,6 +233,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
   GRPC_CQ_INTERNAL_UNREF(cc, "next");
   grpc_exec_ctx_finish(&exec_ctx);
+  GRPC_TIMER_END(GRPC_PTAG_CQ_NEXT, 0);
   return ret;
 }
@@ -268,6 +274,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
   int first_loop = 1;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  GRPC_TIMER_BEGIN(GRPC_PTAG_CQ_PLUCK, 0);
   GRPC_API_TRACE(
       "grpc_completion_queue_pluck("
       "cc=%p, tag=%p, "
@@ -333,6 +341,9 @@ done:
   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
   GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
   grpc_exec_ctx_finish(&exec_ctx);
+  GRPC_TIMER_END(GRPC_PTAG_CQ_PLUCK, 0);
   return ret;
 }

--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -1103,6 +1103,8 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
   int keep_reading = 0;
   grpc_chttp2_transport *t = tp;
+  GRPC_TIMER_BEGIN(GRPC_PTAG_HTTP2_RECV_DATA, 0);
   lock(t);
   i = 0;
   GPR_ASSERT(!t->parsing_active);
@@ -1154,6 +1156,8 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
   } else {
     UNREF_TRANSPORT(exec_ctx, t, "recv_data");
   }
+  GRPC_TIMER_END(GRPC_PTAG_HTTP2_RECV_DATA, 0);
 }
 /*

--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -78,7 +78,6 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
                          context->raw_deadline(), nullptr);
   }
   grpc_census_call_set_context(c_call, context->census_context());
-  GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call);
   context->set_call(c_call, shared_from_this());
   return Call(c_call, this, cq);
 }
@@ -87,11 +86,9 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
   static const size_t MAX_OPS = 8;
   size_t nops = 0;
   grpc_op cops[MAX_OPS];
-  GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
   ops->FillOps(cops, &nops);
   GPR_ASSERT(GRPC_CALL_OK ==
              grpc_call_start_batch(call->call(), cops, nops, ops, nullptr));
-  GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
 }
 void* Channel::RegisterMethod(const char* method) {

--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -541,6 +541,7 @@ void Server::ScheduleCallback() {
 void Server::RunRpc() {
   // Wait for one more incoming rpc.
   bool ok;
+  GRPC_TIMER_BEGIN(GRPC_PTAG_SERVER_CALL, 0);
   auto* mrd = SyncRequest::Wait(&cq_, &ok);
   if (mrd) {
     ScheduleCallback();
@@ -556,9 +557,12 @@ void Server::RunRpc() {
           mrd->TeardownRequest();
         }
       }
+      GRPC_TIMER_BEGIN(GRPC_PTAG_SERVER_CALLBACK, 0);
       cd.Run();
+      GRPC_TIMER_END(GRPC_PTAG_SERVER_CALLBACK, 0);
     }
   }
+  GRPC_TIMER_END(GRPC_PTAG_SERVER_CALL, 0);
   {
     grpc::unique_lock<grpc::mutex> lock(mu_);

--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -59,6 +59,8 @@
 #include "test/cpp/qps/interarrival.h"
 #include "test/cpp/qps/timer.h"
+#include "src/core/profiling/timers.h"
 namespace grpc {
 namespace testing {
@@ -100,8 +102,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
     double start = Timer::Now();
     grpc::ClientContext context;
+    GRPC_TIMER_BEGIN(GRPC_PTAG_CLIENT_UNARY_CALL, 0);
     grpc::Status s =
         stub->UnaryCall(&context, request_, &responses_[thread_idx]);
+    GRPC_TIMER_END(GRPC_PTAG_CLIENT_UNARY_CALL, 0);
     histogram->Add((Timer::Now() - start) * 1e9);
     return s.ok();
   }
@@ -136,11 +140,14 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
   bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
     WaitToIssue(thread_idx);
     double start = Timer::Now();
+    GRPC_TIMER_BEGIN(GRPC_PTAG_CLIENT_UNARY_CALL, 0);
     if (stream_[thread_idx]->Write(request_) &&
         stream_[thread_idx]->Read(&responses_[thread_idx])) {
+      GRPC_TIMER_END(GRPC_PTAG_CLIENT_UNARY_CALL, 0);
       histogram->Add((Timer::Now() - start) * 1e9);
       return true;
     }
+    GRPC_TIMER_END(GRPC_PTAG_CLIENT_UNARY_CALL, 0);
     return false;
   }

--- a/tools/profile_analyzer/profile_analyzer.py
+++ b/tools/profile_analyzer/profile_analyzer.py
@@ -1,7 +1,11 @@
 #!/usr/bin/env python2.7
-import json
+import collections
+import hashlib
+import itertools
+import json
+import math
+import tabulate
+import time
 SELF_TIME = object()
@@ -13,98 +17,187 @@ TIME_TO_STACK_END = object()
 class LineItem(object):
-  def __init__(self, line, indent):
-    self.tag = line['tag']
-    self.indent = indent
-    self.time_stamp = line['t']
-    self.important = line['type'] == '!'
-    self.times = {}
+  def __init__(self, line, indent):
+    self.tag = line['tag']
+    self.indent = indent
+    self.start_time = line['t']
+    self.end_time = None
+    self.important = line['imp']
+    self.times = {}
 class ScopeBuilder(object):
   def __init__(self, call_stack_builder, line):
     self.call_stack_builder = call_stack_builder
     self.indent = len(call_stack_builder.stk)
     self.top_line = LineItem(line, self.indent)
     call_stack_builder.lines.append(self.top_line)
     self.first_child_pos = len(call_stack_builder.lines)
-  def mark(self, line):
-    pass
+  def mark(self, line):
+    line_item = LineItem(line, self.indent + 1)
+    line_item.end_time = line_item.start_time
+    self.call_stack_builder.lines.append(line_item)
-  def finish(self, line):
-    assert line['tag'] == self.top_line.tag
-    final_time_stamp = line['t']
-    assert SELF_TIME not in self.top_line.times
-    self.top_line.tims[SELF_TIME] = final_time_stamp - self.top_line.time_stamp
-    for line in self.call_stack_builder.lines[self.first_child_pos:]:
-      if TIME_FROM_SCOPE_START not in line.times:
-        line[TIME_FROM_SCOPE_START] = line.time_stamp - self.top_line.time_stamp
-        line[TIME_TO_SCOPE_END] = final_time_stamp - line.time_stamp
+  def finish(self, line):
+    assert line['tag'] == self.top_line.tag
+    final_time_stamp = line['t']
+    assert self.top_line.end_time is None
+    self.top_line.end_time = final_time_stamp
+    assert SELF_TIME not in self.top_line.times
+    self.top_line.times[SELF_TIME] = final_time_stamp - self.top_line.start_time
+    for line in self.call_stack_builder.lines[self.first_child_pos:]:
+      if TIME_FROM_SCOPE_START not in line.times:
+        line.times[TIME_FROM_SCOPE_START] = line.start_time - self.top_line.start_time
+        line.times[TIME_TO_SCOPE_END] = final_time_stamp - line.end_time
 class CallStackBuilder(object):
-  def __init__(self):
-    self.stk = []
-    self.signature = ''
-    self.lines = []
-  def add(self, line):
-    line_type = line['type']
-    self.signature = '%s%s%s' % (self.signature, line_type, line['tag'])
-    if line_type == '{':
-      self.stk.append(ScopeBuilder(self, line))
-      return False
-    elif line_type == '}':
-      self.stk.pop().finish(line)
-      return not self.stk
-    elif line_type == '.' or line_type == '!':
-      self.stk[-1].mark(line, True)
-      return False
-    else:
-      raise Exception('Unknown line type: \'%s\'' % line_type)
+  def __init__(self):
+    self.stk = []
+    self.signature = hashlib.md5()
+    self.lines = []
+  def finish(self):
+    start_time = self.lines[0].start_time
+    end_time = self.lines[0].end_time
+    self.signature = self.signature.hexdigest()
+    for line in self.lines:
+      line.times[TIME_FROM_STACK_START] = line.start_time - start_time
+      line.times[TIME_TO_STACK_END] = end_time - line.end_time
+  def add(self, line):
+    line_type = line['type']
+    self.signature.update(line_type)
+    self.signature.update(line['tag'])
+    if line_type == '{':
+      self.stk.append(ScopeBuilder(self, line))
+      return False
+    elif line_type == '}':
+      self.stk.pop().finish(line)
+      if not self.stk:
+        self.finish()
+        return True
+      return False
+    elif line_type == '.' or line_type == '!':
+      self.stk[-1].mark(line)
+      return False
+    else:
+      raise Exception('Unknown line type: \'%s\'' % line_type)
-class CallStack(object):
-  def __init__(self, initial_call_stack_builder):
-    self.count = 1
-    self.signature = initial_call_stack_builder.signature
-    self.lines = initial_call_stack_builder.lines
-    for line in lines:
-      for key, val in line.times.items():
-        line.times[key] = [val]
-  def add(self, call_stack_builder):
-    assert self.signature == call_stack_builder.signature
-    self.count += 1
-    assert len(self.lines) == len(call_stack_builder.lines)
-    for lsum, line in itertools.izip(self.lines, call_stack_builder.lines):
-      assert lsum.tag == line.tag
-      assert lsum.times.keys() == line.times.keys()
-      for k, lst in lsum.times.iterkeys():
-        lst.append(line.times[k])
+class CallStack(object):
+  def __init__(self, initial_call_stack_builder):
+    self.count = 1
+    self.signature = initial_call_stack_builder.signature
+    self.lines = initial_call_stack_builder.lines
+    for line in self.lines:
+      for key, val in line.times.items():
+        line.times[key] = [val]
+  def add(self, call_stack_builder):
+    assert self.signature == call_stack_builder.signature
+    self.count += 1
+    assert len(self.lines) == len(call_stack_builder.lines)
+    for lsum, line in itertools.izip(self.lines, call_stack_builder.lines):
+      assert lsum.tag == line.tag
+      assert lsum.times.keys() == line.times.keys()
+      for k, lst in lsum.times.iteritems():
+        lst.append(line.times[k])
+  def finish(self):
+    for line in self.lines:
+      for lst in line.times.itervalues():
+        lst.sort()
 builder = collections.defaultdict(CallStackBuilder)
 call_stacks = collections.defaultdict(CallStack)
 print 'Loading...'
 lines = 0
 start = time.time()
 with open('latency_trace.txt') as f:
   for line in f:
     lines += 1
     inf = json.loads(line)
     thd = inf['thd']
     cs = builder[thd]
     if cs.add(inf):
       if cs.signature in call_stacks:
         call_stacks[cs.signature].add(cs)
       else:
         call_stacks[cs.signature] = CallStack(cs)
       del builder[thd]
 time_taken = time.time() - start
 print 'Read %d lines in %f seconds (%f lines/sec)' % (lines, time_taken, lines / time_taken)
 print 'Analyzing...'
 call_stacks = sorted(call_stacks.values(), key=lambda cs: cs.count, reverse=True)
 for cs in call_stacks:
   cs.finish()
 print 'Writing report...'
 def percentile(N, percent, key=lambda x:x):
   """
   Find the percentile of a list of values.

   @parameter N - is a list of values. Note N MUST BE already sorted.
   @parameter percent - a float value from 0.0 to 1.0.
   @parameter key - optional key function to compute value from each element of N.

   @return - the percentile of the values
   """
   if not N:
     return None
   k = (len(N)-1) * percent
   f = math.floor(k)
   c = math.ceil(k)
   if f == c:
     return key(N[int(k)])
   d0 = key(N[int(f)]) * (c-k)
   d1 = key(N[int(c)]) * (k-f)
   return d0+d1
 def tidy_tag(tag):
   if tag[0:10] == 'GRPC_PTAG_':
     return tag[10:]
   return tag
 def time_string(values):
   num_values = len(values)
   return '%.1f/%.1f/%.1f' % (
       1e6 * percentile(values, 0.5),
       1e6 * percentile(values, 0.9),
       1e6 * percentile(values, 0.99))
 def time_format(idx):
   def ent(line, idx=idx):
     if idx in line.times:
       return time_string(line.times[idx])
     return ''
   return ent
 FORMAT = [
   ('TAG', lambda line: '..'*line.indent + tidy_tag(line.tag)),
   ('FROM_STACK_START', time_format(TIME_FROM_STACK_START)),
   ('SELF', time_format(SELF_TIME)),
   ('TO_STACK_END', time_format(TIME_TO_STACK_END)),
   ('FROM_SCOPE_START', time_format(TIME_FROM_SCOPE_START)),
   ('SELF', time_format(SELF_TIME)),
   ('TO_SCOPE_END', time_format(TIME_TO_SCOPE_END)),
 ]
 for cs in call_stacks:
   print cs.signature
   print cs.count
   header, _ = zip(*FORMAT)
   table = []
   for line in cs.lines:
     fields = []
     for _, fn in FORMAT:
       fields.append(fn(line))
     table.append(fields)
   print tabulate.tabulate(table, header, tablefmt="simple")
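
Two quick sanity checks on the analyzer above, with hand-built values that are
purely illustrative:

  # percentile() interpolates linearly between adjacent sorted samples:
  xs = [10, 20, 30, 40]
  assert percentile(xs, 0.5) == 25.0             # midway between 20 and 30
  assert abs(percentile(xs, 0.99) - 39.7) < 1e-9

  # CallStackBuilder.add returns True once the outermost '{' is matched by its
  # '}', at which point the stack's md5 signature is finalized:
  b = CallStackBuilder()
  b.add({'type': '{', 'tag': 'GRPC_PTAG_CQ_NEXT', 't': 0.0, 'imp': 0})
  b.add({'type': '.', 'tag': 'GRPC_PTAG_POLL', 't': 4e-6, 'imp': 0})
  done = b.add({'type': '}', 'tag': 'GRPC_PTAG_CQ_NEXT', 't': 1e-5, 'imp': 0})
  assert done and len(b.signature) == 32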
