Fix a bug in base-log, and add test.

The tests compute usable space in the log, but do so using subtraction on unsigned values and did not correctly account for number of blocks used per-core. This could lead underflow, and an incorrect space calculation.

In addition, testing showed that with current defaults, tests would fail on a system with > ~64 cores. This can be alleviated through use of a larger log. An explicit
check in the log initialization has been added for this case, and the log size increased if necessary.
	Change on 2014/12/15 by aveitch <aveitch@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82154432
pull/1/merge
aveitch 10 years ago committed by Michael Lumish
parent 5e04b1376d
commit 482a5be0b9
  1. 44
      Makefile
  2. 12
      build.json
  3. 75
      src/core/statistics/census_log.c
  4. 10
      src/core/statistics/census_log.h
  5. 31
      test/core/statistics/census_log_tests.c
  6. 3
      test/core/statistics/census_log_tests.h
  7. 46
      test/core/statistics/small_log_test.c

File diff suppressed because one or more lines are too long

@ -821,6 +821,18 @@
"gpr"
]
},
{
"name": "census_statistics_small_log_test",
"build": "test",
"src": [
"test/core/statistics/small_log_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr"
]
},
{
"name": "census_statistics_performance_test",
"build": "test",

@ -97,6 +97,7 @@
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
/* End of platform specific code */
@ -118,18 +119,18 @@ typedef struct census_log_block {
gpr_int32 bytes_read;
/* Links for list */
cl_block_list_struct link;
/* We want this structure to be cacheline aligned. We assume the following
sizes for the various parts on 32/64bit systems:
type 32b size 64b size
char* 4 8
3x gpr_atm 12 24
gpr_int32 4 8 (assumes padding)
cl_block_list_struct 12 24
TOTAL 32 64
Depending on the size of our cacheline and the architecture, we
selectively add char buffering to this structure. The size is checked
via assert in census_log_initialize(). */
/* We want this structure to be cacheline aligned. We assume the following
sizes for the various parts on 32/64bit systems:
type 32b size 64b size
char* 4 8
3x gpr_atm 12 24
gpr_int32 4 8 (assumes padding)
cl_block_list_struct 12 24
TOTAL 32 64
Depending on the size of our cacheline and the architecture, we
selectively add char buffering to this structure. The size is checked
via assert in census_log_initialize(). */
#if defined(GPR_ARCH_64)
#define CL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 64)
#else
@ -146,15 +147,15 @@ typedef struct census_log_block {
/* A list of cl_blocks, doubly-linked through cl_block::link. */
typedef struct census_log_block_list {
gpr_int32 count; /* Number of items in list. */
cl_block_list_struct ht; /* head/tail of linked list. */
gpr_int32 count; /* Number of items in list. */
cl_block_list_struct ht; /* head/tail of linked list. */
} cl_block_list;
/* Cacheline aligned block pointers to avoid false sharing. Block pointer must
be initialized via set_block(), before calling other functions */
typedef struct census_log_core_local_block {
gpr_atm block;
/* Ensure cachline alignment: we assume sizeof(gpr_atm) == 4 or 8 */
/* Ensure cachline alignment: we assume sizeof(gpr_atm) == 4 or 8 */
#if defined(GPR_ARCH_64)
#define CL_CORE_LOCAL_BLOCK_PAD_SIZE (GPR_CACHELINE_SIZE - 8)
#else
@ -175,10 +176,10 @@ struct census_log {
int num_cores;
/* number of CENSUS_LOG_2_MAX_RECORD_SIZE blocks in log */
gpr_int32 num_blocks;
cl_block* blocks; /* Block metadata. */
cl_core_local_block* core_local_blocks; /* Keeps core to block mappings. */
cl_block* blocks; /* Block metadata. */
cl_core_local_block* core_local_blocks; /* Keeps core to block mappings. */
gpr_mu lock;
int initialized; /* has log been initialized? */
int initialized; /* has log been initialized? */
/* Keeps the state of the reader iterator. A value of 0 indicates that
iterator has reached the end. census_log_init_reader() resets the
value to num_core to restart iteration. */
@ -200,14 +201,9 @@ static struct census_log g_log;
/* Functions that operate on an atomic memory location used as a lock */
/* Returns non-zero if lock is acquired */
static int cl_try_lock(gpr_atm* lock) {
return gpr_atm_acq_cas(lock, 0, 1);
}
static void cl_unlock(gpr_atm* lock) {
gpr_atm_rel_store(lock, 0);
}
static int cl_try_lock(gpr_atm* lock) { return gpr_atm_acq_cas(lock, 0, 1); }
static void cl_unlock(gpr_atm* lock) { gpr_atm_rel_store(lock, 0); }
/* Functions that operate on cl_core_local_block's */
@ -220,7 +216,6 @@ static cl_block* cl_core_local_block_get_block(cl_core_local_block* clb) {
return (cl_block*)gpr_atm_acq_load(&clb->block);
}
/* Functions that operate on cl_block_list_struct's */
static void cl_block_list_struct_initialize(cl_block_list_struct* bls,
@ -229,7 +224,6 @@ static void cl_block_list_struct_initialize(cl_block_list_struct* bls,
bls->block = block;
}
/* Functions that operate on cl_block_list's */
static void cl_block_list_initialize(cl_block_list* list) {
@ -243,8 +237,7 @@ static cl_block* cl_block_list_head(cl_block_list* list) {
}
/* Insert element *e after *pos. */
static void cl_block_list_insert(cl_block_list* list,
cl_block_list_struct* pos,
static void cl_block_list_insert(cl_block_list* list, cl_block_list_struct* pos,
cl_block_list_struct* e) {
list->count++;
e->next = pos->next;
@ -254,14 +247,12 @@ static void cl_block_list_insert(cl_block_list* list,
}
/* Insert block at the head of the list */
static void cl_block_list_insert_at_head(cl_block_list* list,
cl_block* block) {
static void cl_block_list_insert_at_head(cl_block_list* list, cl_block* block) {
cl_block_list_insert(list, &list->ht, &block->link);
}
/* Insert block at the tail of the list */
static void cl_block_list_insert_at_tail(cl_block_list* list,
cl_block* block) {
static void cl_block_list_insert_at_tail(cl_block_list* list, cl_block* block) {
cl_block_list_insert(list, list->ht.prev, &block->link);
}
@ -272,7 +263,6 @@ static void cl_block_list_remove(cl_block_list* list, cl_block* b) {
b->link.prev->next = b->link.next;
}
/* Functions that operate on cl_block's */
static void cl_block_initialize(cl_block* block, char* buffer) {
@ -374,7 +364,6 @@ static void cl_block_end_read(cl_block* block) {
cl_unlock(&block->reader_lock);
}
/* Internal functions operating on g_log */
/* Allocates a new free block (or recycles an available dirty block if log is
@ -472,17 +461,15 @@ static cl_block* cl_next_block_to_read(cl_block* prev) {
/* External functions: primary stats_log interface */
void census_log_initialize(size_t size_in_mb, int discard_old_records) {
gpr_int32 ix;
/* check cacheline alignment */
/* Check cacheline alignment. */
GPR_ASSERT(sizeof(cl_block) % GPR_CACHELINE_SIZE == 0);
GPR_ASSERT(sizeof(cl_core_local_block) % GPR_CACHELINE_SIZE == 0);
GPR_ASSERT(!g_log.initialized);
g_log.discard_old_records = discard_old_records;
g_log.num_cores = gpr_cpu_num_cores();
if (size_in_mb < 1 || size_in_mb > 1000) {
gpr_log(GPR_ERROR, "Invalid size for stats_log: using 1MB default");
size_in_mb = 1;
}
g_log.num_blocks = (size_in_mb << 20) >> CENSUS_LOG_2_MAX_RECORD_SIZE;
/* Ensure at least as many blocks as there are cores. */
g_log.num_blocks = GPR_MAX(
g_log.num_cores, (size_in_mb << 20) >> CENSUS_LOG_2_MAX_RECORD_SIZE);
gpr_mu_init(&g_log.lock);
g_log.read_iterator_state = 0;
g_log.block_being_read = NULL;
@ -501,7 +488,7 @@ void census_log_initialize(size_t size_in_mb, int discard_old_records) {
for (ix = 0; ix < g_log.num_blocks; ++ix) {
cl_block* block = g_log.blocks + ix;
cl_block_initialize(block,
g_log.buffer + (CENSUS_LOG_MAX_RECORD_SIZE * ix));
g_log.buffer + (CENSUS_LOG_MAX_RECORD_SIZE * ix));
cl_block_try_disable_access(block, 1 /* discard data */);
cl_block_list_insert_at_tail(&g_log.free_block_list, block);
}
@ -585,8 +572,8 @@ const void* census_log_read_next(size_t* bytes_available) {
do {
g_log.block_being_read = cl_next_block_to_read(g_log.block_being_read);
if (g_log.block_being_read != NULL) {
void* record = cl_block_start_read(g_log.block_being_read,
bytes_available);
void* record =
cl_block_start_read(g_log.block_being_read, bytes_available);
if (record != NULL) {
gpr_mu_unlock(&g_log.lock);
return record;

@ -40,9 +40,11 @@
#define CENSUS_LOG_2_MAX_RECORD_SIZE 14 /* 2^14 = 16KB */
#define CENSUS_LOG_MAX_RECORD_SIZE (1 << CENSUS_LOG_2_MAX_RECORD_SIZE)
/* Initialize the statistics logging subsystem with the given log size. If
discard_old_records is non-zero, then new records will displace older
ones when the log is full. This function must be called before any other
/* Initialize the statistics logging subsystem with the given log size. A log
size of 0 will result in the smallest possible log for the platform
(approximately CENSUS_LOG_MAX_RECORD_SIZE * gpr_cpu_num_cores()). If
discard_old_records is non-zero, then new records will displace older ones
when the log is full. This function must be called before any other
census_log functions.
*/
void census_log_initialize(size_t size_in_mb, int discard_old_records);
@ -86,4 +88,4 @@ size_t census_log_remaining_space();
out-of-space. */
int census_log_out_of_space_count();
#endif /* __GRPC_INTERNAL_STATISTICS_LOG_H__ */
#endif /* __GRPC_INTERNAL_STATISTICS_LOG_H__ */

@ -41,6 +41,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
/* Fills in 'record' of size 'size'. Each byte in record is filled in with the
same value. The value is extracted from 'record' pointer. */
@ -120,12 +121,18 @@ static void assert_log_empty() {
}
/* Given log size and record size, computes the minimum usable space. */
static size_t min_usable_space(size_t log_size, size_t record_size) {
static gpr_int32 min_usable_space(size_t log_size, size_t record_size) {
gpr_int32 usable_space;
gpr_int32 num_blocks = log_size / CENSUS_LOG_MAX_RECORD_SIZE;
gpr_int32 waste_per_block = CENSUS_LOG_MAX_RECORD_SIZE % record_size;
/* In the worst case, all except one core-local block is empty. */
return (log_size - ((gpr_cpu_num_cores() - 1) * CENSUS_LOG_MAX_RECORD_SIZE) -
((num_blocks - gpr_cpu_num_cores() - 1) * waste_per_block));
/* In the worst case, all except one core-local block is full. */
gpr_int32 num_full_blocks = GPR_MAX(gpr_cpu_num_cores() - 2, 2);
GPR_ASSERT(num_blocks >= num_full_blocks);
usable_space = (gpr_int32)log_size -
(num_full_blocks * CENSUS_LOG_MAX_RECORD_SIZE) -
((num_blocks - num_full_blocks) * waste_per_block);
GPR_ASSERT(usable_space > 0);
return usable_space;
}
/* Fills the log and verifies data. If 'no fragmentation' is true, records
@ -152,7 +159,6 @@ static void fill_log(size_t log_size, int no_fragmentation, int circular_log) {
records_written = write_records_to_log(
0 /* writer id */, size, (log_size / size) * 2, 0 /* spin count */);
usable_space = min_usable_space(log_size, size);
GPR_ASSERT(usable_space > 0);
GPR_ASSERT(records_written * size >= usable_space);
records_read = perform_read_iteration(size);
if (!circular_log) {
@ -538,6 +544,21 @@ void test_multiple_writers() {
census_log_shutdown();
}
/* Repeat the straddling records and multiple writers tests with a small log. */
void test_small_log() {
size_t log_size;
const int circular = 0;
printf("Starting test: small log\n");
census_log_initialize(0, circular);
log_size = census_log_remaining_space();
GPR_ASSERT(log_size > 0);
fill_log(log_size, 0, circular);
census_log_shutdown();
census_log_initialize(0, circular);
multiple_writers_single_reader(circular);
census_log_shutdown();
}
void test_performance() {
int write_size = 1;
for (; write_size < CENSUS_LOG_MAX_RECORD_SIZE; write_size *= 2) {

@ -46,5 +46,6 @@ void test_fill_circular_log_with_straddling_records();
void test_multiple_writers_circular_log();
void test_multiple_writers();
void test_performance();
void test_small_log();
#endif /* __GRPC_TEST_STATISTICS_LOG_TESTS_H__ */
#endif /* __GRPC_TEST_STATISTICS_LOG_TESTS_H__ */

@ -0,0 +1,46 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "test/core/statistics/census_log_tests.h"
#include <stdlib.h>
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
srand(gpr_now().tv_nsec);
test_small_log();
return 0;
}
Loading…
Cancel
Save