Always compress tags

pull/4750/head
Alistair Veitch 9 years ago
parent 0f690721bd
commit a237796233
  1. 218
      src/core/census/tag_set.c
  2. 13
      test/core/census/tag_set_test.c

@ -59,13 +59,12 @@
// to handle endian-ness conversions.
// * Keep all tag information (keys/values/flags) in a single memory buffer,
// that can be directly copied to the wire. This makes iteration by index
// somewhat less efficient.
// somewhat less efficient. If it becomes a problem, we could consider
// building an index at tag_set creation.
// * Binary tags are encoded seperately from non-binary tags. There are a
// primarily because non-binary tags are far more likely to be repeated
// across multiple RPC calls, so are more efficiently cached and
// compressed in any metadata schemes.
// * deleted/modified tags are kept in memory, just marked with a deleted
// flag. This enables faster processing TODO: benchmark this
// * all lengths etc. are restricted to one byte. This eliminates endian
// issues.
@ -74,7 +73,8 @@
struct tag_set {
int ntags; // number of tags.
int ntags_alloc; // ntags + number of deleted tags (total number of tags
// in all of kvm).
// in all of kvm). This will always be == ntags, except
// during the process of building a new tag set.
size_t kvm_size; // number of bytes allocated for key/value storage.
size_t kvm_used; // number of bytes of used key/value memory
char *kvm; // key/value memory. Consists of repeated entries of:
@ -138,28 +138,17 @@ static void tag_set_copy(struct tag_set *to, const struct tag_set *from) {
memcpy(to->kvm, from->kvm, to->kvm_used);
}
// We may want to keep information about a deleted tag for a short time,
// in case we can reuse the space (same tag is reinserted). This structure
// is used for that purpose.
struct deleted_tag_info {
struct raw_tag raw; // raw tag information.
uint8_t *raw_flags_p; // pointer to original flags
struct tag_set *tags; // the tag set from which tag was deleted.
};
// Delete a tag from a tag set, if it exists. Returns true if the tag was
// originally present (and is now deleted), false if it wasn't.
static bool tag_set_delete_tag(struct tag_set *tags,
struct deleted_tag_info *dtag, const char *key,
// Delete a tag from a tag set, if it exists (returns true it it did).
static bool tag_set_delete_tag(struct tag_set *tags, const char *key,
size_t key_len) {
char *kvp = tags->kvm;
for (int i = 0; i < tags->ntags_alloc; i++) {
dtag->raw_flags_p = (uint8_t *)(kvp + FLAG_OFFSET);
kvp = decode_tag(&dtag->raw, kvp, 0);
if (CENSUS_TAG_IS_DELETED(dtag->raw.flags)) continue;
if ((key_len == dtag->raw.key_len) &&
(memcmp(key, dtag->raw.key, key_len) == 0)) {
*(dtag->raw_flags_p) |= CENSUS_TAG_DELETED;
uint8_t *flags = (uint8_t *)(kvp + FLAG_OFFSET);
struct raw_tag tag;
kvp = decode_tag(&tag, kvp, 0);
if (CENSUS_TAG_IS_DELETED(tag.flags)) continue;
if ((key_len == tag.key_len) && (memcmp(key, tag.key, key_len) == 0)) {
*flags |= CENSUS_TAG_DELETED;
tags->ntags--;
return true;
}
@ -167,46 +156,34 @@ static bool tag_set_delete_tag(struct tag_set *tags,
return false;
}
// Delete a tag from a tag set. If the tag is found in any of the underlying
// tag sets, *and* that tag set corresponds to the one in which the tag (if
// later inserted) would be placed, then fills in dtag, and returns true.
// Returns false otherwise. This information is later used to optimize the
// placement of the tag if the value space can be reused, effectively
// "undeleting" the tag.
static bool cts_delete_tag(census_tag_set *tags, const census_tag *tag,
size_t key_len, struct deleted_tag_info *dtag) {
// Delete a tag from a tag set.
static void cts_delete_tag(census_tag_set *tags, const census_tag *tag,
size_t key_len) {
// use the to-be-deleted tag flags as a hint to determine the order in which
// we delete from the underlying tag sets.
if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) {
if (CENSUS_TAG_IS_BINARY(tag->flags)) {
if (tag_set_delete_tag(&tags->propagated_binary_tags, dtag, tag->key,
if (tag_set_delete_tag(&tags->propagated_binary_tags, tag->key,
key_len)) {
dtag->tags = &tags->propagated_binary_tags;
return true;
return;
}
if (tag_set_delete_tag(&tags->propagated_tags, dtag, tag->key, key_len))
return false;
tag_set_delete_tag(&tags->local_tags, dtag, tag->key, key_len);
if (tag_set_delete_tag(&tags->propagated_tags, tag->key, key_len)) return;
tag_set_delete_tag(&tags->local_tags, tag->key, key_len);
} else {
if (tag_set_delete_tag(&tags->propagated_tags, dtag, tag->key, key_len)) {
dtag->tags = &tags->propagated_tags;
return true;
if (tag_set_delete_tag(&tags->propagated_tags, tag->key, key_len)) {
return;
}
if (tag_set_delete_tag(&tags->propagated_binary_tags, dtag, tag->key,
key_len))
return false;
tag_set_delete_tag(&tags->local_tags, dtag, tag->key, key_len);
if (tag_set_delete_tag(&tags->propagated_binary_tags, tag->key, key_len))
return;
tag_set_delete_tag(&tags->local_tags, tag->key, key_len);
}
} else {
if (tag_set_delete_tag(&tags->local_tags, dtag, tag->key, key_len)) {
dtag->tags = &tags->local_tags;
return true;
if (tag_set_delete_tag(&tags->local_tags, tag->key, key_len)) {
return;
}
if (tag_set_delete_tag(&tags->propagated_tags, dtag, tag->key, key_len))
return false;
tag_set_delete_tag(&tags->propagated_binary_tags, dtag, tag->key, key_len);
if (tag_set_delete_tag(&tags->propagated_tags, tag->key, key_len)) return;
tag_set_delete_tag(&tags->propagated_binary_tags, tag->key, key_len);
}
return false;
}
// Add a tag to a tag set.
@ -218,6 +195,7 @@ static void tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
return;
}
if (tags->kvm_used + tag_size > tags->kvm_size) {
// allocate new memory if needed
tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE;
char *new_kvm = gpr_malloc(tags->kvm_size);
memcpy(new_kvm, tags->kvm, tags->kvm_used);
@ -242,28 +220,60 @@ static void tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
static void cts_add_tag(census_tag_set *tags, const census_tag *tag,
size_t key_len) {
// first delete the tag if it is already present
struct deleted_tag_info dtag;
bool deleted_match = cts_delete_tag(tags, tag, key_len, &dtag);
cts_delete_tag(tags, tag, key_len);
if (tag->value != NULL && tag->value_len != 0) {
if (deleted_match && tag->value_len == dtag.raw.value_len) {
// if we have a close match for tag being added to one just deleted,
// only need to modify value and flags.
memcpy(dtag.raw.value, tag->value, tag->value_len);
*dtag.raw_flags_p = (tag->flags & (CENSUS_TAG_PROPAGATE |
CENSUS_TAG_STATS | CENSUS_TAG_BINARY));
dtag.tags->ntags++;
} else {
if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) {
if (CENSUS_TAG_IS_BINARY(tag->flags)) {
tag_set_add_tag(&tags->propagated_binary_tags, tag, key_len);
} else {
tag_set_add_tag(&tags->propagated_tags, tag, key_len);
}
if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) {
if (CENSUS_TAG_IS_BINARY(tag->flags)) {
tag_set_add_tag(&tags->propagated_binary_tags, tag, key_len);
} else {
tag_set_add_tag(&tags->local_tags, tag, key_len);
tag_set_add_tag(&tags->propagated_tags, tag, key_len);
}
} else {
tag_set_add_tag(&tags->local_tags, tag, key_len);
}
}
}
// Remove any deleted tags from the tag set. Basic algorithm:
// 1) Walk through tag set to find first deleted tag. Record where it is.
// 2) Find the next not-deleted tag. Copy all of kvm from there to the end
// "over" the deleted tags
// 3) repeat #1 and #2 until we have seen all tags
// 4) if we are still looking for a not-deleted tag, then all the end portion
// of the kvm is deleted. Just reduce the used amount of memory by the
// appropriate amount.
static void tag_set_compress(struct tag_set *tags) {
if (tags->ntags == tags->ntags_alloc) return;
bool find_deleted = true; // are we looking for deleted tags?
char *kvp = tags->kvm;
char *dbase; // record location of deleted tag
for (int i = 0; i < tags->ntags_alloc; i++) {
struct raw_tag tag;
char *next_kvp = decode_tag(&tag, kvp, 0);
if (find_deleted) {
if (CENSUS_TAG_IS_DELETED(tag.flags)) {
dbase = kvp;
find_deleted = false;
}
} else {
if (!CENSUS_TAG_IS_DELETED(tag.flags)) {
ptrdiff_t reduce = kvp - dbase; // #bytes in deleted tags
GPR_ASSERT(reduce > 0);
ptrdiff_t copy_size = tags->kvm + tags->kvm_used - kvp;
GPR_ASSERT(copy_size > 0);
memmove(dbase, kvp, (size_t)copy_size);
tags->kvm_used -= (size_t)reduce;
next_kvp -= reduce;
find_deleted = true;
}
}
kvp = next_kvp;
}
if (!find_deleted) {
GPR_ASSERT(dbase > tags->kvm);
tags->kvm_used = (size_t)(dbase - tags->kvm);
}
tags->ntags_alloc = tags->ntags;
}
census_tag_set *census_tag_set_create(const census_tag_set *base,
@ -286,6 +296,9 @@ census_tag_set *census_tag_set_create(const census_tag_set *base,
cts_add_tag(new_ts, tag, key_len);
}
}
tag_set_compress(&new_ts->propagated_tags);
tag_set_compress(&new_ts->propagated_binary_tags);
tag_set_compress(&new_ts->local_tags);
return new_ts;
}
@ -307,21 +320,15 @@ static void tag_set_get_tag_by_index(const struct tag_set *tags, int index,
census_tag *tag) {
GPR_ASSERT(index < tags->ntags);
char *kvp = tags->kvm;
for (;;) {
struct raw_tag raw;
struct raw_tag raw;
kvp = decode_tag(&raw, kvp, 0);
for (int i = 0; i < index; i++) {
kvp = decode_tag(&raw, kvp, 0);
if (CENSUS_TAG_IS_DELETED(raw.flags)) {
continue;
} else if (index == 0) {
tag->key = raw.key;
tag->value = raw.value;
tag->value_len = raw.value_len;
tag->flags = raw.flags;
return;
}
index--;
}
// NOT REACHED
tag->key = raw.key;
tag->value = raw.value;
tag->value_len = raw.value_len;
tag->flags = raw.flags;
}
int census_tag_set_get_tag_by_index(const census_tag_set *tags, int index,
@ -350,9 +357,7 @@ static bool tag_set_get_tag_by_key(const struct tag_set *tags, const char *key,
char *kvp = tags->kvm;
for (int i = 0; i < tags->ntags; i++) {
struct raw_tag raw;
do {
kvp = decode_tag(&raw, kvp, 0);
} while (CENSUS_TAG_IS_DELETED(raw.flags));
kvp = decode_tag(&raw, kvp, 0);
if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) {
tag->key = raw.key;
tag->value = raw.value;
@ -391,42 +396,16 @@ int census_tag_set_get_tag_by_key(const census_tag_set *tags, const char *key,
// expansion.
// 2 1 number of bytes in each tag header.
// 3 1 ntags value from tag set.
// 4 1 ntags_alloc value from tag set.
//
// This is followed by the key/value memory from struct tag_set.
#define ENCODED_VERSION 0 // Version number
#define ENCODED_HEADER_SIZE 5 // size of tag set header
// Pack a tag set into as few bytes as possible (eliding deleted tags). Assumes
// header is already generated.
static size_t tag_set_encode_packed(const struct tag_set *tags, char *buffer,
size_t buf_size) {
size_t encoded_size = 0;
char *kvp = tags->kvm;
for (int i = 0; i < tags->ntags_alloc; i++) {
struct raw_tag raw;
char *base = kvp;
kvp = decode_tag(&raw, kvp, 0);
size_t tag_size =
TAG_HEADER_SIZE + (size_t)raw.key_len + (size_t)raw.value_len;
if (!(CENSUS_TAG_IS_DELETED(raw.flags))) {
if (tag_size > buf_size) {
return 0;
}
memcpy(buffer, base, tag_size);
buffer += tag_size;
encoded_size += tag_size;
buf_size -= tag_size;
}
}
return encoded_size;
}
#define ENCODED_HEADER_SIZE 4 // size of tag set header
// Encode a tag set. Returns 0 if buffer is too small.
static size_t tag_set_encode(const struct tag_set *tags, char *buffer,
size_t buf_size) {
if (buf_size < ENCODED_HEADER_SIZE) {
if (buf_size < ENCODED_HEADER_SIZE + tags->kvm_used) {
return 0;
}
buf_size -= ENCODED_HEADER_SIZE;
@ -435,18 +414,8 @@ static size_t tag_set_encode(const struct tag_set *tags, char *buffer,
*buffer++ = (char)TAG_HEADER_SIZE;
*buffer++ = (char)tags->ntags;
if (tags->ntags == 0) {
*buffer = (char)tags->ntags;
return ENCODED_HEADER_SIZE;
}
if (buf_size < tags->kvm_used || tags->ntags_alloc > CENSUS_MAX_TAGS) {
*buffer++ = (char)tags->ntags;
size_t enc_size = tag_set_encode_packed(tags, buffer, buf_size);
if (enc_size == 0) {
return 0;
}
return ENCODED_HEADER_SIZE + enc_size;
}
*buffer++ = (char)tags->ntags_alloc;
memcpy(buffer, tags->kvm, tags->kvm_used);
return ENCODED_HEADER_SIZE + tags->kvm_used;
}
@ -467,7 +436,7 @@ static void tag_set_decode(struct tag_set *tags, const char *buffer,
uint8_t version = (uint8_t)(*buffer++);
uint8_t header_size = (uint8_t)(*buffer++);
uint8_t tag_header_size = (uint8_t)(*buffer++);
tags->ntags = (int)(*buffer++);
tags->ntags = tags->ntags_alloc = (int)(*buffer++);
if (tags->ntags == 0) {
tags->ntags_alloc = 0;
tags->kvm_size = 0;
@ -475,7 +444,6 @@ static void tag_set_decode(struct tag_set *tags, const char *buffer,
tags->kvm = NULL;
return;
}
tags->ntags_alloc = (uint8_t)(*buffer++);
if (header_size != ENCODED_HEADER_SIZE) {
GPR_ASSERT(version != ENCODED_VERSION);
GPR_ASSERT(ENCODED_HEADER_SIZE < header_size);
@ -490,7 +458,7 @@ static void tag_set_decode(struct tag_set *tags, const char *buffer,
GPR_ASSERT(version != ENCODED_VERSION);
GPR_ASSERT(tag_header_size > TAG_HEADER_SIZE);
char *kvp = tags->kvm;
for (int i = 0; i < tags->ntags_alloc; i++) {
for (int i = 0; i < tags->ntags; i++) {
memcpy(kvp, buffer, TAG_HEADER_SIZE);
kvp += header_size;
struct raw_tag raw;

@ -367,19 +367,6 @@ static void complex_encode_decode_test(void) {
GPR_ASSERT(census_tag_set_ntags(cts3) == 2);
GPR_ASSERT(validate_tag(cts3, &basic_tags[4]));
GPR_ASSERT(validate_tag(cts3, &modify_tags[8]));
// Now force tag set to be in smaller space
census_tag_set_destroy(cts3);
size_t nb1 = census_tag_set_encode_propagated(cts2, buf1, b1 - 1);
GPR_ASSERT(nb1 != 0);
GPR_ASSERT(nb1 < b1);
size_t nb2 = census_tag_set_encode_propagated_binary(cts2, buf2, b2 - 1);
GPR_ASSERT(nb2 != 0);
GPR_ASSERT(nb2 < b2);
cts3 = census_tag_set_decode(buf1, nb1, buf2, nb2);
GPR_ASSERT(cts3 != NULL);
GPR_ASSERT(census_tag_set_ntags(cts3) == 2);
GPR_ASSERT(validate_tag(cts3, &basic_tags[4]));
GPR_ASSERT(validate_tag(cts3, &modify_tags[8]));
census_tag_set_destroy(cts3);
census_tag_set_destroy(cts2);
census_tag_set_destroy(cts);

Loading…
Cancel
Save