C implementation of Census trace store and stats store for grpc C lib.

Change on 2015/01/08 by hongyu <hongyu@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=83556470
pull/3/merge
hongyu 10 years ago committed by Nicolas Noble
parent 6edb547c99
commit 24200d3cbc
  1. 330
      Makefile
  2. 25
      build.json
  3. 2
      include/grpc/support/time.h
  4. 18
      src/core/channel/census_filter.c
  5. 17
      src/core/statistics/census_init.c
  6. 206
      src/core/statistics/census_rpc_stats.c
  7. 26
      src/core/statistics/census_rpc_stats.h
  8. 149
      src/core/statistics/census_tracing.c
  9. 59
      src/core/statistics/census_tracing.h
  10. 4
      src/core/support/time.c
  11. 1
      test/core/end2end/gen_build_json.py
  12. 176
      test/core/end2end/tests/census_simple_request.c
  13. 6
      test/core/statistics/census_stub_test.c
  14. 197
      test/core/statistics/rpc_stats_test.c
  15. 158
      test/core/statistics/trace_test.c
  16. 1
      vsprojects/vs2013/grpc.vcxproj
  17. 1
      vsprojects/vs2013/grpc_unsecure.vcxproj

File diff suppressed because one or more lines are too long

@ -141,6 +141,7 @@
"src/core/statistics/census_interface.h",
"src/core/statistics/census_log.h",
"src/core/statistics/census_rpc_stats.h",
"src/core/statistics/census_tracing.h",
"src/core/statistics/hash_table.h",
"src/core/statistics/window_stats.h",
"src/core/surface/call.h",
@ -847,6 +848,30 @@
"gpr"
]
},
{
"name": "census_trace_store_test",
"build": "executable",
"src": [
"test/core/statistics/trace_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr"
]
},
{
"name": "census_stats_store_test",
"build": "executable",
"src": [
"test/core/statistics/rpc_stats_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr"
]
},
{
"name": "census_window_stats_test",
"build": "test",

@ -107,6 +107,8 @@ struct timeval gpr_timeval_from_timespec(gpr_timespec t);
gpr_timespec gpr_timespec_from_timeval(struct timeval t);
double gpr_timespec_to_micros(gpr_timespec t);
#ifdef __cplusplus
}
#endif

