simplify base resource definition; misc comment updates

pull/6774/head
Alistair Veitch 9 years ago
parent 4eb67b47a0
commit 0cf4defa42
  1. 119
      src/core/ext/census/base_resources.c
  2. 4
      src/core/ext/census/base_resources.h
  3. 142
      src/core/ext/census/resource.c
  4. 29
      src/core/ext/census/resource.h

@ -30,15 +30,15 @@
*
*/
#include "base_resources.h"
#include <stdio.h>
#include <string.h>
#include <grpc/census.h>
#include <grpc/support/log.h>
#include "base_resources.h"
#include "gen/census.pb.h"
#include "third_party/nanopb/pb_encode.h"
#include "resource.h"
// Add base RPC resource definitions for use by RPC runtime.
//
@ -48,101 +48,24 @@
// file, which is compiled to .pb format and read by still-to-be-written
// configuration functions.
// Structure representing a MeasurementUnit proto.
typedef struct {
int32_t prefix;
int n_numerators;
const google_census_Resource_BasicUnit *numerators;
int n_denominators;
const google_census_Resource_BasicUnit *denominators;
} measurement_unit;
// Encode a nanopb string. Expects the string argument to be passed in as `arg`.
static bool encode_string(pb_ostream_t *stream, const pb_field_t *field,
void *const *arg) {
if (!pb_encode_tag_for_field(stream, field)) {
return false;
}
return pb_encode_string(stream, (uint8_t *)*arg, strlen((const char *)*arg));
}
// Encode the numerators part of a measurement_unit (passed in as `arg`).
static bool encode_numerators(pb_ostream_t *stream, const pb_field_t *field,
void *const *arg) {
const measurement_unit *mu = (const measurement_unit *)*arg;
for (int i = 0; i < mu->n_numerators; i++) {
if (!pb_encode_tag_for_field(stream, field)) {
return false;
}
if (!pb_encode_varint(stream, mu->numerators[i])) {
return false;
}
}
return true;
}
// Encode the denominators part of a measurement_unit (passed in as `arg`).
static bool encode_denominators(pb_ostream_t *stream, const pb_field_t *field,
void *const *arg) {
const measurement_unit *mu = (const measurement_unit *)*arg;
for (int i = 0; i < mu->n_denominators; i++) {
if (!pb_encode_tag_for_field(stream, field)) {
return false;
}
if (!pb_encode_varint(stream, mu->numerators[i])) {
return false;
}
}
return true;
}
// Define a Resource, given the important details. Encodes a protobuf, which
// is then passed to census_define_resource.
static void define_resource(const char *name, const char *description,
const measurement_unit *unit) {
// nanopb generated type for Resource. Initialize encoding functions to NULL
// since we can't directly initialize them due to embedded union in struct.
google_census_Resource resource = {
{{NULL}, (void *)name},
{{NULL}, (void *)description},
true, // has_unit
{true, unit->prefix, {{NULL}, (void *)unit}, {{NULL}, (void *)unit}}};
resource.name.funcs.encode = &encode_string;
resource.description.funcs.encode = &encode_string;
resource.unit.numerator.funcs.encode = &encode_numerators;
resource.unit.denominator.funcs.encode = &encode_denominators;
// Buffer for storing encoded proto.
uint8_t buffer[512];
pb_ostream_t stream = pb_ostream_from_buffer(buffer, 512);
if (!pb_encode(&stream, google_census_Resource_fields, &resource)) {
gpr_log(GPR_ERROR, "Error encoding resource %s.", name);
return;
}
int32_t mid = census_define_resource(buffer, stream.bytes_written);
if (mid < 0) {
gpr_log(GPR_ERROR, "Error defining resource %s.", name);
}
}
// Define a resource for client RPC latency.
static void define_client_rpc_latency_resource() {
google_census_Resource_BasicUnit numerator =
google_census_Resource_BasicUnit_SECS;
measurement_unit unit = {0, 1, &numerator, 0, NULL};
define_resource("client_rpc_latency", "Client RPC latency in seconds", &unit);
}
// Define a resource for server RPC latency.
static void define_server_rpc_latency_resource() {
google_census_Resource_BasicUnit numerator =
google_census_Resource_BasicUnit_SECS;
measurement_unit unit = {0, 1, &numerator, 0, NULL};
define_resource("server_rpc_latency", "Server RPC latency in seconds", &unit);
}
// Define all base resources. This should be called by census initialization.
void define_base_resources() {
define_client_rpc_latency_resource();
define_server_rpc_latency_resource();
google_census_Resource_BasicUnit numerator =
google_census_Resource_BasicUnit_SECS;
resource r = {"client_rpc_latency", // name
"Client RPC latency in seconds", // description
0, // prefix
1, // n_numerators
&numerator, // numerators
0, // n_denominators
NULL}; // denominators
define_resource(&r);
r = (resource){"server_rpc_latency", // name
"Server RPC latency in seconds", // description
0, // prefix
1, // n_numerators
&numerator, // numerators
0, // n_denominators
NULL}; // denominators
define_resource(&r);
}

