Merge pull request #4032 from ctiller/better-profile

Lower latency profiling
Nicolas Noble
commit 447c795bb8
8 changed files (lines changed per file):

  .gitignore (1)
  src/core/profiling/basic_timers.c (185)
  src/core/profiling/timers.h (2)
  src/core/support/thd_posix.c (9)
  src/core/transport/chttp2/writing.c (2)
  test/core/fling/client.c (11)
  test/core/fling/server.c (7)
  tools/profiling/latency_profile/profile_analyzer.py (3)

.gitignore

@@ -38,6 +38,7 @@ cache.mk
 # Temporary test reports
 report.xml
 latency_trace.txt
+latency_trace.*.txt
 # port server log
 portlog.txt

src/core/profiling/basic_timers.c

@@ -53,50 +53,186 @@ typedef struct gpr_timer_entry {
   short line;
   char type;
   gpr_uint8 important;
+  int thd;
 } gpr_timer_entry;
 
-#define MAX_COUNT (5 * 1024 * 1024 / sizeof(gpr_timer_entry))
+#define MAX_COUNT 1000000
 
-static __thread gpr_timer_entry g_log[MAX_COUNT];
-static __thread int g_count;
+typedef struct gpr_timer_log {
+  size_t num_entries;
+  struct gpr_timer_log *next;
+  struct gpr_timer_log *prev;
+  gpr_timer_entry log[MAX_COUNT];
+} gpr_timer_log;
+
+typedef struct gpr_timer_log_list {
+  gpr_timer_log *head;
+  /* valid iff head!=NULL */
+  gpr_timer_log *tail;
+} gpr_timer_log_list;
+
+static __thread gpr_timer_log *g_thread_log;
 static gpr_once g_once_init = GPR_ONCE_INIT;
 static FILE *output_file;
+static const char *output_filename = "latency_trace.txt";
+static pthread_mutex_t g_mu;
+static pthread_cond_t g_cv;
+static gpr_timer_log_list g_in_progress_logs;
+static gpr_timer_log_list g_done_logs;
+static int g_shutdown;
+static gpr_thd_id g_writing_thread;
+static __thread int g_thread_id;
+static int g_next_thread_id;
 
-static void close_output() { fclose(output_file); }
-
-static void init_output() {
-  output_file = fopen("latency_trace.txt", "w");
-  GPR_ASSERT(output_file);
-  atexit(close_output);
-}
-
-static void log_report() {
-  int i;
-  gpr_once_init(&g_once_init, init_output);
-  for (i = 0; i < g_count; i++) {
-    gpr_timer_entry *entry = &(g_log[i]);
+static int timer_log_push_back(gpr_timer_log_list *list, gpr_timer_log *log) {
+  if (list->head == NULL) {
+    list->head = list->tail = log;
+    log->next = log->prev = NULL;
+    return 1;
+  } else {
+    log->prev = list->tail;
+    log->next = NULL;
+    list->tail->next = log;
+    list->tail = log;
+    return 0;
+  }
+}
+
+static gpr_timer_log *timer_log_pop_front(gpr_timer_log_list *list) {
+  gpr_timer_log *out = list->head;
+  if (out != NULL) {
+    list->head = out->next;
+    if (list->head != NULL) {
+      list->head->prev = NULL;
+    } else {
+      list->tail = NULL;
+    }
+  }
+  return out;
+}
+
+static void timer_log_remove(gpr_timer_log_list *list, gpr_timer_log *log) {
+  if (log->prev == NULL) {
+    list->head = log->next;
+    if (list->head != NULL) {
+      list->head->prev = NULL;
+    }
+  } else {
+    log->prev->next = log->next;
+  }
+  if (log->next == NULL) {
+    list->tail = log->prev;
+    if (list->tail != NULL) {
+      list->tail->next = NULL;
+    }
+  } else {
+    log->next->prev = log->prev;
+  }
+}
+
+static void write_log(gpr_timer_log *log) {
+  size_t i;
+  if (output_file == NULL) {
+    output_file = fopen(output_filename, "w");
+  }
+  for (i = 0; i < log->num_entries; i++) {
+    gpr_timer_entry *entry = &(log->log[i]);
+    if (gpr_time_cmp(entry->tm, gpr_time_0(entry->tm.clock_type)) < 0) {
+      entry->tm = gpr_time_0(entry->tm.clock_type);
+    }
     fprintf(output_file,
-            "{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": "
+            "{\"t\": %ld.%09d, \"thd\": \"%d\", \"type\": \"%c\", \"tag\": "
             "\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n",
-            entry->tm.tv_sec, entry->tm.tv_nsec,
-            (void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr,
-            entry->file, entry->line, entry->important);
+            entry->tm.tv_sec, entry->tm.tv_nsec, entry->thd, entry->type,
+            entry->tagstr, entry->file, entry->line, entry->important);
   }
-
-  /* Now clear out the log */
-  g_count = 0;
 }
+
+static void writing_thread(void *unused) {
+  gpr_timer_log *log;
+  pthread_mutex_lock(&g_mu);
+  for (;;) {
+    while ((log = timer_log_pop_front(&g_done_logs)) == NULL && !g_shutdown) {
+      pthread_cond_wait(&g_cv, &g_mu);
+    }
+    if (log != NULL) {
+      pthread_mutex_unlock(&g_mu);
+      write_log(log);
+      free(log);
+      pthread_mutex_lock(&g_mu);
+    }
+    if (g_shutdown) {
+      pthread_mutex_unlock(&g_mu);
+      return;
+    }
+  }
+}
+
+static void flush_logs(gpr_timer_log_list *list) {
+  gpr_timer_log *log;
+  while ((log = timer_log_pop_front(list)) != NULL) {
+    write_log(log);
+    free(log);
+  }
+}
+
+static void finish_writing() {
+  pthread_mutex_lock(&g_mu);
+  g_shutdown = 1;
+  pthread_cond_signal(&g_cv);
+  pthread_mutex_unlock(&g_mu);
+  gpr_thd_join(g_writing_thread);
+
+  gpr_log(GPR_INFO, "flushing logs");
+
+  pthread_mutex_lock(&g_mu);
+  flush_logs(&g_done_logs);
+  flush_logs(&g_in_progress_logs);
+  pthread_mutex_unlock(&g_mu);
+
+  if (output_file) {
+    fclose(output_file);
+  }
+}
+
+void gpr_timers_set_log_filename(const char *filename) {
+  output_filename = filename;
+}
+
+static void init_output() {
+  gpr_thd_options options = gpr_thd_options_default();
+  gpr_thd_options_set_joinable(&options);
+  gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options);
+  atexit(finish_writing);
+}
+
+static void rotate_log() {
+  gpr_timer_log *new = malloc(sizeof(*new));
+  gpr_once_init(&g_once_init, init_output);
+  new->num_entries = 0;
+  pthread_mutex_lock(&g_mu);
+  if (g_thread_log != NULL) {
+    timer_log_remove(&g_in_progress_logs, g_thread_log);
+    if (timer_log_push_back(&g_done_logs, g_thread_log)) {
+      pthread_cond_signal(&g_cv);
+    }
+  } else {
+    g_thread_id = g_next_thread_id++;
+  }
+  timer_log_push_back(&g_in_progress_logs, new);
+  pthread_mutex_unlock(&g_mu);
+  g_thread_log = new;
+}
 
 static void gpr_timers_log_add(const char *tagstr, marker_type type,
                                int important, const char *file, int line) {
   gpr_timer_entry *entry;
 
-  /* TODO (vpai) : Improve concurrency */
-  if (g_count == MAX_COUNT) {
-    log_report();
+  if (g_thread_log == NULL || g_thread_log->num_entries == MAX_COUNT) {
+    rotate_log();
   }
 
-  entry = &g_log[g_count++];
+  entry = &g_thread_log->log[g_thread_log->num_entries++];
   entry->tm = gpr_now(GPR_CLOCK_PRECISE);
   entry->tagstr = tagstr;
@@ -104,6 +240,7 @@ static void gpr_timers_log_add(const char *tagstr, marker_type type,
   entry->file = file;
   entry->line = (short)line;
   entry->important = important != 0;
+  entry->thd = g_thread_id;
 }
 
 /* Latency profiler API implementation. */
@@ -131,4 +268,6 @@ void gpr_timers_global_destroy(void) {}
 void gpr_timers_global_init(void) {}
 
 void gpr_timers_global_destroy(void) {}
+
+void gpr_timers_set_log_filename(const char *filename) {}
 #endif /* GRPC_BASIC_PROFILER */
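
Taken together, the profiler now gives each thread its own malloc'd gpr_timer_log, parks filled logs on g_done_logs, and leaves all file I/O to the single writer thread, removing the old inline log_report() stall from the hot path. A minimal usage sketch (the function and file names here are illustrative, not part of the commit, and it assumes a build with GRPC_BASIC_PROFILER defined):

    #include "src/core/profiling/timers.h"

    static void do_work(void) {
      GPR_TIMER_BEGIN("do_work", 0); /* records a '{' scope-open entry */
      /* ... the code being measured ... */
      GPR_TIMER_END("do_work", 0);   /* records the matching '}' entry */
    }

    int main(void) {
      /* Choose a per-binary trace file early, before anything is flushed;
       * the writer opens the file lazily on its first write_log call. */
      gpr_timers_set_log_filename("latency_trace.example.txt");
      do_work();
      return 0;
    }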

src/core/profiling/timers.h

@@ -48,6 +48,8 @@ void gpr_timer_begin(const char *tagstr, int important, const char *file,
 void gpr_timer_end(const char *tagstr, int important, const char *file,
                    int line);
 
+void gpr_timers_set_log_filename(const char *filename);
+
 #if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER))
 /* No profiling. No-op all the things. */
 #define GPR_TIMER_MARK(tag, important) \

src/core/support/thd_posix.c

@@ -53,7 +53,7 @@ struct thd_arg {
 /* Body of every thread started via gpr_thd_new. */
 static void *thread_body(void *v) {
   struct thd_arg a = *(struct thd_arg *)v;
-  gpr_free(v);
+  free(v);
   (*a.body)(a.arg);
   return NULL;
 }

@@ -63,7 +63,10 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
   int thread_started;
   pthread_attr_t attr;
   pthread_t p;
-  struct thd_arg *a = gpr_malloc(sizeof(*a));
+  /* don't use gpr_malloc as we may cause an infinite recursion with
+   * the profiling code */
+  struct thd_arg *a = malloc(sizeof(*a));
+  GPR_ASSERT(a != NULL);
   a->body = thd_body;
   a->arg = arg;

@@ -78,7 +81,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
   thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0);
   GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
   if (!thread_started) {
-    gpr_free(a);
+    free(a);
   }
   *t = (gpr_thd_id)p;
   return thread_started;
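
The switch from gpr_malloc/gpr_free to the raw libc calls matters because gpr_thd_new is now reached from inside the profiler itself: init_output spawns the writer thread the first time a marker is logged. A sketch of the cycle the in-diff comment guards against, assuming gpr's allocator were itself instrumented with timer markers (hypothetical here, not shown in this diff):

    /* gpr_timers_log_add()              first marker on a thread
     *   -> rotate_log()                 no per-thread log yet
     *     -> init_output()              via gpr_once: spawn writer thread
     *       -> gpr_thd_new()
     *         -> gpr_malloc()           if this logged a marker itself...
     *           -> gpr_timers_log_add() ...the profiler would re-enter
     */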

src/core/transport/chttp2/writing.c

@@ -320,6 +320,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
       grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
     }
   }
+
+  GPR_TIMER_END("finalize_outbuf", 0);
 }
 
 void grpc_chttp2_cleanup_writing(

test/core/fling/client.c

@@ -41,6 +41,7 @@
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
 #include <grpc/support/useful.h>
+#include "src/core/profiling/timers.h"
 #include "test/core/util/grpc_profiler.h"
 #include "test/core/util/test_config.h"

@@ -89,6 +90,7 @@ static void init_ping_pong_request(void) {
 }
 
 static void step_ping_pong_request(void) {
+  GPR_TIMER_BEGIN("ping_pong", 1);
   call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
                                   "/Reflector/reflectUnary", "localhost",
                                   gpr_inf_future(GPR_CLOCK_REALTIME), NULL);

@@ -99,6 +101,7 @@ static void step_ping_pong_request(void) {
   grpc_call_destroy(call);
   grpc_byte_buffer_destroy(response_payload_recv);
   call = NULL;
+  GPR_TIMER_END("ping_pong", 1);
 }
 
 static void init_ping_pong_stream(void) {

@@ -122,10 +125,12 @@ static void init_ping_pong_stream(void) {
 static void step_ping_pong_stream(void) {
   grpc_call_error error;
+  GPR_TIMER_BEGIN("ping_pong", 1);
   error = grpc_call_start_batch(call, stream_step_ops, 2, (void *)1, NULL);
   GPR_ASSERT(GRPC_CALL_OK == error);
   grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
   grpc_byte_buffer_destroy(response_payload_recv);
+  GPR_TIMER_END("ping_pong", 1);
 }
 
 static double now(void) {

@@ -159,12 +164,14 @@ int main(int argc, char **argv) {
   char *scenario_name = "ping-pong-request";
   scenario sc = {NULL, NULL, NULL};
 
+  gpr_timers_set_log_filename("latency_trace.fling_client.txt");
+
+  grpc_init();
   GPR_ASSERT(argc >= 1);
   fake_argv[0] = argv[0];
   grpc_test_init(1, fake_argv);
-  grpc_init();
 
   cl = gpr_cmdline_create("fling client");
   gpr_cmdline_add_int(cl, "payload_size", "Size of the payload to send",
                       &payload_size);

test/core/fling/server.c

@@ -44,15 +44,16 @@
 #include <unistd.h>
 #endif
 
-#include "test/core/util/grpc_profiler.h"
-#include "test/core/util/test_config.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/cmdline.h>
 #include <grpc/support/host_port.h>
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
+#include "src/core/profiling/timers.h"
 #include "test/core/util/port.h"
 #include "test/core/end2end/data/ssl_test_data.h"
+#include "test/core/util/grpc_profiler.h"
+#include "test/core/util/test_config.h"
 
 static grpc_completion_queue *cq;
 static grpc_server *server;

@@ -192,6 +193,8 @@ int main(int argc, char **argv) {
   char *fake_argv[1];
 
+  gpr_timers_set_log_filename("latency_trace.fling_server.txt");
+
   GPR_ASSERT(argc >= 1);
   fake_argv[0] = argv[0];
   grpc_test_init(1, fake_argv);

tools/profiling/latency_profile/profile_analyzer.py

@@ -49,7 +49,7 @@ class ScopeBuilder(object):
     self.call_stack_builder.lines.append(line_item)
 
   def finish(self, line):
-    assert line['tag'] == self.top_line.tag, 'expected %s, got %s' % (self.top_line.tag, line['tag'])
+    assert line['tag'] == self.top_line.tag, 'expected %s, got %s; thread=%s; t0=%f t1=%f' % (self.top_line.tag, line['tag'], line['thd'], self.top_line.start_time, line['t'])
     final_time_stamp = line['t']
     assert self.top_line.end_time is None
     self.top_line.end_time = final_time_stamp

@@ -84,6 +84,7 @@ class CallStackBuilder(object):
       self.stk.append(ScopeBuilder(self, line))
       return False
     elif line_type == '}':
+      assert self.stk, 'expected non-empty stack for closing %s; thread=%s; t=%f' % (line['tag'], line['thd'], line['t'])
      self.stk.pop().finish(line)
       if not self.stk:
         self.finish()
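
For reference, each line the analyzer ingests is the standalone JSON object emitted by write_log above; the "thd" field now carries the compact per-thread id instead of a pthread pointer, which is what makes the thread-qualified assertion messages added here possible. An illustrative line (all values invented):

    {"t": 1448327160.000123456, "thd": "0", "type": "{", "tag": "ping_pong", "file": "test/core/fling/client.c", "line": 93, "imp": 1}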