@ -60,13 +60,11 @@ static void init_rpc_stats(census_rpc_stats* stats) {
stats->cnt = 1;
}
static double gpr_timespec_to_micros(gpr_timespec t) {
return t.tv_sec * GPR_US_PER_SEC + t.tv_nsec * 1e-3;
}
static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
channel_data* chand) {
if (op->data.metadata->key == chand->path_str) {
gpr_log(GPR_DEBUG,
(const char*)GPR_SLICE_START_PTR(op->data.metadata->value->slice));
census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
op->data.metadata->value->slice));
}
@ -78,7 +76,7 @@ static void client_call_op(grpc_call_element* elem,
channel_data* chand = elem->channel_data;
GPR_ASSERT(calld != NULL);
GPR_ASSERT(chand != NULL);
GPR_ASSERT((calld->op_id.upper != 0) && (calld->op_id.lower != 0));
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
switch (op->type) {
case GRPC_SEND_METADATA:
extract_and_annotate_method_tag(op, calld, chand);
@ -99,7 +97,7 @@ static void server_call_op(grpc_call_element* elem,
channel_data* chand = elem->channel_data;
GPR_ASSERT(calld != NULL);
GPR_ASSERT(chand != NULL);
GPR_ASSERT((calld->op_id.upper != 0) && (calld->op_id.lower != 0));
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
switch (op->type) {
case GRPC_RECV_METADATA:
extract_and_annotate_method_tag(op, calld, chand);
@ -171,7 +169,13 @@ static void init_channel_elem(grpc_channel_element* elem,
chand->path_str = grpc_mdstr_from_string(mdctx, ":path");
}
static void destroy_channel_elem(grpc_channel_element* elem) {}
static void destroy_channel_elem(grpc_channel_element* elem) {
channel_data* chand = elem->channel_data;
GPR_ASSERT(chand != NULL);
if (chand->path_str != NULL) {
grpc_mdstr_unref(chand->path_str);
}
}
const grpc_channel_filter grpc_client_census_filter = {
client_call_op, channel_op,

@ -33,5 +33,18 @@
#include "src/core/statistics/census_interface.h"
void census_init() {}
void census_shutdown() {}
#include <grpc/support/log.h>
#include "src/core/statistics/census_rpc_stats.h"
#include "src/core/statistics/census_tracing.h"
void census_init() {
gpr_log(GPR_INFO, "Initialize census library.");
census_tracing_init();
census_stats_store_init();
}
void census_shutdown() {
gpr_log(GPR_INFO, "Shutdown census library.");
census_stats_store_shutdown();
census_tracing_shutdown();
}

@ -35,7 +35,85 @@
#include "src/core/statistics/census_interface.h"
#include "src/core/statistics/census_rpc_stats.h"
#include "src/core/statistics/hash_table.h"
#include "src/core/statistics/census_tracing.h"
#include "src/core/statistics/window_stats.h"
#include "src/core/support/murmur_hash.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string.h>
#include <grpc/support/sync.h>
#define NUM_INTERVALS 3
#define MINUTE_INTERVAL 0
#define HOUR_INTERVAL 1
#define TOTAL_INTERVAL 2
/* for easier typing */
typedef census_per_method_rpc_stats per_method_stats;
/* Ensure mu is only initialized once. */
static gpr_once g_stats_store_mu_init = GPR_ONCE_INIT;
/* Guards two stats stores. */
static gpr_mu g_mu;
static census_ht* g_client_stats_store = NULL;
static census_ht* g_server_stats_store = NULL;
static void init_mutex() { gpr_mu_init(&g_mu); }
static void init_mutex_once() {
gpr_once_init(&g_stats_store_mu_init, init_mutex);
}
static int cmp_str_keys(const void* k1, const void* k2) {
return strcmp((const char*)k1, (const char*)k2);
}
/* TODO(hongyu): replace it with cityhash64 */
static gpr_uint64 simple_hash(const void* k) {
size_t len = strlen(k);
gpr_uint64 higher = gpr_murmur_hash3((const char*)k, len / 2, 0);
return higher << 32 |
gpr_murmur_hash3((const char*)k + len / 2, len - len / 2, 0);
}
static void delete_stats(void* stats) {
census_window_stats_destroy((struct census_window_stats*)stats);
}
static void delete_key(void* key) { gpr_free(key); }
static const census_ht_option ht_opt = {
CENSUS_HT_POINTER /* key type */, 1999 /* n_of_buckets */,
simple_hash /* hash function */, cmp_str_keys /* key comparator */,
delete_stats /* data deleter */, delete_key /* key deleter */};
static void init_rpc_stats(void* stats) {
memset(stats, 0, sizeof(census_rpc_stats));
}
static void stat_add_proportion(double p, void* base, const void* addme) {
census_rpc_stats* b = (census_rpc_stats*)base;
census_rpc_stats* a = (census_rpc_stats*)addme;
b->cnt += p * a->cnt;
b->rpc_error_cnt += p * a->rpc_error_cnt;
b->app_error_cnt += p * a->app_error_cnt;
b->elapsed_time_ms += p * a->elapsed_time_ms;
b->api_request_bytes += p * a->api_request_bytes;
b->wire_request_bytes += p * a->wire_request_bytes;
b->api_response_bytes += p * a->api_response_bytes;
b->wire_response_bytes += p * a->wire_response_bytes;
}
static void stat_add(void* base, const void* addme) {
stat_add_proportion(1.0, base, addme);
}
static gpr_timespec min_hour_total_intervals[3] = {
{60, 0}, {3600, 0}, {36000000, 0}};
static const census_window_stats_stat_info window_stats_settings = {
sizeof(census_rpc_stats), init_rpc_stats, stat_add, stat_add_proportion};
census_rpc_stats* census_rpc_stats_create_empty() {
census_rpc_stats* ret =
@ -44,14 +122,132 @@ census_rpc_stats* census_rpc_stats_create_empty() {
return ret;
}
void census_aggregated_rpc_stats_destroy(census_aggregated_rpc_stats* data) {}
void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats* data) {
int i = 0;
for (i = 0; i < data->num_entries; i++) {
if (data->stats[i].method != NULL) {
gpr_free((void*)data->stats[i].method);
}
}
if (data->stats != NULL) {
gpr_free(data->stats);
}
data->num_entries = 0;
data->stats = NULL;
}
static void record_stats(census_ht* store, census_op_id op_id,
const census_rpc_stats* stats) {
gpr_mu_lock(&g_mu);
if (store != NULL) {
trace_obj* trace = NULL;
census_internal_lock_trace_store();
trace = census_get_trace_obj_locked(op_id);
if (trace != NULL) {
const char* method_name = census_get_trace_method_name(trace);
struct census_window_stats* window_stats = NULL;
census_ht_key key;
key.ptr = (void*)method_name;
window_stats = census_ht_find(store, key);
census_internal_unlock_trace_store();
if (window_stats == NULL) {
window_stats = census_window_stats_create(3, min_hour_total_intervals,
30, &window_stats_settings);
key.ptr = gpr_strdup(key.ptr);
census_ht_insert(store, key, (void*)window_stats);
}
census_window_stats_add(window_stats, gpr_now(), stats);
} else {
census_internal_unlock_trace_store();
}
}
gpr_mu_unlock(&g_mu);
}
void census_record_rpc_client_stats(census_op_id op_id,
const census_rpc_stats* stats) {}
const census_rpc_stats* stats) {
record_stats(g_client_stats_store, op_id, stats);
}
void census_record_rpc_server_stats(census_op_id op_id,
const census_rpc_stats* stats) {}
const census_rpc_stats* stats) {
record_stats(g_server_stats_store, op_id, stats);
}
void census_get_server_stats(census_aggregated_rpc_stats* data) {}
/* Get stats from input stats store */
static void get_stats(census_ht* store, census_aggregated_rpc_stats* data) {
GPR_ASSERT(data != NULL);
if (data->num_entries != 0) {
census_aggregated_rpc_stats_set_empty(data);
}
gpr_mu_lock(&g_mu);
if (store != NULL) {
size_t n;
int i, j;
gpr_timespec now = gpr_now();
census_ht_kv* kv = census_ht_get_all_elements(store, &n);
if (kv != NULL) {
data->num_entries = n;
data->stats = (per_method_stats*)gpr_malloc(sizeof(per_method_stats) * n);
for (i = 0; i < n; i++) {
census_window_stats_sums sums[NUM_INTERVALS];
for (j = 0; j < NUM_INTERVALS; j++) {
sums[j].statistic = (void*)census_rpc_stats_create_empty();
}
data->stats[i].method = gpr_strdup(kv[i].k.ptr);
census_window_stats_get_sums(kv[i].v, now, sums);
data->stats[i].minute_stats =
*(census_rpc_stats*)sums[MINUTE_INTERVAL].statistic;
data->stats[i].hour_stats =
*(census_rpc_stats*)sums[HOUR_INTERVAL].statistic;
data->stats[i].total_stats =
*(census_rpc_stats*)sums[TOTAL_INTERVAL].statistic;
for (j = 0; j < NUM_INTERVALS; j++) {
gpr_free(sums[j].statistic);
}
}
gpr_free(kv);
}
}
gpr_mu_unlock(&g_mu);
}
void census_get_client_stats(census_aggregated_rpc_stats* data) {
get_stats(g_client_stats_store, data);
}
void census_get_server_stats(census_aggregated_rpc_stats* data) {
get_stats(g_server_stats_store, data);
}
void census_stats_store_init() {
gpr_log(GPR_INFO, "Initialize census stats store.");
init_mutex_once();
gpr_mu_lock(&g_mu);
if (g_client_stats_store == NULL && g_server_stats_store == NULL) {
g_client_stats_store = census_ht_create(&ht_opt);
g_server_stats_store = census_ht_create(&ht_opt);
} else {
gpr_log(GPR_ERROR, "Census stats store already initialized.");
}
gpr_mu_unlock(&g_mu);
}
void census_get_client_stats(census_aggregated_rpc_stats* data) {}
void census_stats_store_shutdown() {
gpr_log(GPR_INFO, "Shutdown census stats store.");
init_mutex_once();
gpr_mu_lock(&g_mu);
if (g_client_stats_store != NULL) {
census_ht_destroy(g_client_stats_store);
g_client_stats_store = NULL;
} else {
gpr_log(GPR_ERROR, "Census server stats store not initialized.");
}
if (g_server_stats_store != NULL) {
census_ht_destroy(g_server_stats_store);
g_server_stats_store = NULL;
} else {
gpr_log(GPR_ERROR, "Census client stats store not initialized.");
}
gpr_mu_unlock(&g_mu);
}

@ -37,6 +37,10 @@
#include "src/core/statistics/census_interface.h"
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
extern "C" {
#endif
struct census_rpc_stats {
gpr_uint64 cnt;
gpr_uint64 rpc_error_cnt;
@ -51,19 +55,20 @@ struct census_rpc_stats {
/* Creates an empty rpc stats object on heap. */
census_rpc_stats* census_rpc_stats_create_empty();
typedef struct census_per_service_per_method_rpc_stats {
const char* service;
typedef struct census_per_method_rpc_stats {
const char* method;
census_rpc_stats data;
} census_per_service_per_method_rpc_stats;
census_rpc_stats minute_stats; /* cumulative stats in the past minute */
census_rpc_stats hour_stats; /* cumulative stats in the past hour */
census_rpc_stats total_stats; /* cumulative stats from last gc */
} census_per_method_rpc_stats;
typedef struct census_aggregated_rpc_stats {
int num_entries;
census_per_service_per_method_rpc_stats* stats;
census_per_method_rpc_stats* stats;
} census_aggregated_rpc_stats;
/* Deletes aggregated data. */
void census_aggregated_rpc_stats_destroy(census_aggregated_rpc_stats* data);
/* Initializes an aggregated rpc stats object to an empty state. */
void census_aggregated_rpc_stats_set_empty(census_aggregated_rpc_stats* data);
/* Records client side stats of a rpc. */
void census_record_rpc_client_stats(census_op_id op_id,
@ -86,4 +91,11 @@ void census_get_server_stats(census_aggregated_rpc_stats* data_map);
DO NOT CALL from outside of grpc code. */
void census_get_client_stats(census_aggregated_rpc_stats* data_map);
void census_stats_store_init();
void census_stats_store_shutdown();
#ifdef __cplusplus
}
#endif
#endif /* __GRPC_INTERNAL_STATISTICS_CENSUS_RPC_STATS_H__ */

@ -33,15 +33,154 @@
#include "src/core/statistics/census_interface.h"
#include <stdio.h>
#include <string.h>
#include "src/core/statistics/census_rpc_stats.h"
#include "src/core/statistics/hash_table.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
/* Struct for a trace annotation. */
typedef struct annotation {
gpr_uint64 ts; /* timestamp of the annotation */
char txt[CENSUS_MAX_ANNOTATION_LENGTH]; /* actual txt annotation */
struct annotation* next;
} annotation;
typedef struct trace_obj {
census_op_id id;
gpr_timespec ts;
census_rpc_stats rpc_stats;
char* method;
annotation* annotations;
} trace_obj;
static void trace_obj_destroy(trace_obj* obj) {
annotation* p = obj->annotations;
while (p != NULL) {
annotation* next = p->next;
gpr_free(p);
p = next;
}
gpr_free(obj->method);
gpr_free(obj);
}
static void delete_trace_obj(void* obj) { trace_obj_destroy((trace_obj*)obj); }
static const census_ht_option ht_opt = {
CENSUS_HT_UINT64 /* key type*/, 571 /* n_of_buckets */, NULL /* hash */,
NULL /* compare_keys */, delete_trace_obj /* delete data */,
NULL /* delete key */};
static gpr_once g_init_mutex_once = GPR_ONCE_INIT;
static gpr_mu g_mu; /* Guards following two static variables. */
static census_ht* g_trace_store = NULL;
static gpr_uint64 g_id = 0;
static census_ht_key op_id_as_key(census_op_id* id) {
return *(census_ht_key*)id;
}
static gpr_uint64 op_id_2_uint64(census_op_id* id) {
gpr_uint64 ret;
memcpy(&ret, id, sizeof(census_op_id));
return ret;
}
static void init_mutex() { gpr_mu_init(&g_mu); }
static void init_mutex_once() { gpr_once_init(&g_init_mutex_once, init_mutex); }
census_op_id census_tracing_start_op() {
census_op_id empty_op_id = {0, 0};
return empty_op_id;
gpr_mu_lock(&g_mu);
{
trace_obj* ret = (trace_obj*)gpr_malloc(sizeof(trace_obj));
memset(ret, 0, sizeof(trace_obj));
g_id++;
memcpy(&ret->id, &g_id, sizeof(census_op_id));
ret->rpc_stats.cnt = 1;
ret->ts = gpr_now();
census_ht_insert(g_trace_store, op_id_as_key(&ret->id), (void*)ret);
gpr_mu_unlock(&g_mu);
gpr_log(GPR_DEBUG, "Start tracing for id %lu\n", g_id);
return ret->id;
}
}
int census_add_method_tag(census_op_id op_id, const char* method_name) {
return 0;
int census_add_method_tag(census_op_id op_id, const char* method) {
int ret = 0;
trace_obj* trace = NULL;
gpr_mu_lock(&g_mu);
trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
if (trace == NULL) {
ret = 1;
} else {
trace->method = gpr_strdup(method);
}
gpr_mu_unlock(&g_mu);
return ret;
}
void census_tracing_print(census_op_id op_id, const char* annotation) {}
void census_tracing_end_op(census_op_id op_id) {}
void census_tracing_end_op(census_op_id op_id) {
trace_obj* trace = NULL;
gpr_mu_lock(&g_mu);
trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
if (trace != NULL) {
trace->rpc_stats.elapsed_time_ms =
gpr_timespec_to_micros(gpr_time_sub(gpr_now(), trace->ts));
gpr_log(GPR_DEBUG, "End tracing for id %lu, method %s, latency %f us\n",
op_id_2_uint64(&op_id), trace->method,
trace->rpc_stats.elapsed_time_ms);
census_ht_erase(g_trace_store, op_id_as_key(&op_id));
}
gpr_mu_unlock(&g_mu);
}
void census_tracing_init() {
gpr_log(GPR_INFO, "Initialize census trace store.");
init_mutex_once();
gpr_mu_lock(&g_mu);
if (g_trace_store == NULL) {
g_id = 1;
g_trace_store = census_ht_create(&ht_opt);
} else {
gpr_log(GPR_ERROR, "Census trace store already initialized.");
}
gpr_mu_unlock(&g_mu);
}
void census_tracing_shutdown() {
gpr_log(GPR_INFO, "Shutdown census trace store.");
gpr_mu_lock(&g_mu);
if (g_trace_store != NULL) {
census_ht_destroy(g_trace_store);
g_trace_store = NULL;
} else {
gpr_log(GPR_ERROR, "Census trace store is not initialized.");
}
gpr_mu_unlock(&g_mu);
}
void census_internal_lock_trace_store() { gpr_mu_lock(&g_mu); }
void census_internal_unlock_trace_store() { gpr_mu_unlock(&g_mu); }
trace_obj* census_get_trace_obj_locked(census_op_id op_id) {
if (g_trace_store == NULL) {
gpr_log(GPR_ERROR, "Census trace store is not initialized.");
return NULL;
}
return (trace_obj*)census_ht_find(g_trace_store, op_id_as_key(&op_id));
}
const char* census_get_trace_method_name(const trace_obj* trace) {
return (const char*)trace->method;
}

@ -0,0 +1,59 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_
#define __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_
/* Opaque structure for trace object */
typedef struct trace_obj trace_obj;
/* Initializes trace store. This function is thread safe. */
void census_tracing_init();
/* Shutsdown trace store. This function is thread safe. */
void census_tracing_shutdown();
/* Gets trace obj corresponding to the input op_id. Returns NULL if trace store
is not initialized or trace obj is not found. Requires trace store being
locked before calling this function. */
trace_obj* census_get_trace_obj_locked(census_op_id op_id);
/* The following two functions acquire and release the trace store global lock.
They are for census internal use only. */
void census_internal_lock_trace_store();
void census_internal_unlock_trace_store();
/* Gets method tag name associated with the input trace object. */
const char* census_get_trace_method_name(const trace_obj* trace);
#endif /* __GRPC_INTERNAL_STATISTICS_CENSUS_TRACING_H_ */

@ -264,3 +264,7 @@ gpr_int32 gpr_time_to_millis(gpr_timespec t) {
return t.tv_sec * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS;
}
}
double gpr_timespec_to_micros(gpr_timespec t) {
return t.tv_sec * GPR_US_PER_SEC + t.tv_nsec * 1e-3;
}

@ -21,6 +21,7 @@ END2END_TESTS = [
'cancel_after_invoke',
'cancel_before_invoke',
'cancel_in_a_vacuum',
'census_simple_request',
'disappearing_server',
'early_server_shutdown_finishes_inflight_calls',
'early_server_shutdown_finishes_tags',

@ -0,0 +1,176 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "test/core/end2end/end2end_tests.h"
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
static gpr_timespec n_seconds_time(int n) {
return gpr_time_add(gpr_now(), gpr_time_from_micros(GPR_US_PER_SEC * n));
}
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char *test_name,
grpc_channel_args *client_args,
grpc_channel_args *server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_client(&f, client_args);
config.init_server(&f, server_args);
return f;
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_destroy(f->server);
f->server = NULL;
}
static void shutdown_client(grpc_end2end_test_fixture *f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = NULL;
}
static void drain_cq(grpc_completion_queue *cq) {
grpc_event *ev;
grpc_completion_type type;
do {
ev = grpc_completion_queue_next(cq, n_seconds_time(5));
GPR_ASSERT(ev);
type = ev->type;
grpc_event_finish(ev);
} while (type != GRPC_QUEUE_SHUTDOWN);
}
static void end_test(grpc_end2end_test_fixture *f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->server_cq);
drain_cq(f->server_cq);
grpc_completion_queue_destroy(f->server_cq);
grpc_completion_queue_shutdown(f->client_cq);
drain_cq(f->client_cq);
grpc_completion_queue_destroy(f->client_cq);
}
static void *tag(gpr_intptr t) { return (void *)t; }
static void test_body(grpc_end2end_test_fixture f) {
grpc_call *c;
grpc_call *s;
gpr_timespec deadline = n_seconds_time(10);
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
tag(1);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com",
deadline, NULL);
cq_verify(v_server);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(s, f.server_cq, tag(102), 0));
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_write_status(
s, GRPC_STATUS_UNIMPLEMENTED, "xyz", tag(5)));
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_UNIMPLEMENTED,
"xyz", NULL);
cq_verify(v_client);
cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
cq_verify(v_server);
cq_expect_finished(v_server, tag(102), NULL);
cq_verify(v_server);
grpc_call_destroy(c);
grpc_call_destroy(s);
cq_verifier_destroy(v_client);
cq_verifier_destroy(v_server);
}
static void test_invoke_request_with_census(
grpc_end2end_test_config config, const char *name,
void (*body)(grpc_end2end_test_fixture f)) {
char fullname[64];
grpc_end2end_test_fixture f;
grpc_arg client_arg, server_arg;
grpc_channel_args client_args, server_args;
client_arg.type = GRPC_ARG_INTEGER;
client_arg.key = GRPC_ARG_ENABLE_CENSUS;
client_arg.value.integer = 1;
client_args.num_args = 1;
client_args.args = &client_arg;
server_arg.type = GRPC_ARG_INTEGER;
server_arg.key = GRPC_ARG_ENABLE_CENSUS;
server_arg.value.integer = 1;
server_args.num_args = 1;
server_args.args = &server_arg;
sprintf(fullname, "%s/%s", __FUNCTION__, name);
f = begin_test(config, fullname, &client_args, &server_args);
body(f);
end_test(&f);
config.tear_down_data(&f);
}
void grpc_end2end_tests(grpc_end2end_test_config config) {
test_invoke_request_with_census(config, "census_simple_request", test_body);
}

@ -44,7 +44,8 @@
void test_census_stubs() {
census_op_id op_id;
census_rpc_stats* stats = census_rpc_stats_create_empty();
census_aggregated_rpc_stats data_map;
census_aggregated_rpc_stats data_map = {0, NULL};
/* Initializes census library at server start up time. */
census_init();
/* Starts tracing at the beginning of a rpc. */
@ -62,8 +63,9 @@ void test_census_stubs() {
census_tracing_end_op(op_id);
/* In process stats queries. */
census_get_server_stats(&data_map);
census_aggregated_rpc_stats_set_empty(&data_map);
census_get_client_stats(&data_map);
census_aggregated_rpc_stats_destroy(&data_map);
census_aggregated_rpc_stats_set_empty(&data_map);
gpr_free(stats);
census_shutdown();
}

@ -0,0 +1,197 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <string.h>
#include "src/core/statistics/census_interface.h"
#include "src/core/statistics/census_rpc_stats.h"
#include "src/core/statistics/census_tracing.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
/* Ensure all possible state transitions are called without causing problem */
static void test_init_shutdown() {
census_stats_store_init();
census_stats_store_init();
census_stats_store_shutdown();
census_stats_store_shutdown();
census_stats_store_init();
}
static void test_create_and_destroy() {
census_rpc_stats* stats = NULL;
census_aggregated_rpc_stats agg_stats = {0, NULL};
stats = census_rpc_stats_create_empty();
GPR_ASSERT(stats != NULL);
GPR_ASSERT(stats->cnt == 0 && stats->rpc_error_cnt == 0 &&
stats->app_error_cnt == 0 && stats->elapsed_time_ms == 0.0 &&
stats->api_request_bytes == 0 && stats->wire_request_bytes == 0 &&
stats->api_response_bytes == 0 && stats->wire_response_bytes == 0);
gpr_free(stats);
census_aggregated_rpc_stats_set_empty(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
GPR_ASSERT(agg_stats.stats == NULL);
agg_stats.num_entries = 1;
agg_stats.stats = (census_per_method_rpc_stats*)gpr_malloc(
sizeof(census_per_method_rpc_stats));
agg_stats.stats[0].method = gpr_strdup("foo");
census_aggregated_rpc_stats_set_empty(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
GPR_ASSERT(agg_stats.stats == NULL);
}
#define ASSERT_NEAR(a, b) \
GPR_ASSERT((a - b) * (a - b) < 1e-24 * (a + b) * (a + b))
static void test_record_and_get_stats() {
census_rpc_stats stats = {1, 2, 3, 4, 5.1, 6.2, 7.3, 8.4};
census_op_id id;
census_aggregated_rpc_stats agg_stats = {0, NULL};
/* Record client stats twice with the same op_id. */
census_init();
id = census_tracing_start_op();
census_add_method_tag(id, "m1");
census_record_rpc_client_stats(id, &stats);
census_record_rpc_client_stats(id, &stats);
census_tracing_end_op(id);
/* Server stats expect to be empty */
census_get_server_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
GPR_ASSERT(agg_stats.stats == NULL);
/* Client stats expect to have one entry */
census_get_client_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 1);
GPR_ASSERT(agg_stats.stats != NULL);
GPR_ASSERT(strcmp(agg_stats.stats[0].method, "m1") == 0);
GPR_ASSERT(agg_stats.stats[0].minute_stats.cnt == 2 &&
agg_stats.stats[0].hour_stats.cnt == 2 &&
agg_stats.stats[0].total_stats.cnt == 2);
ASSERT_NEAR(agg_stats.stats[0].minute_stats.wire_response_bytes, 16.8);
ASSERT_NEAR(agg_stats.stats[0].hour_stats.wire_response_bytes, 16.8);
ASSERT_NEAR(agg_stats.stats[0].total_stats.wire_response_bytes, 16.8);
/* Get stats again, results should be the same. */
census_get_client_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 1);
census_aggregated_rpc_stats_set_empty(&agg_stats);
census_shutdown();
/* Record both server (once) and client (twice) stats with different op_ids.*/
census_init();
id = census_tracing_start_op();
census_add_method_tag(id, "m2");
census_record_rpc_client_stats(id, &stats);
census_tracing_end_op(id);
id = census_tracing_start_op();
census_add_method_tag(id, "m3");
census_record_rpc_server_stats(id, &stats);
census_tracing_end_op(id);
id = census_tracing_start_op();
census_add_method_tag(id, "m4");
census_record_rpc_client_stats(id, &stats);
census_tracing_end_op(id);
/* Check server stats */
census_get_server_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 1);
GPR_ASSERT(strcmp(agg_stats.stats[0].method, "m3") == 0);
GPR_ASSERT(agg_stats.stats[0].minute_stats.app_error_cnt == 3 &&
agg_stats.stats[0].hour_stats.app_error_cnt == 3 &&
agg_stats.stats[0].total_stats.app_error_cnt == 3);
census_aggregated_rpc_stats_set_empty(&agg_stats);
/* Check client stats */
census_get_client_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 2);
GPR_ASSERT(agg_stats.stats != NULL);
GPR_ASSERT((strcmp(agg_stats.stats[0].method, "m2") == 0 &&
strcmp(agg_stats.stats[1].method, "m4") == 0) ||
(strcmp(agg_stats.stats[0].method, "m4") == 0 &&
strcmp(agg_stats.stats[1].method, "m2") == 0));
GPR_ASSERT(agg_stats.stats[0].minute_stats.cnt == 1 &&
agg_stats.stats[1].minute_stats.cnt == 1);
census_aggregated_rpc_stats_set_empty(&agg_stats);
census_shutdown();
}
static void test_record_stats_on_unknown_op_id() {
census_op_id unknown_id = {0xDEAD, 0xBEEF};
census_rpc_stats stats = {1, 2, 3, 4, 5.1, 6.2, 7.3, 8.4};
census_aggregated_rpc_stats agg_stats = {0, NULL};
census_init();
/* Tests that recording stats against unknown id is noop. */
census_record_rpc_client_stats(unknown_id, &stats);
census_record_rpc_server_stats(unknown_id, &stats);
census_get_server_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
GPR_ASSERT(agg_stats.stats == NULL);
census_get_client_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
GPR_ASSERT(agg_stats.stats == NULL);
census_aggregated_rpc_stats_set_empty(&agg_stats);
census_shutdown();
}
/* Test that record stats is noop when trace store is uninitialized. */
static void test_record_stats_with_trace_store_uninitialized() {
census_rpc_stats stats = {1, 2, 3, 4, 5.1, 6.2, 7.3, 8.4};
census_op_id id = {0, 0};
census_aggregated_rpc_stats agg_stats = {0, NULL};
census_init();
id = census_tracing_start_op();
census_add_method_tag(id, "m");
census_tracing_end_op(id);
/* shuts down trace store only. */
census_tracing_shutdown();
census_record_rpc_client_stats(id, &stats);
census_get_client_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
census_stats_store_shutdown();
}
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
test_init_shutdown();
test_create_and_destroy();
test_record_and_get_stats();
test_record_stats_on_unknown_op_id();
test_record_stats_with_trace_store_uninitialized();
return 0;
}

