Massive changes (practically a rewrite). More compact and minimal.

Still doesn't actually work yet, but much closer.
pull/13171/head
Joshua Haberman 16 years ago
parent 75ee6df2ff
commit faf6b5f3bf
  1. 613
      pbstream.c
  2. 116
      pbstream.h
  3. 16
      tests.c

@ -4,7 +4,6 @@
* Copyright (c) 2008-2009 Joshua Haberman. See LICENSE for details.
*/
#include <limits.h>
#include <string.h>
#include "pbstream.h"
@ -17,427 +16,295 @@
#define unlikely(x) (x)
#endif
/* An array, indexed by pbstream_type, that indicates what wire type is
* expected for the given pbstream type. */
static enum pbstream_wire_type expected_wire_type[] = {
PBSTREAM_WIRE_TYPE_64BIT, // PBSTREAM_TYPE_DOUBLE,
PBSTREAM_WIRE_TYPE_32BIT, // PBSTREAM_TYPE_FLOAT,
PBSTREAM_WIRE_TYPE_VARINT, // PBSTREAM_TYPE_INT32,
PBSTREAM_WIRE_TYPE_VARINT, // PBSTREAM_TYPE_INT64,
PBSTREAM_WIRE_TYPE_VARINT, // PBSTREAM_TYPE_UINT32,
PBSTREAM_WIRE_TYPE_VARINT, // PBSTREAM_TYPE_UINT64,
PBSTREAM_WIRE_TYPE_VARINT, // PBSTREAM_TYPE_SINT32,
PBSTREAM_WIRE_TYPE_VARINT, // PBSTREAM_TYPE_SINT64,
PBSTREAM_WIRE_TYPE_32BIT, // PBSTREAM_TYPE_FIXED32,
PBSTREAM_WIRE_TYPE_64BIT, // PBSTREAM_TYPE_FIXED64,
PBSTREAM_WIRE_TYPE_32BIT, // PBSTREAM_TYPE_SFIXED32,
PBSTREAM_WIRE_TYPE_64BIT, // PBSTREAM_TYPE_SFIXED64,
PBSTREAM_WIRE_TYPE_VARINT, // PBSTREAM_TYPE_BOOL,
PBSTREAM_WIRE_TYPE_STRING, // PBSTREAM_TYPE_STRING,
PBSTREAM_WIRE_TYPE_STRING, // PBSTREAM_TYPE_BYTES,
PBSTREAM_WIRE_TYPE_VARINT, // PBSTREAM_TYPE_ENUM,
PBSTREAM_WIRE_TYPE_STRING, // PBSTREAM_TYPE_MESSAGE
};
/* Lowest-level functions -- these read integers from the input buffer.
* To avoid branches, none of these do bounds checking. So we force clients
* to overallocate their buffers by >=9 bytes. */
/* Reads a varint starting at buf (but not past end), storing the result
* in out_value. Returns whether the operation was successful. */
enum pbstream_status get_varint(char **buf, char *end, uint64_t *out_value)
static pbstream_status_t get_v_uint64_t(char **buf, char *end, uint64_t *val)
{
*out_value = 0;
int bitpos = 0;
char *b = *buf;
/* Because we don't check for buffer overrun inside the loop, we require
* that callers use a buffer that is overallocated by at least 9 bytes (the
* maximum we can overrun before the bitpos check catches the problem). */
for(; *b & 0x80 && bitpos < 64; bitpos += 7, b++)
*out_value |= (uint64_t)(*b & 0x7F) << bitpos;
/* If bitpos is 63 (as it will be if this was a nine-byte varint) this will
* throw away the middle six bits of the final byte. We don't bother warning
* about this. */
*out_value |= (uint64_t)(*b & 0x7F) << bitpos;
b++;
if(unlikely(bitpos >= 64)) return PBSTREAM_ERROR_UNTERMINATED_VARINT;
if(unlikely(b > end)) return PBSTREAM_STATUS_INCOMPLETE;
*buf = b;
return PBSTREAM_STATUS_OK;
uint8_t* ptr = (uint8_t*)*buf;
uint32_t b;
uint32_t part0 = 0, part1 = 0, part2 = 0;
b = *(ptr++); part0 = (b & 0x7F) ; if (!(b & 0x80)) goto done;
b = *(ptr++); part0 |= (b & 0x7F) << 7; if (!(b & 0x80)) goto done;
b = *(ptr++); part0 |= (b & 0x7F) << 14; if (!(b & 0x80)) goto done;
b = *(ptr++); part0 |= (b & 0x7F) << 21; if (!(b & 0x80)) goto done;
b = *(ptr++); part1 = (b & 0x7F) ; if (!(b & 0x80)) goto done;
b = *(ptr++); part1 |= (b & 0x7F) << 7; if (!(b & 0x80)) goto done;
b = *(ptr++); part1 |= (b & 0x7F) << 14; if (!(b & 0x80)) goto done;
b = *(ptr++); part1 |= (b & 0x7F) << 21; if (!(b & 0x80)) goto done;
b = *(ptr++); part2 = (b & 0x7F) ; if (!(b & 0x80)) goto done;
b = *(ptr++); part2 |= (b & 0x7F) << 7; if (!(b & 0x80)) goto done;
return PBSTREAM_ERROR_UNTERMINATED_VARINT;
done:
*buf = (char*)ptr;
*val = (uint64_t)part0 | ((uint64_t)part1 << 28) | ((uint64_t)part2 << 56);
return unlikely(*buf > end) ? PBSTREAM_STATUS_INCOMPLETE : PBSTREAM_STATUS_OK;
}
/* TODO: the little-endian versions of these functions don't respect alignment.
* While it's hard to believe that this could be less efficient than the
* alternative (the big endian implementation), this deserves some tests and
* measurements to be sure. */
enum pbstream_status get_32_le(char **buf, char *end, uint32_t *out_value)
static pbstream_status_t get_v_uint32_t(char **buf, char *end, uint32_t *val)
{
char *b = *buf;
char *int32_end = b+4;
if(unlikely(int32_end > end)) return PBSTREAM_STATUS_INCOMPLETE;
#if __BYTE_ORDER == __LITTLE_ENDIAN
*out_value = *(uint32_t*)b;
#else
*out_value = b[0] | (b[1] << 8) | (b[2] << 16) | (b[3] << 24);
#endif
*buf = int32_end;
return PBSTREAM_STATUS_OK;
uint8_t* ptr = (uint8_t*)*buf;
uint32_t b;
uint32_t result;
b = *(ptr++); result = (b & 0x7F) ; if (!(b & 0x80)) goto done;
b = *(ptr++); result |= (b & 0x7F) << 7; if (!(b & 0x80)) goto done;
b = *(ptr++); result |= (b & 0x7F) << 14; if (!(b & 0x80)) goto done;
b = *(ptr++); result |= (b & 0x7F) << 21; if (!(b & 0x80)) goto done;
b = *(ptr++); result = (b & 0x7F) << 28; if (!(b & 0x80)) goto done;
return PBSTREAM_ERROR_UNTERMINATED_VARINT;
done:
*buf = (char*)ptr;
*val = result;
return unlikely(*buf > end) ? PBSTREAM_STATUS_INCOMPLETE: PBSTREAM_STATUS_OK;
}
bool get_64_le(char **buf, char *end, uint64_t *out_value)
static pbstream_status_t get_f_uint32_t(char **buf, char *end, uint32_t *val)
{
char *b = *buf;
char *int64_end = b+8;
if(unlikely(int64_end > end)) return PBSTREAM_STATUS_INCOMPLETE;
uint8_t *b = (uint8_t*)*buf;
#if __BYTE_ORDER == __LITTLE_ENDIAN
*out_value = *(uint64_t*)buf;
*val = *(uint32_t*)b; /* likely unaligned, TODO: verify performance. */
#else
*out_value = (b[0]) | (b[1] << 8 ) | (b[2] << 16) | (b[3] << 24) |
(b[4] << 32) | (b[5] << 40) | (b[6] << 48) | (b[7] << 56);
*val = b[0] | (b[1] << 8) | (b[2] << 16) | (b[3] << 24);
#endif
*buf = int64_end;
return PBSTREAM_STATUS_OK;
*buf = (char*)b + sizeof(uint32_t);
return unlikely(*buf > end) ? PBSTREAM_STATUS_INCOMPLETE : PBSTREAM_STATUS_OK;
}
int32_t zigzag_decode_32(uint32_t n)
static pbstream_status_t get_f_uint64_t(char **buf, char *end, uint64_t *val)
{
return (n >> 1) ^ -(int32_t)(n & 1);
uint8_t *b = (uint8_t*)*buf;
#if __BYTE_ORDER == __LITTLE_ENDIAN
*val = *(uint64_t*)buf; /* likely unaligned, TODO: verify performance. */
#else
*val = (b[0]) | (b[1] << 8 ) | (b[2] << 16) | (b[3] << 24) |
(b[4] << 32) | (b[5] << 40) | (b[6] << 48) | (b[7] << 56);
#endif
*buf = (char*)b + sizeof(uint64_t);
return unlikely(*buf > end) ? PBSTREAM_STATUS_INCOMPLETE : PBSTREAM_STATUS_OK;
}
int64_t zigzag_decode_64(uint64_t n)
{
return (n >> 1) ^ -(int64_t)(n & 1);
}
static int32_t zz_decode_32(uint32_t n) { return (n >> 1) ^ -(int32_t)(n & 1); }
static int64_t zz_decode_64(uint64_t n) { return (n >> 1) ^ -(int64_t)(n & 1); }
/* Parses the next field-number/wire-value pair from the stream of bytes
* starting at *buf, without reading past end. Stores the parsed and wire
* value in *field_number and *wire_value, respectively.
*
* Returns a status indicating whether the operation was successful. If the
* return status is STATUS_INCOMPLETE, returns the number of additional bytes
* requred in *need_more_bytes. Updates *buf to point past the end of the
* parsed data if the operation was successful.
*/
enum pbstream_status pbstream_parse_wire_value(
char **buf, char *end,
pbstream_field_number_t *field_number,
struct pbstream_wire_value *wire_value,
int *need_more_bytes)
{
char *b = *buf; /* Our local buf pointer -- only update buf if we succeed. */
#define DECODE(dest, func) \
do { \
enum pbstream_status status = func(&b, end, &dest); \
if(unlikely(status != PBSTREAM_STATUS_OK)) { \
*need_more_bytes = 0; /* This only arises below in this function. */ \
return status; \
} \
#define CHECK(func) do { \
pbstream_wire_type_t status = func; \
if(status != PBSTREAM_STATUS_OK) return status; \
} while (0)
uint64_t key;
DECODE(key, get_varint);
/* WVTOV() generates a function:
* void wvtov_TYPE(wire_t src, val_t *dst, size_t offset)
* (macro invoker defines the body of the function). */
#define WVTOV(type, wire_t, val_t) \
static void wvtov_ ## type(wire_t s, val_t *d, size_t offset)
/* GET() generates a function:
* pbstream_status_t get_TYPE(char **buf, char *end, size_t offset,
* pbstream_value *dst) */
#define GET(type, v_or_f, wire_t, val_t, member_name) \
static pbstream_status_t get_ ## type(char **buf, char *end, size_t offset, \
struct pbstream_value *d) { \
wire_t tmp; \
CHECK(get_ ## v_or_f ## _ ## wire_t(buf, end, &tmp)); \
wvtov_ ## type(tmp, &d->v.member_name, offset); \
return PBSTREAM_STATUS_OK; \
}
*field_number = key >> 3;
wire_value->type = key & 0x07;
#define T(type, v_or_f, wire_t, val_t, member_name) \
WVTOV(type, wire_t, val_t); /* prototype for GET below */ \
GET(type, v_or_f, wire_t, val_t, member_name) \
WVTOV(type, wire_t, val_t)
T(DOUBLE, f, uint64_t, double, _double){ memcpy(d, &s, sizeof(double)); }
T(FLOAT, f, uint32_t, float, _float) { memcpy(d, &s, sizeof(float)); }
T(INT32, v, uint32_t, int32_t, int32) { *d = (int32_t)s; }
T(INT64, v, uint64_t, int64_t, int64) { *d = (int64_t)s; }
T(UINT32, v, uint32_t, uint32_t, uint32) { *d = s; }
T(UINT64, v, uint64_t, uint64_t, uint64) { *d = s; }
T(SINT32, v, uint32_t, int32_t, int32) { *d = zz_decode_32(s); }
T(SINT64, v, uint64_t, int64_t, int64) { *d = zz_decode_64(s); }
T(FIXED32, f, uint32_t, uint32_t, uint32) { *d = s; }
T(FIXED64, f, uint64_t, uint64_t, uint64) { *d = s; }
T(SFIXED32, f, uint32_t, int32_t, int32) { *d = (int32_t)s; }
T(SFIXED64, f, uint64_t, int64_t, int64) { *d = (int64_t)s; }
T(BOOL, v, uint32_t, bool, _bool) { *d = (bool)s; }
T(ENUM, v, uint32_t, int32_t, _enum) { *d = (int32_t)s; }
#define T_DELIMITED(type) \
T(type, v, uint32_t, struct pbstream_delimited, delimited) { \
d->offset = offset; \
d->len = s; \
}
T_DELIMITED(STRING); /* We leave UTF-8 validation to the client. */
T_DELIMITED(BYTES);
T_DELIMITED(MESSAGE);
#undef WVTOV
#undef GET
#undef T
#undef T_DELIMITED
struct pbstream_type_info {
pbstream_wire_type_t expected_wire_type;
pbstream_status_t (*get)(char **buf, char *end, size_t offset,
struct pbstream_value *d);
};
static struct pbstream_type_info type_info[] = {
{PBSTREAM_WIRE_TYPE_64BIT, get_DOUBLE},
{PBSTREAM_WIRE_TYPE_32BIT, get_FLOAT},
{PBSTREAM_WIRE_TYPE_VARINT, get_INT32},
{PBSTREAM_WIRE_TYPE_VARINT, get_INT64},
{PBSTREAM_WIRE_TYPE_VARINT, get_UINT32},
{PBSTREAM_WIRE_TYPE_VARINT, get_UINT64},
{PBSTREAM_WIRE_TYPE_VARINT, get_SINT32},
{PBSTREAM_WIRE_TYPE_VARINT, get_SINT64},
{PBSTREAM_WIRE_TYPE_32BIT, get_FIXED32},
{PBSTREAM_WIRE_TYPE_64BIT, get_FIXED64},
{PBSTREAM_WIRE_TYPE_32BIT, get_SFIXED32},
{PBSTREAM_WIRE_TYPE_64BIT, get_SFIXED64},
{PBSTREAM_WIRE_TYPE_VARINT, get_BOOL},
{PBSTREAM_WIRE_TYPE_DELIMITED, get_STRING},
{PBSTREAM_WIRE_TYPE_DELIMITED, get_BYTES},
{PBSTREAM_WIRE_TYPE_VARINT, get_ENUM},
{PBSTREAM_WIRE_TYPE_DELIMITED, get_MESSAGE}
};
switch(wire_value->type) {
case PBSTREAM_WIRE_TYPE_VARINT:
DECODE(wire_value->v.varint, get_varint);
break;
static pbstream_status_t parse_tag(char **buf, char *end, struct pbstream_tag *tag)
{
uint32_t tag_int;
CHECK(get_v_uint32_t(buf, end, &tag_int));
tag->wire_type = tag_int & 0x07;
tag->field_number = tag_int >> 3;
return PBSTREAM_STATUS_OK;
}
static pbstream_status_t parse_unknown_value(
char **buf, char *end, int buf_offset,
struct pbstream_wire_value *wv)
{
#define DECODE(dest, func) CHECK(func(buf, end, &dest))
switch(wv->type) {
case PBSTREAM_WIRE_TYPE_VARINT:
DECODE(wv->v.varint, get_v_uint64_t); break;
case PBSTREAM_WIRE_TYPE_64BIT:
DECODE(wire_value->v._64bit, get_64_le);
break;
case PBSTREAM_WIRE_TYPE_STRING: {
uint64_t string_len;
DECODE(string_len, get_varint);
if (unlikely(string_len > INT_MAX)) {
/* TODO: notice this and fail. */
}
wire_value->v.string.len = (int)string_len;
if(b + wire_value->v.string.len > end) {
*need_more_bytes = b + wire_value->v.string.len - end;
return PBSTREAM_STATUS_INCOMPLETE;
}
wire_value->v.string.data = b;
b += wire_value->v.string.len;
DECODE(wv->v._64bit, get_f_uint64_t); break;
case PBSTREAM_WIRE_TYPE_32BIT:
DECODE(wv->v._32bit, get_f_uint32_t); break;
case PBSTREAM_WIRE_TYPE_DELIMITED: {
uint32_t len;
wv->v.delimited.offset = buf_offset;
DECODE(len, get_v_uint32_t);
wv->v.delimited.len = (size_t)len;
break;
}
case PBSTREAM_WIRE_TYPE_START_GROUP:
case PBSTREAM_WIRE_TYPE_END_GROUP:
/* TODO (though these are deprecated, so not high priority). */
break;
case PBSTREAM_WIRE_TYPE_32BIT:
DECODE(wire_value->v._32bit, get_32_le);
break;
}
*buf = b;
return true;
return PBSTREAM_STATUS_OK;
#undef DECODE
}
/* Translates from a wire value to a .proto value. The caller should have
* already checked that the wire_value is of the correct type. The pbstream
* type must not be PBSTREAM_TYPE_MESSAGE. This operation always succeeds. */
void pbstream_translate_field(struct pbstream_wire_value *wire_value,
enum pbstream_type type,
struct pbstream_value *out_value)
{
out_value->type = type;
switch(type) {
case PBSTREAM_TYPE_DOUBLE:
memcpy(&out_value->v._double, &wire_value->v._64bit, sizeof(double));
break;
case PBSTREAM_TYPE_FLOAT:
memcpy(&out_value->v._float, &wire_value->v._32bit, sizeof(float));
break;
case PBSTREAM_TYPE_INT32:
out_value->v.int32 = (int32_t)wire_value->v.varint;
break;
case PBSTREAM_TYPE_INT64:
out_value->v.int64 = (int64_t)zigzag_decode_64(wire_value->v.varint);
break;
case PBSTREAM_TYPE_UINT32:
out_value->v.uint32 = (uint32_t)wire_value->v.varint;
break;
case PBSTREAM_TYPE_UINT64:
out_value->v.uint64 = (uint64_t)wire_value->v.varint;
break;
case PBSTREAM_TYPE_SINT32:
out_value->v.int32 = zigzag_decode_32(wire_value->v.varint);
break;
case PBSTREAM_TYPE_SINT64:
out_value->v.int64 = zigzag_decode_64(wire_value->v.varint);
break;
case PBSTREAM_TYPE_FIXED32:
out_value->v.int32 = wire_value->v._32bit;
break;
case PBSTREAM_TYPE_FIXED64:
out_value->v.int64 = wire_value->v._64bit;
break;
case PBSTREAM_TYPE_SFIXED32:
out_value->v.int32 = (int32_t)wire_value->v._32bit;
break;
case PBSTREAM_TYPE_SFIXED64:
out_value->v.int64 = (int64_t)wire_value->v._64bit;
break;
case PBSTREAM_TYPE_BOOL:
out_value->v._bool = (bool)wire_value->v.varint;
break;
case PBSTREAM_TYPE_STRING:
out_value->v.string.data = wire_value->v.string.data;
out_value->v.string.len = wire_value->v.string.len;
/* TODO: validate UTF-8? */
break;
case PBSTREAM_TYPE_BYTES:
out_value->v.bytes.data = wire_value->v.string.data;
out_value->v.bytes.len = wire_value->v.string.len;
break;
#define CALLBACK(s, func, ...) do { \
if(s->callbacks.func) s->callbacks.func(__VA_ARGS__); \
} while (0)
case PBSTREAM_TYPE_ENUM:
out_value->v._enum = (bool)wire_value->v.varint;
break;
#define NONFATAL_ERROR(s, code) do { \
if(s->ignore_nonfatal_errors) CALLBACK(s, error_callback, code); \
else return code; \
} while (0)
case PBSTREAM_TYPE_MESSAGE:
/* Should never happen. */
break;
}
static struct pbstream_field_descriptor *find_field_descriptor(
struct pbstream_message_descriptor* md,
pbstream_field_number_t field_number)
{
/* Likely will want to replace linear search with something better. */
for (int i = 0; i < md->fields_len; i++)
if (md->fields[i].field_number == field_number) return &md->fields[i];
return NULL;
}
/* Given a wire value that was just parsed and a matching field descriptor,
* processes the given value and performs the appropriate actions. These
* actions include:
* - checking that the wire type is as expected
* - converting the wire type to a .proto type
* - entering a sub-message, if that is in fact what this field implies.
*
* This function also calls user callbacks pertaining to any of the above at
* the appropriate times. */
void process_value(struct pbstream_parse_state *s,
struct pbstream_wire_value *wire_value,
struct pbstream_field_descriptor *field_descriptor)
/* Process actions associated with the end of a [sub-]message. */
pbstream_status_t process_message_end(struct pbstream_parse_state *s)
{
/* Check that the wire type is appropriate for this .proto type. */
if(unlikely(wire_value->type != expected_wire_type[field_descriptor->type])) {
/* Type mismatch. */
if(s->callbacks.error_callback) {
/* TODO: a nice formatted message. */
s->callbacks.error_callback(PBSTREAM_ERROR_MISMATCHED_TYPE, NULL,
s->offset, false);
struct pbstream_parse_stack_frame *frame = DYNARRAY_GET_TOP(s->stack);
/* A submessage that doesn't end exactly on a field boundary indicates
* corruption. */
if(unlikely(s->offset != frame->end_offset))
return PBSTREAM_ERROR_BAD_SUBMESSAGE_END;
/* Check required fields. */
struct pbstream_message_descriptor *md = frame->message_descriptor;
for(int i = 0; i < md->fields_len; i++) {
struct pbstream_field_descriptor *fd = &md->fields[i];
if(fd->seen_field_num && !frame->seen_fields[fd->seen_field_num] &&
fd->cardinality == PBSTREAM_CARDINALITY_REQUIRED) {
NONFATAL_ERROR(s, PBSTREAM_ERROR_MISSING_REQUIRED_FIELD);
}
}
RESIZE_DYNARRAY(s->stack, s->stack_len-1);
return PBSTREAM_STATUS_OK;
}
/* Report the wire value we parsed as an unknown value. */
if(s->callbacks.unknown_value_callback) {
s->callbacks.unknown_value_callback(field_descriptor->field_number,
wire_value,
s->user_data);
}
return;
/* Parses and processes the next value from buf (but not past end). */
pbstream_status_t parse_field(struct pbstream_parse_state *s,
char *buf, char *end,
pbstream_field_number_t *fieldnum,
struct pbstream_value *val,
struct pbstream_wire_value *wv)
{
struct pbstream_parse_stack_frame *frame = DYNARRAY_GET_TOP(s->stack);
struct pbstream_message_descriptor *md = frame->message_descriptor;
struct pbstream_tag tag;
struct pbstream_field_descriptor *fd;
struct pbstream_type_info *info;
char *b = buf;
if(unlikely(s->offset >= frame->end_offset)) return process_message_end(s);
CHECK(parse_tag(&b, end, &tag));
size_t val_offset = s->offset + (b-buf);
fd = find_field_descriptor(md, tag.field_number);
if(unlikely(!fd)) goto unknown_value;
info = &type_info[fd->type];
/* Check type and cardinality. */
if(unlikely(tag.wire_type != info->expected_wire_type)) {
NONFATAL_ERROR(s, PBSTREAM_ERROR_MISMATCHED_TYPE);
goto unknown_value;
}
if(fd->seen_field_num > 0) {
if(unlikely(frame->seen_fields[fd->seen_field_num]))
NONFATAL_ERROR(s, PBSTREAM_ERROR_DUPLICATE_FIELD);
frame->seen_fields[fd->seen_field_num] = true;
}
if(field_descriptor->type == PBSTREAM_TYPE_MESSAGE) {
if(unlikely(fd->type == PBSTREAM_TYPE_MESSAGE)) {
/* We're entering a sub-message. */
if(s->callbacks.begin_message_callback) {
s->callbacks.begin_message_callback(field_descriptor->d.message,
s->user_data);
}
/* Push and initialize a new stack frame. */
CHECK(info->get(&b, end, val_offset, val));
RESIZE_DYNARRAY(s->stack, s->stack_len+1);
struct pbstream_parse_stack_frame *frame = DYNARRAY_GET_TOP(s->stack);
frame->message_descriptor = field_descriptor->d.message;
frame->end_offset = 0; /* TODO: set this correctly. */
frame->message_descriptor = fd->d.message;
frame->end_offset = val->v.delimited.offset + val->v.delimited.len;
s->offset = wv->v.delimited.offset; /* skip past only the tag. */
int num_seen_fields = frame->message_descriptor->num_seen_fields;
INIT_DYNARRAY(frame->seen_fields, num_seen_fields, num_seen_fields);
} else {
/* This is a scalar value. */
struct pbstream_value value;
pbstream_translate_field(wire_value, field_descriptor->type, &value);
if(s->callbacks.value_callback) {
s->callbacks.value_callback(field_descriptor, &value, s->user_data);
}
}
}
struct pbstream_field_descriptor *find_field_descriptor_by_number(
struct pbstream_message_descriptor* message_descriptor,
pbstream_field_number_t field_number)
{
/* Currently a linear search -- could be optimized to do a binary search, hash
* table lookup, or any other number of clever things you might imagine. */
for (int i = 0; i < message_descriptor->fields_len; i++)
if (message_descriptor->fields[i].field_number == field_number)
return &message_descriptor->fields[i];
return NULL;
}
/* Parses and processes the next value from *buf (but not past end), returning
* a status indicating whether the operation succeeded, and calling appropriate
* callbacks. If more data is needed to parse the last partial field, returns
* how many more bytes are needed in need_more_bytes. Updates *buf to point
* past the parsed value if the operation succeeds. */
enum pbstream_status pbstream_parse_field(struct pbstream_parse_state *s,
char **buf, char *end,
int *need_more_bytes)
{
struct pbstream_parse_stack_frame *frame = DYNARRAY_GET_TOP(s->stack);
struct pbstream_message_descriptor *message_descriptor =
frame->message_descriptor;
pbstream_field_number_t field_number;
struct pbstream_wire_value wire_value;
enum pbstream_status status;
/* Decode the raw wire data. */
status = pbstream_parse_wire_value(buf, end, &field_number, &wire_value,
need_more_bytes);
if(unlikely(status != PBSTREAM_STATUS_OK)) {
if(status == PBSTREAM_ERROR_UNTERMINATED_VARINT &&
s->callbacks.error_callback) {
/* TODO: a nice formatted message. */
s->callbacks.error_callback(PBSTREAM_ERROR_UNTERMINATED_VARINT, NULL,
s->offset, true);
}
s->fatal_error = true;
return status;
}
/* Find the corresponding field definition from the .proto file. */
struct pbstream_field_descriptor *field_descriptor;
field_descriptor = find_field_descriptor_by_number(message_descriptor,
field_number);
if(likely(field_descriptor != NULL)) {
if(field_descriptor->seen_field_num > 0) { /* for non-repeated fields */
/* Check that this field has not been seen before. */
if(frame->seen_fields[field_descriptor->seen_field_num]) {
if(s->callbacks.error_callback)
s->callbacks.error_callback(PBSTREAM_ERROR_DUPLICATE_FIELD, NULL,
s->offset, false);
return PBSTREAM_STATUS_ERROR;
}
/* Mark the field as seen. */
frame->seen_fields[field_descriptor->seen_field_num] = true;
}
process_value(s, &wire_value, field_descriptor);
} else {
/* This field was not defined in the .proto file. */
if(s->callbacks.unknown_value_callback)
s->callbacks.unknown_value_callback(field_number, &wire_value,
s->user_data);
*fieldnum = tag.field_number;
val->type = fd->type;
CHECK(info->get(&b, end, val_offset, val));
s->offset += (b-buf);
}
return PBSTREAM_STATUS_OK;
}
/* Process actions associated with the end of a submessage. This includes:
* - emitting default values for all optional elements (either explicit
* defaults or implicit defaults).
* - emitting errors for any required fields that were not seen.
* - calling the user's callback.
* - popping the stack frame. */
void process_submessage_end(struct pbstream_parse_state *s)
{
/* TODO: emit default values for optional elements. either explicit defaults
* (specified in the .proto file) or implicit defaults (which are specified
* in the pbstream definition, by type. */
/* TODO: emit errors for required fields that were not seen. */
/* Process the end of message by calling the user's callback and popping
* our stack frame. */
if(s->callbacks.end_message_callback)
s->callbacks.end_message_callback(s->user_data);
/* Pop the stack frame associated with this submessage. */
RESIZE_DYNARRAY(s->stack, s->stack_len-1);
}
enum pbstream_status pbstream_parse(struct pbstream_parse_state *s,
char *buf_start, int buf_len,
int *consumed_bytes, int *need_more_bytes)
{
char *buf = buf_start;
char *end = buf_start + buf_len;
int buf_start_offset = s->offset;
enum pbstream_status status = PBSTREAM_STATUS_OK;
while(buf < end) {
/* Check for a submessage ending. */
while(s->offset >= DYNARRAY_GET_TOP(s->stack)->end_offset) {
/* A submessage that doesn't end exactly on a field boundary indicates
* corruption. */
if(unlikely(s->offset != DYNARRAY_GET_TOP(s->stack)->end_offset)) {
if(s->callbacks.error_callback) {
s->callbacks.error_callback(PBSTREAM_ERROR_BAD_SUBMESSAGE_END, NULL,
s->offset, true);
}
s->fatal_error = true;
break;
}
process_submessage_end(s);
}
status = pbstream_parse_field(s, &buf, end, need_more_bytes);
if(status != PBSTREAM_STATUS_OK)
break;
s->offset = buf_start_offset + (buf - buf_start);
}
return status;
unknown_value:
wv->type = tag.wire_type;
CHECK(parse_unknown_value(&b, end, val_offset, wv));
s->offset += (b-buf);
return PBSTREAM_STATUS_OK;
}

@ -10,7 +10,7 @@
#include "dynarray.h"
/* A list of types as they can appear in a .proto file. */
enum pbstream_type {
typedef enum pbstream_type {
PBSTREAM_TYPE_DOUBLE,
PBSTREAM_TYPE_FLOAT,
PBSTREAM_TYPE_INT32,
@ -26,34 +26,32 @@ enum pbstream_type {
PBSTREAM_TYPE_BOOL,
PBSTREAM_TYPE_STRING,
PBSTREAM_TYPE_BYTES,
PBSTREAM_TYPE_ENUM,
PBSTREAM_TYPE_MESSAGE
};
} pbstream_type_t;
/* A list of types as they are encoded on-the-wire. */
enum pbstream_wire_type {
typedef enum pbstream_wire_type {
PBSTREAM_WIRE_TYPE_VARINT = 0,
PBSTREAM_WIRE_TYPE_64BIT = 1,
PBSTREAM_WIRE_TYPE_STRING = 2,
PBSTREAM_WIRE_TYPE_DELIMITED = 2,
PBSTREAM_WIRE_TYPE_START_GROUP = 3,
PBSTREAM_WIRE_TYPE_END_GROUP = 4,
PBSTREAM_WIRE_TYPE_32BIT = 5,
};
} pbstream_wire_type_t;
/* Each field must have a cardinality that is one of the following. */
enum pbstream_cardinality {
typedef enum pbstream_cardinality {
PBSTREAM_CARDINALITY_OPTIONAL, /* must appear 0 or 1 times */
PBSTREAM_CARDINALITY_REQUIRED, /* must appear exactly 1 time */
PBSTREAM_CARDINALITY_REPEATED, /* may appear 0 or more times */
};
} pbstream_cardinality_t;
typedef int32_t pbstream_field_number_t;
/* A deserialized value as described in a .proto file. */
struct pbstream_value {
enum pbstream_type type;
pbstream_type_t type;
union {
double _double;
float _float;
@ -62,28 +60,29 @@ struct pbstream_value {
uint32_t uint32;
uint64_t uint64;
bool _bool;
struct {
char *data; /* points into the client's input buffer */
int len;
} string;
struct {
char *data; /* points into the client's input buffer */
struct pbstream_delimited {
size_t offset; /* relative to the beginning of the stream. */
int len;
} bytes;
} delimited;
int32_t _enum;
} v;
};
struct pbstream_tag {
pbstream_field_number_t field_number;
pbstream_wire_type_t wire_type;
};
/* A value as it is encoded on-the-wire */
struct pbstream_wire_value {
enum pbstream_wire_type type;
pbstream_wire_type_t type;
union {
uint64_t varint;
uint64_t _64bit;
struct {
char *data; /* points into the client's input buffer */
size_t offset; /* relative to the beginning of the stream. */
int len;
} string;
} delimited;
uint32_t _32bit;
} v;
};
@ -113,8 +112,8 @@ struct pbstream_enum_descriptor {
struct pbstream_field_descriptor {
pbstream_field_number_t field_number;
char *name;
enum pbstream_type type;
enum pbstream_cardinality cardinality;
pbstream_type_t type;
pbstream_cardinality_t cardinality;
struct pbstream_value *default_value; /* NULL if none */
/* Index into the "seen" list for the message. -1 for repeated fields (for
@ -137,57 +136,38 @@ struct pbstream_message_descriptor {
DEFINE_DYNARRAY(enums, struct pbstream_enum_descriptor);
};
/* Callback for when a regular value is parsed. */
typedef void (*pbstream_value_callback_t)(
struct pbstream_field_descriptor *field_descriptor,
struct pbstream_value *value,
void *user_data);
/* Callback for when an error occurred.
* The description is a static buffer which the client must not free. The
* offset is the location in the input where the error was detected (this
* offset is relative to the beginning of the stream). If is_fatal is true,
* parsing cannot continue. */
typedef enum pbstream_status {
PBSTREAM_STATUS_OK = 0,
PBSTREAM_STATUS_INCOMPLETE = 1, /* buffer ended in the middle of a field */
/* Callback for when a value is parsed but wasn't in the .proto file. */
typedef void (*pbstream_unknown_value_callback_t)(
pbstream_field_number_t field_number,
struct pbstream_wire_value *wire_value,
void *user_data);
/** FATAL ERRORS: these indicate corruption, and cannot be recovered. */
/* Callback for when a nested message is beginning. */
typedef void (*pbstream_begin_message_callback_t)(
struct pbstream_message_descriptor *message_descriptor,
void *user_data);
// A varint did not terminate before hitting 64 bits.
PBSTREAM_ERROR_UNTERMINATED_VARINT,
/* Callback for when a nested message is ending. */
typedef void (*pbstream_end_message_callback_t)(void *user_data);
// A submessage ended in the middle of data.
PBSTREAM_ERROR_BAD_SUBMESSAGE_END,
/* Callback for when an error occurred. */
enum pbstream_error {
/* A varint did not terminate before hitting 64 bits. Fatal. */
PBSTREAM_ERROR_UNTERMINATED_VARINT,
/** NONFATAL ERRORS: the input was invalid, but we can continue if desired. */
/* A field marked "required" was not present. */
// A field marked "required" was not present. */
PBSTREAM_ERROR_MISSING_REQUIRED_FIELD,
/* An optional or required field appeared more than once. */
// An optional or required field appeared more than once.
PBSTREAM_ERROR_DUPLICATE_FIELD,
/* A field was encoded with the wrong wire type. */
// A field was encoded with the wrong wire type.
PBSTREAM_ERROR_MISMATCHED_TYPE,
/* A submessage ended in the middle of data. Indicates corruption. */
PBSTREAM_ERROR_BAD_SUBMESSAGE_END,
};
/* The description is a static buffer which the client must not free. The
* offset is the location in the input where the error was detected (this
* offset is relative to the beginning of the stream). If is_fatal is true,
* parsing cannot continue. */
typedef void (*pbstream_error_callback_t)(enum pbstream_error error,
char *description,
int offset, bool is_fatal);
} pbstream_status_t;
typedef void (*pbstream_error_callback_t)(pbstream_status_t error);
struct pbstream_callbacks {
pbstream_value_callback_t value_callback;
pbstream_unknown_value_callback_t unknown_value_callback;
pbstream_begin_message_callback_t begin_message_callback;
pbstream_end_message_callback_t end_message_callback;
pbstream_error_callback_t error_callback;
pbstream_error_callback_t error_callback;
};
struct pbstream_parse_stack_frame {
@ -201,8 +181,8 @@ struct pbstream_parse_stack_frame {
/* The stream parser's state. */
struct pbstream_parse_state {
struct pbstream_callbacks callbacks;
int offset;
bool fatal_error;
size_t offset;
bool ignore_nonfatal_errors;
void *user_data;
DEFINE_DYNARRAY(stack, struct pbstream_parse_stack_frame);
};
@ -227,12 +207,6 @@ void pbstream_init_parser(
* thus parsing of the pbstream cannot proceed) unless need_more_bytes more
* data is available upon the next call to parse. The caller may need to
* increase its buffer size. */
enum pbstream_status {
PBSTREAM_STATUS_OK = 0,
PBSTREAM_STATUS_INCOMPLETE = 1, /* buffer ended in the middle of a field */
PBSTREAM_STATUS_ERROR = 2, /* fatal error in the file, cannot recover */
};
enum pbstream_status pbstream_parse(struct pbstream_parse_state *state,
char *buf, int buf_len,
int *consumed_bytes, int *need_more_bytes);
pbstream_status_t pbstream_parse(struct pbstream_parse_state *state,
char *buf, int buf_len, int buf_offset);

@ -2,14 +2,14 @@
#include <stdio.h>
#include "pbstream.c"
void test_get_varint()
void test_get_v_uint64_t()
{
enum pbstream_status status;
char zero[] = {0x00};
char *zero_buf = zero;
uint64_t zero_val;
status = get_varint(&zero_buf, zero_buf+sizeof(zero), &zero_val);
status = get_v_uint64_t(&zero_buf, zero_buf+sizeof(zero), &zero_val);
assert(status == PBSTREAM_STATUS_OK);
assert(zero_val == 0);
assert(zero_buf == zero + sizeof(zero));
@ -17,40 +17,40 @@ void test_get_varint()
char one[] = {0x01};
char *one_buf = one;
uint64_t one_val;
status = get_varint(&one_buf, one_buf+sizeof(one), &one_val);
status = get_v_uint64_t(&one_buf, one_buf+sizeof(one), &one_val);
assert(status == PBSTREAM_STATUS_OK);
assert(one_val == 1);
char twobyte[] = {0xAC, 0x02};
char *twobyte_buf = twobyte;
uint64_t twobyte_val;
status = get_varint(&twobyte_buf, twobyte_buf+sizeof(twobyte), &twobyte_val);
status = get_v_uint64_t(&twobyte_buf, twobyte_buf+sizeof(twobyte), &twobyte_val);
assert(status == PBSTREAM_STATUS_OK);
assert(twobyte_val == 300);
char ninebyte[] = {0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x7F};
char *ninebyte_buf = ninebyte;
uint64_t ninebyte_val;
status = get_varint(&ninebyte_buf, ninebyte_buf+sizeof(ninebyte), &ninebyte_val);
status = get_v_uint64_t(&ninebyte_buf, ninebyte_buf+sizeof(ninebyte), &ninebyte_val);
assert(status == PBSTREAM_STATUS_OK);
assert(ninebyte_val == (1LL<<63));
char overrun[] = {0x80, 0x01};
char *overrun_buf = overrun;
uint64_t overrun_val;
status = get_varint(&overrun_buf, overrun_buf+sizeof(overrun)-1, &overrun_val);
status = get_v_uint64_t(&overrun_buf, overrun_buf+sizeof(overrun)-1, &overrun_val);
assert(status == PBSTREAM_STATUS_INCOMPLETE);
char tenbyte[] = {0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01};
char *tenbyte_buf = tenbyte;
uint64_t tenbyte_val;
status = get_varint(&tenbyte_buf, tenbyte_buf+sizeof(tenbyte), &tenbyte_val);
status = get_v_uint64_t(&tenbyte_buf, tenbyte_buf+sizeof(tenbyte), &tenbyte_val);
assert(status == PBSTREAM_ERROR_UNTERMINATED_VARINT);
}
int main()
{
test_get_varint();
test_get_v_uint64_t();
printf("All tests passed.\n");
return 0;
}

Loading…
Cancel
Save