Work on decoder buffering.

pull/13171/head
Joshua Haberman 15 years ago
parent d7e631d9b0
commit edd1f5a61f
  1. 148
      src/upb_decoder.c
  2. 5
      src/upb_srcsink.h

@ -42,19 +42,21 @@ struct upb_decoder {
// We keep a stack of messages we have recursed into. // We keep a stack of messages we have recursed into.
upb_decoder_frame stack[UPB_MAX_NESTING], *top, *limit; upb_decoder_frame stack[UPB_MAX_NESTING], *top, *limit;
// The buffers of input data. See buffering code below for details. // The buffer of input data. NULL is equivalent to the empty string.
upb_string *buf; upb_string *buf;
upb_string *nextbuf;
uint8_t tmpbuf[UPB_MAX_ENCODED_SIZE]; // Used to bridge buf and nextbuf. // Holds residual bytes when fewer than UPB_MAX_ENCODED_SIZE bytes remain.
uint8_t tmpbuf[UPB_MAX_ENCODED_SIZE];
// The number of bytes we have yet to consume from "buf". This can be // The number of bytes we have yet to consume from "buf". This can be
// negative if we have skipped more bytes than are in the buffer, or if we // negative if we have skipped more bytes than are in the buffer, or if we
// have started to consume bytes from "nextbuf". // have started to consume bytes from "nextbuf".
int32_t buf_bytesleft; int32_t buf_bytesleft;
int32_t buf_offset;
// The overall stream offset of the end of "buf". If "buf" is NULL, it is as // The overall stream offset of the end of "buf". If "buf" is NULL, it is as
// if "buf" was the empty string. // if "buf" was the empty string.
uint32_t buf_endoffset; uint32_t buf_stream_offset;
// Fielddef for the key we just read. // Fielddef for the key we just read.
upb_fielddef *field; upb_fielddef *field;
@ -76,29 +78,39 @@ struct upb_decoder {
static upb_strlen_t upb_decoder_offset(upb_decoder *d) static upb_strlen_t upb_decoder_offset(upb_decoder *d)
{ {
return d->buf_endoffset - d->buf_bytesleft; return d->buf_stream_offset - d->buf_offset;
} }
// Discards the current buffer if we are done with it, make the next buffer static bool upb_decoder_nextbuf(upb_decoder *d)
// current if there is one.
static void upb_decoder_advancebuf(upb_decoder *d)
{ {
if(d->buf_bytesleft <= 0) { assert(d->buf_bytesleft < UPB_MAX_ENCODED_SIZE);
if(d->buf) upb_bytesrc_recycle(d->bytesrc, d->buf);
d->buf = d->nextbuf; // Copy residual bytes to temporary buffer.
d->nextbuf = NULL; if(d->buf_bytesleft > 0) {
if(d->buf) d->buf_bytesleft += upb_string_len(d->buf); memcpy(d->tmpbuf, upb_string_getrobuf(d->buf) + d->buf_offset,
d->buf_bytesleft);
} }
}
static bool upb_decoder_pullnextbuf(upb_decoder *d) // Recycle old buffer, pull new one.
{ if(d->buf) {
if(!d->nextbuf && !upb_bytesrc_eof(d->bytesrc)) { upb_bytesrc_recycle(d->bytesrc, d->buf);
d->nextbuf = upb_bytesrc_get(d->bytesrc, UPB_MAX_ENCODED_SIZE); d->buf_offset -= upb_string_len(d->buf);
if(!d->nextbuf && !upb_bytesrc_eof(d->bytesrc)) { d->buf_stream_offset += upb_string_len(d->buf);
// There was an error in the byte stream, halt the decoder. }
d->buf = upb_bytesrc_get(d->bytesrc, UPB_MAX_ENCODED_SIZE);
// Handle cases arising from error or EOF.
if(d->buf) {
d->buf_bytesleft += upb_string_len(d->buf);
} else {
if(!upb_bytesrc_eof(d->bytesrc)) {
// Error from bytesrc.
upb_copyerr(&d->src.status, upb_bytesrc_status(d->bytesrc)); upb_copyerr(&d->src.status, upb_bytesrc_status(d->bytesrc));
return false; return false;
} else if(d->buf_bytesleft == 0) {
// EOF from bytesrc and we don't have any residual bytes left.
d->src.eof = true;
return false;
} }
} }
return true; return true;
@ -106,11 +118,7 @@ static bool upb_decoder_pullnextbuf(upb_decoder *d)
static bool upb_decoder_skipbytes(upb_decoder *d, int32_t bytes) static bool upb_decoder_skipbytes(upb_decoder *d, int32_t bytes)
{ {
d->buf_bytesleft -= bytes; // TODO.
while(d->buf_bytesleft <= 0 && !upb_bytesrc_eof(d->bytesrc)) {
if(!upb_decoder_pullnextbuf(d)) return false;
upb_decoder_advancebuf(d);
}
return true; return true;
} }
@ -124,22 +132,28 @@ static upb_strlen_t upb_decoder_append(uint8_t *buf, upb_string *frombuf,
static const uint8_t *upb_decoder_getbuf_full(upb_decoder *d, uint32_t *bytes) static const uint8_t *upb_decoder_getbuf_full(upb_decoder *d, uint32_t *bytes)
{ {
if(d->buf_bytesleft < UPB_MAX_ENCODED_SIZE) { if(d->buf_bytesleft < UPB_MAX_ENCODED_SIZE)
upb_decoder_pullnextbuf(d); if(!upb_decoder_nextbuf(d)) return NULL;
upb_decoder_advancebuf(d);
} assert(d->buf_bytesleft >= UPB_MAX_ENCODED_SIZE);
if(d->buf_bytesleft >= UPB_MAX_ENCODED_SIZE) {
if(d->buf_offset >= 0) {
// Common case: the main buffer contains at least UPB_MAX_ENCODED_SIZE
// contiguous bytes, so we can read directly out of it.
*bytes = d->buf_bytesleft; *bytes = d->buf_bytesleft;
return (uint8_t*)upb_string_getrobuf(d->buf) + upb_string_len(d->buf) - return (uint8_t*)upb_string_getrobuf(d->buf) + d->buf_offset;
d->buf_bytesleft;
} else { } else {
upb_strlen_t len = 0; upb_strlen_t residual_bytes = -d->buf_offset;
if(d->buf) if(d->buf) {
len += upb_decoder_append(d->tmpbuf, d->buf, len, UPB_MAX_ENCODED_SIZE); memcpy(d->tmpbuf + residual_bytes, upb_string_getrobuf(d->buf),
if(d->nextbuf) UPB_MAX_ENCODED_SIZE - residual_bytes);
len += upb_decoder_append(d->tmpbuf, d->nextbuf, len, UPB_MAX_ENCODED_SIZE); *bytes = 10;
*bytes = len; } else {
memset(d->tmpbuf + len, 0x80, UPB_MAX_ENCODED_SIZE - len); // All we have are residual bytes; pad them with 0x80.
memset(d->tmpbuf + residual_bytes, 0x80,
UPB_MAX_ENCODED_SIZE - residual_bytes);
*bytes = residual_bytes;
}
return d->tmpbuf; return d->tmpbuf;
} }
} }
@ -154,12 +168,11 @@ static const uint8_t *upb_decoder_getbuf_full(upb_decoder *d, uint32_t *bytes)
// indicate how many bytes were consumed. // indicate how many bytes were consumed.
static const uint8_t *upb_decoder_getbuf(upb_decoder *d, uint32_t *bytes) static const uint8_t *upb_decoder_getbuf(upb_decoder *d, uint32_t *bytes)
{ {
if(d->buf_bytesleft >= UPB_MAX_ENCODED_SIZE) { if(d->buf_bytesleft >= UPB_MAX_ENCODED_SIZE && d->buf_offset >= 0) {
// The common case; only when we get to the last ten bytes of the buffer // Common case: the main buffer contains at least UPB_MAX_ENCODED_SIZE
// do we have to do tricky things. // contiguous bytes, so we can read directly out of it.
*bytes = d->buf_bytesleft; *bytes = d->buf_bytesleft;
return (uint8_t*)upb_string_getrobuf(d->buf) + upb_string_len(d->buf) - return (uint8_t*)upb_string_getrobuf(d->buf) + d->buf_offset;
d->buf_bytesleft;
} else { } else {
return upb_decoder_getbuf_full(d, bytes); return upb_decoder_getbuf_full(d, bytes);
} }
@ -168,9 +181,13 @@ static const uint8_t *upb_decoder_getbuf(upb_decoder *d, uint32_t *bytes)
static bool upb_decoder_consume(upb_decoder *d, uint32_t bytes) static bool upb_decoder_consume(upb_decoder *d, uint32_t bytes)
{ {
assert(bytes <= UPB_MAX_ENCODED_SIZE); assert(bytes <= UPB_MAX_ENCODED_SIZE);
//if() d->buf_offset += bytes;
d->buf_bytesleft -= bytes; d->buf_bytesleft -= bytes;
//if(d->buf_bytesleft > upb_string_length()) if(d->buf_offset < 0) {
// We still have residual bytes we have not consumed.
memmove(d->tmpbuf, d->tmpbuf + bytes, -d->buf_offset);
}
return true;
} }
@ -208,8 +225,7 @@ INLINE bool upb_decoder_readv64(upb_decoder *d, uint32_t *low, uint32_t *high)
return false; return false;
done: done:
upb_decoder_consume(d, buf - start); return upb_decoder_consume(d, buf - start);
return true;
} }
// Gets a varint -- called when we only need 32 bits of it. Note that a 32-bit // Gets a varint -- called when we only need 32 bits of it. Note that a 32-bit
@ -291,7 +307,6 @@ static uint8_t upb_decoder_skipv64(upb_decoder *d)
/* upb_src implementation for upb_decoder. ************************************/ /* upb_src implementation for upb_decoder. ************************************/
bool upb_decoder_get_v_uint32(upb_decoder *d, uint32_t *key);
bool upb_decoder_skipval(upb_decoder *d); bool upb_decoder_skipval(upb_decoder *d);
upb_fielddef *upb_decoder_getdef(upb_decoder *d) upb_fielddef *upb_decoder_getdef(upb_decoder *d)
@ -365,20 +380,20 @@ bool upb_decoder_getval(upb_decoder *d, upb_valueptr val)
d->buf_bytesleft -= total_len; d->buf_bytesleft -= total_len;
*val.str = d->str; *val.str = d->str;
} else { } else {
// The string spans buffers, so we must copy from the current buffer, //// The string spans buffers, so we must copy from the current buffer,
// the next buffer (if we have one), and finally from the bytesrc. //// the next buffer (if we have one), and finally from the bytesrc.
uint8_t *str = (uint8_t*)upb_string_getrwbuf(d->str, total_len); //uint8_t *str = (uint8_t*)upb_string_getrwbuf(d->str, total_len);
upb_strlen_t len = 0; //upb_strlen_t len = 0;
len += upb_decoder_append(str, d->buf, len, total_len); //len += upb_decoder_append(str, d->buf, len, total_len);
upb_decoder_advancebuf(d); //upb_decoder_advancebuf(d);
if(d->buf) len += upb_decoder_append(str, d->buf, len, total_len); //if(d->buf) len += upb_decoder_append(str, d->buf, len, total_len);
upb_string_getrwbuf(d->str, len); // Cheap resize. //upb_string_getrwbuf(d->str, len); // Cheap resize.
if(len < total_len) { //if(len < total_len) {
if(!upb_bytesrc_append(d->bytesrc, d->str, total_len - len)) { // if(!upb_bytesrc_append(d->bytesrc, d->str, total_len - len)) {
upb_copyerr(&d->src.status, upb_bytesrc_status(d->bytesrc)); // upb_copyerr(&d->src.status, upb_bytesrc_status(d->bytesrc));
return false; // return false;
} // }
} //}
} }
d->field = NULL; d->field = NULL;
} else { } else {
@ -521,7 +536,6 @@ upb_decoder *upb_decoder_new(upb_msgdef *msgdef)
d->toplevel_msgdef = msgdef; d->toplevel_msgdef = msgdef;
d->limit = &d->stack[UPB_MAX_NESTING]; d->limit = &d->stack[UPB_MAX_NESTING];
d->buf = NULL; d->buf = NULL;
d->nextbuf = NULL;
d->str = upb_string_new(); d->str = upb_string_new();
upb_src_init(&d->src, &upb_decoder_src_vtbl); upb_src_init(&d->src, &upb_decoder_src_vtbl);
return d; return d;
@ -535,7 +549,6 @@ void upb_decoder_free(upb_decoder *d)
void upb_decoder_reset(upb_decoder *d, upb_bytesrc *bytesrc) void upb_decoder_reset(upb_decoder *d, upb_bytesrc *bytesrc)
{ {
if(d->buf) upb_bytesrc_recycle(d->bytesrc, d->buf); if(d->buf) upb_bytesrc_recycle(d->bytesrc, d->buf);
if(d->nextbuf) upb_bytesrc_recycle(d->bytesrc, d->nextbuf);
d->top = d->stack; d->top = d->stack;
d->top->msgdef = d->toplevel_msgdef; d->top->msgdef = d->toplevel_msgdef;
// The top-level message is not delimited (we can keep receiving data for it // The top-level message is not delimited (we can keep receiving data for it
@ -544,8 +557,7 @@ void upb_decoder_reset(upb_decoder *d, upb_bytesrc *bytesrc)
d->top->end_offset = UINT32_MAX - 1; d->top->end_offset = UINT32_MAX - 1;
d->bytesrc = bytesrc; d->bytesrc = bytesrc;
d->buf = NULL; d->buf = NULL;
d->nextbuf = NULL;
d->buf_bytesleft = 0; d->buf_bytesleft = 0;
d->buf_endoffset = 0; d->buf_stream_offset = 0;
d->buf_offset = 0;
} }

@ -26,6 +26,11 @@
extern "C" { extern "C" {
#endif #endif
// Note! The "eof" flags work like feof() in C: they cannot report end-of-file
// until a read has actually failed due to EOF. They cannot preemptively tell
// you that the next call will fail due to EOF. Since these are the semantics
// that C and UNIX provide, we're stuck with them if we want to support, e.g., stdio.
/* upb_src ********************************************************************/ /* upb_src ********************************************************************/
// TODO: decide how to handle unknown fields. // TODO: decide how to handle unknown fields.

Loading…
Cancel
Save