@ -33,7 +33,7 @@
#ifndef GRPC_CORE_EXT_CENSUS_BASE_RESOURCES_H
#define GRPC_CORE_EXT_CENSUS_BASE_RESOURCES_H
// Define all base resources. This should be called by census initialization.
/* Define all base resources. This should be called by census initialization. */
void define_base_resources();
#endif // GRPC_CORE_EXT_CENSUS_BASE_RESOURCES_H
#endif /* GRPC_CORE_EXT_CENSUS_BASE_RESOURCES_H */

@ -32,7 +32,6 @@
*/
#include "resource.h"
#include "gen/census.pb.h"
#include "third_party/nanopb/pb_decode.h"
#include <grpc/census.h>
@ -43,13 +42,6 @@
#include <stdbool.h>
#include <string.h>
// Internal representation of a resource.
typedef struct {
char *name;
// Pointer to raw protobuf used in resource definition.
uint8_t *raw_pb;
} resource;
// Protect local resource data structures.
static gpr_mu resource_lock;
@ -65,7 +57,7 @@ static size_t n_resources = 0;
// Number of defined resources
static size_t n_defined_resources = 0;
void initialize_resources() {
void initialize_resources(void) {
gpr_mu_init(&resource_lock);
gpr_mu_lock(&resource_lock);
GPR_ASSERT(resources == NULL && n_resources == 0 && n_defined_resources == 0);
@ -78,16 +70,17 @@ void initialize_resources() {
// Delete a resource given it's ID. Must be called with resource_lock held.
static void delete_resource_locked(size_t rid) {
GPR_ASSERT(resources[rid] != NULL && resources[rid]->raw_pb != NULL &&
resources[rid]->name != NULL && n_defined_resources > 0);
GPR_ASSERT(resources[rid] != NULL);
gpr_free(resources[rid]->name);
gpr_free(resources[rid]->raw_pb);
gpr_free(resources[rid]->description);
gpr_free(resources[rid]->numerators);
gpr_free(resources[rid]->denominators);
gpr_free(resources[rid]);
resources[rid] = NULL;
n_defined_resources--;
}
void shutdown_resources() {
void shutdown_resources(void) {
gpr_mu_lock(&resource_lock);
for (size_t i = 0; i < n_resources; i++) {
if (resources[i] != NULL) {
@ -128,8 +121,13 @@ static bool validate_string(pb_istream_t *stream, const pb_field_t *field,
}
break;
case google_census_Resource_description_tag:
// Description is optional, does not need validating, just skip.
if (!pb_read(stream, NULL, stream->bytes_left)) {
if (stream->bytes_left == 0) {
return true;
}
vresource->description = gpr_malloc(stream->bytes_left + 1);
vresource->description[stream->bytes_left] = '\0';
if (!pb_read(stream, (uint8_t *)vresource->description,
stream->bytes_left)) {
return false;
}
break;
@ -144,17 +142,47 @@ static bool validate_string(pb_istream_t *stream, const pb_field_t *field,
return true;
}
// Decode numerators/denominators in a stream. The `count` and `bup`
// (BasicUnit pointer) are pointers to the approriate fields in a resource
// struct.
static bool validate_units_helper(pb_istream_t *stream, int *count,
google_census_Resource_BasicUnit **bup) {
while (stream->bytes_left) {
(*count)++;
// Have to allocate a new array of values. Normal case is 0 or 1, so
// this should normally not be an issue.
google_census_Resource_BasicUnit *new_bup =
gpr_malloc((size_t)*count * sizeof(google_census_Resource_BasicUnit));
if (*count != 1) {
memcpy(new_bup, *bup,
(size_t)(*count - 1) * sizeof(google_census_Resource_BasicUnit));
gpr_free(*bup);
}
*bup = new_bup;
if (!pb_decode_varint(stream, (uint64_t *)(*bup + *count - 1))) {
return false;
}
}
return true;
}
// Validate units field of a Resource proto.
static bool validate_units(pb_istream_t *stream, const pb_field_t *field,
void **arg) {
if (field->tag == google_census_Resource_MeasurementUnit_numerator_tag) {
*(bool *)*arg = true; // has_numerator = true.
}
while (stream->bytes_left) {
uint64_t value;
if (!pb_decode_varint(stream, &value)) {
resource *vresource = (resource *)(*arg);
switch (field->tag) {
case google_census_Resource_MeasurementUnit_numerator_tag:
return validate_units_helper(stream, &vresource->n_numerators,
&vresource->numerators);
break;
case google_census_Resource_MeasurementUnit_denominator_tag:
return validate_units_helper(stream, &vresource->n_denominators,
&vresource->denominators);
break;
default:
gpr_log(GPR_ERROR, "Unknown field type.");
return false;
}
break;
}
return true;
}
@ -170,10 +198,11 @@ static bool validate_resource_pb(const uint8_t *resource_pb,
vresource.name.funcs.decode = &validate_string;
vresource.name.arg = resources[id];
vresource.description.funcs.decode = &validate_string;
vresource.description.arg = resources[id];
vresource.unit.numerator.funcs.decode = &validate_units;
bool has_numerator = false;
vresource.unit.numerator.arg = &has_numerator;
vresource.unit.numerator.arg = resources[id];
vresource.unit.denominator.funcs.decode = &validate_units;
vresource.unit.denominator.arg = resources[id];
pb_istream_t stream =
pb_istream_from_buffer((uint8_t *)resource_pb, resource_pb_size);
@ -181,17 +210,15 @@ static bool validate_resource_pb(const uint8_t *resource_pb,
return false;
}
// A Resource must have a name, a unit, with at least one numerator.
return (resources[id]->name != NULL && vresource.has_unit && has_numerator);
return (resources[id]->name != NULL && vresource.has_unit &&
resources[id]->n_numerators > 0);
}
int32_t census_define_resource(const uint8_t *resource_pb,
size_t resource_pb_size) {
// Allocate a blank resource, and return associated ID. Must be called with
// resource_lock held.
size_t allocate_resource(void) {
// use next_id to optimize expected placement of next new resource.
static size_t next_id = 0;
if (resource_pb == NULL) {
return -1;
}
gpr_mu_lock(&resource_lock);
size_t id = n_resources; // resource ID - initialize to invalid value.
// Expand resources if needed.
if (n_resources == n_defined_resources) {
@ -212,22 +239,25 @@ int32_t census_define_resource(const uint8_t *resource_pb,
}
GPR_ASSERT(id < n_resources && resources[id] == NULL);
resources[id] = gpr_malloc(sizeof(resource));
resources[id]->name = NULL;
memset(resources[id], 0, sizeof(resource));
n_defined_resources++;
next_id = (id + 1) % n_resources;
return id;
}
int32_t census_define_resource(const uint8_t *resource_pb,
size_t resource_pb_size) {
if (resource_pb == NULL) {
return -1;
}
gpr_mu_lock(&resource_lock);
size_t id = allocate_resource();
// Validate pb, extract name.
if (!validate_resource_pb(resource_pb, resource_pb_size, id)) {
if (resources[id]->name != NULL) {
gpr_free(resources[id]->name);
}
gpr_free(resources[id]);
resources[id] = NULL;
delete_resource_locked(id);
gpr_mu_unlock(&resource_lock);
return -1;
}
next_id = (id + 1) % n_resources;
// Make copy of raw proto, and return.
resources[id]->raw_pb = gpr_malloc(resource_pb_size);
memcpy(resources[id]->raw_pb, resource_pb, resource_pb_size);
n_defined_resources++;
gpr_mu_unlock(&resource_lock);
return (int32_t)id;
}
@ -253,3 +283,31 @@ int32_t census_resource_id(const char *name) {
gpr_mu_unlock(&resource_lock);
return -1;
}
int32_t define_resource(const resource *base) {
GPR_ASSERT(base != NULL && base->name != NULL && base->n_numerators > 0 &&
base->numerators != NULL);
gpr_mu_lock(&resource_lock);
size_t id = allocate_resource();
size_t len = strlen(base->name) + 1;
resources[id]->name = gpr_malloc(len);
memcpy(resources[id]->name, base->name, len);
if (base->description) {
len = strlen(base->description);
resources[id]->description = gpr_malloc(len);
memcpy(resources[id]->description, base->description, len);
}
resources[id]->prefix = base->prefix;
resources[id]->n_numerators = base->n_numerators;
len = (size_t)base->n_numerators * sizeof(*base->numerators);
resources[id]->numerators = gpr_malloc(len);
memcpy(resources[id]->numerators, base->numerators, len);
resources[id]->n_denominators = base->n_denominators;
if (base->n_denominators != 0) {
len = (size_t)base->n_denominators * sizeof(*base->denominators);
resources[id]->denominators = gpr_malloc(len);
memcpy(resources[id]->denominators, base->denominators, len);
}
gpr_mu_unlock(&resource_lock);
return (int32_t)id;
}

@ -31,12 +31,33 @@
*
*/
// Census-internal resource definition and manipluation functions.
/* Census-internal resource definition and manipluation functions. */
#ifndef GRPC_CORE_EXT_CENSUS_RESOURCE_H
#define GRPC_CORE_EXT_CENSUS_RESOURCE_H
// Initialize and shutdown the resources subsystem.
void initialize_resources();
void shutdown_resources();
#include <grpc/grpc.h>
#include "gen/census.pb.h"
/* Internal representation of a resource. */
typedef struct {
char *name;
char *description;
int32_t prefix;
int n_numerators;
google_census_Resource_BasicUnit *numerators;
int n_denominators;
google_census_Resource_BasicUnit *denominators;
} resource;
/* Initialize and shutdown the resources subsystem. */
void initialize_resources(void);
void shutdown_resources(void);
/* Add a new resource, given a proposed resource structure. Returns the
resource ID, or -ve on failure.
TODO(aveitch): this function exists to support addition of the base
resources. It should be removed when we have the ability to add resources
from configuration files. */
int32_t define_resource(const resource *base);
#endif /* GRPC_CORE_EXT_CENSUS_RESOURCE_H */

Loading…
Cancel
Save