@ -0,0 +1,158 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <string.h>
#include "src/core/statistics/census_interface.h"
#include "src/core/statistics/census_tracing.h"
#include "src/core/statistics/census_tracing.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
/* Ensure all possible state transitions are called without causing problem */
static void test_init_shutdown() {
census_tracing_init();
census_tracing_init();
census_tracing_shutdown();
census_tracing_shutdown();
census_tracing_init();
}
static void test_start_op_generates_locally_unique_ids() {
/* Check that ids generated within window size of 1000 are unique.
TODO(hongyu): Replace O(n^2) duplicate detection algorithm with O(nlogn)
algorithm. Enhance the test to larger window size (>10^6) */
#define WINDOW_SIZE 1000
census_op_id ids[WINDOW_SIZE];
int i;
census_init();
for (i = 0; i < WINDOW_SIZE; i++) {
ids[i] = census_tracing_start_op();
census_tracing_end_op(ids[i]);
}
for (i = 0; i < WINDOW_SIZE - 1; i++) {
int j;
for (j = i + 1; j < WINDOW_SIZE; j++) {
GPR_ASSERT(ids[i].upper != ids[j].upper || ids[i].lower != ids[j].lower);
}
}
#undef WINDOW_SIZE
census_shutdown();
}
static void test_get_trace_method_name() {
census_op_id id;
const char write_name[] = "service/method";
census_tracing_init();
id = census_tracing_start_op();
census_add_method_tag(id, write_name);
census_internal_lock_trace_store();
{
const char* read_name =
census_get_trace_method_name(census_get_trace_obj_locked(id));
GPR_ASSERT(strcmp(read_name, write_name) == 0);
}
census_internal_unlock_trace_store();
census_tracing_shutdown();
}
typedef struct thd_arg {
int num_done;
gpr_cv done;
gpr_mu mu;
} thd_arg;
static void mimic_trace_op_sequences(void* arg) {
census_op_id id;
char method_name[200];
int i = 0;
const int num_iter = 200;
thd_arg* args = (thd_arg*)arg;
GPR_ASSERT(args != NULL);
gpr_log(GPR_INFO, "Start trace op sequence thread.");
for (i = 0; i < num_iter; i++) {
id = census_tracing_start_op();
census_add_method_tag(id, method_name);
/* pretend doing 1us work. */
gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_micros(1)));
census_tracing_end_op(id);
}
gpr_log(GPR_INFO, "End trace op sequence thread.");
gpr_mu_lock(&args->mu);
args->num_done += 1;
gpr_cv_broadcast(&args->done);
gpr_mu_unlock(&args->mu);
}
static void test_concurrency() {
#define NUM_THREADS 1000
gpr_thd_id tid[NUM_THREADS];
int i = 0;
thd_arg arg;
arg.num_done = 0;
gpr_mu_init(&arg.mu);
gpr_cv_init(&arg.done);
census_tracing_init();
for (i = 0; i < NUM_THREADS; ++i) {
gpr_thd_new(tid + i, mimic_trace_op_sequences, &arg, NULL);
}
while (arg.num_done < NUM_THREADS) {
gpr_log(GPR_INFO, "num done %d", arg.num_done);
gpr_cv_wait(&arg.done, &arg.mu, gpr_inf_future);
}
census_tracing_shutdown();
#undef NUM_THREADS
}
static void test_add_method_tag_to_unknown_op_id() {
census_op_id unknown_id = {0xDEAD, 0xBEEF};
int ret = 0;
census_tracing_init();
ret = census_add_method_tag(unknown_id, "foo");
GPR_ASSERT(ret != 0);
census_tracing_shutdown();
}
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
test_init_shutdown();
test_start_op_generates_locally_unique_ids();
test_get_trace_method_name();
test_concurrency();
test_add_method_tag_to_unknown_op_id();
return 0;
}

@ -134,6 +134,7 @@
<ClInclude Include="..\..\src\core\statistics\census_interface.h" />
<ClInclude Include="..\..\src\core\statistics\census_log.h" />
<ClInclude Include="..\..\src\core\statistics\census_rpc_stats.h" />
<ClInclude Include="..\..\src\core\statistics\census_tracing.h" />
<ClInclude Include="..\..\src\core\statistics\hash_table.h" />
<ClInclude Include="..\..\src\core\statistics\window_stats.h" />
<ClInclude Include="..\..\src\core\surface\call.h" />

@ -134,6 +134,7 @@
<ClInclude Include="..\..\src\core\statistics\census_interface.h" />
<ClInclude Include="..\..\src\core\statistics\census_log.h" />
<ClInclude Include="..\..\src\core\statistics\census_rpc_stats.h" />
<ClInclude Include="..\..\src\core\statistics\census_tracing.h" />
<ClInclude Include="..\..\src\core\statistics\hash_table.h" />
<ClInclude Include="..\..\src\core\statistics\window_stats.h" />
<ClInclude Include="..\..\src\core\surface\call.h" />

Loading…
Cancel
Save