first stab at a ZeroCopyStream api.

These functions are not yet part of the upb build but this is a good chunk of
work so let's snapshot it now.

PiperOrigin-RevId: 459156286
pull/13171/head
Protobuf Team Bot 2 years ago committed by Copybara-Service
parent cce085b7e7
commit 1c13fd0686
  1. 45
      upb/io/BUILD
  2. 4
      upb/io/README.md
  3. 110
      upb/io/chunked_input_stream.c
  4. 53
      upb/io/chunked_input_stream.h
  5. 94
      upb/io/chunked_output_stream.c
  6. 53
      upb/io/chunked_output_stream.h
  7. 129
      upb/io/zero_copy_input_stream.h
  8. 130
      upb/io/zero_copy_output_stream.h
  9. 290
      upb/io/zero_copy_stream_test.cc

@ -0,0 +1,45 @@
cc_library(
name = "zero_copy_stream",
hdrs = [
"zero_copy_input_stream.h",
"zero_copy_output_stream.h",
],
visibility = ["//upb/io:__pkg__"],
deps = [
"//:upb",
"//:port",
],
)
cc_library(
name = "chunked_stream",
testonly = 1,
srcs = [
"chunked_input_stream.c",
"chunked_output_stream.c",
],
hdrs = [
"chunked_input_stream.h",
"chunked_output_stream.h",
],
visibility = ["//upb/io:__pkg__"],
deps = [
":zero_copy_stream",
"//:upb",
"//:port",
],
)
cc_test(
name = "zero_copy_stream_test",
size = "small",
srcs = [
"zero_copy_stream_test.cc",
],
deps = [
":chunked_stream",
":zero_copy_stream",
"//:upb",
"@com_google_googletest//:gtest_main",
],
)

@ -0,0 +1,4 @@
This subdir originated as a best-effort C approximation of the C++ code in
in third_party/protobuf/io/ but over time the two will invariably diverge.
Comments have generally been copied verbatim and may therefore refer to C++
symbol names instead of C symbol names.

@ -0,0 +1,110 @@
/*
* Copyright (c) 2009-2022, Google LLC
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Google LLC nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "upb/io/chunked_input_stream.h"
// Must be last.
#include "upb/port_def.inc"
typedef struct {
upb_ZeroCopyInputStream base;
const char* data;
size_t size;
size_t limit;
size_t position;
size_t last_returned_size;
} upb_ChunkedInputStream;
static const void* upb_ChunkedInputStream_Next(upb_ZeroCopyInputStream* z,
size_t* count,
upb_Status* status) {
upb_ChunkedInputStream* c = (upb_ChunkedInputStream*)z;
UPB_ASSERT(c->position <= c->size);
const char* out = c->data + c->position;
const size_t chunk = UPB_MIN(c->limit, c->size - c->position);
c->position += chunk;
c->last_returned_size = chunk;
*count = chunk;
return chunk ? out : NULL;
}
static void upb_ChunkedInputStream_BackUp(upb_ZeroCopyInputStream* z,
size_t count) {
upb_ChunkedInputStream* c = (upb_ChunkedInputStream*)z;
UPB_ASSERT(c->last_returned_size >= count);
c->position -= count;
c->last_returned_size -= count;
}
static bool upb_ChunkedInputStream_Skip(upb_ZeroCopyInputStream* z,
size_t count) {
upb_ChunkedInputStream* c = (upb_ChunkedInputStream*)z;
c->last_returned_size = 0; // Don't let caller back up.
if (count > c->size - c->position) {
c->position = c->size;
return false;
}
c->position += count;
return true;
}
static size_t upb_ChunkedInputStream_ByteCount(
const upb_ZeroCopyInputStream* z) {
const upb_ChunkedInputStream* c = (const upb_ChunkedInputStream*)z;
return c->position;
}
static const _upb_ZeroCopyInputStream_VTable upb_ChunkedInputStream_vtable = {
upb_ChunkedInputStream_Next,
upb_ChunkedInputStream_BackUp,
upb_ChunkedInputStream_Skip,
upb_ChunkedInputStream_ByteCount,
};
upb_ZeroCopyInputStream* upb_ChunkedInputStream_New(const void* data,
size_t size, size_t limit,
upb_Arena* arena) {
upb_ChunkedInputStream* c = upb_Arena_Malloc(arena, sizeof(*c));
if (!c || !limit) return NULL;
c->base.vtable = &upb_ChunkedInputStream_vtable;
c->data = data;
c->size = size;
c->limit = limit;
c->position = 0;
c->last_returned_size = 0;
return (upb_ZeroCopyInputStream*)c;
}

@ -0,0 +1,53 @@
/*
* Copyright (c) 2009-2022, Google LLC
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Google LLC nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef UPB_IO_CHUNKED_INPUT_STREAM_H_
#define UPB_IO_CHUNKED_INPUT_STREAM_H_
#include "upb/arena.h"
#include "upb/io/zero_copy_input_stream.h"
// Must be last.
#include "upb/port_def.inc"
#ifdef __cplusplus
extern "C" {
#endif
// A ZeroCopyInputStream which wraps a flat buffer and limits the number of
// bytes that can be returned by a single call to Next().
upb_ZeroCopyInputStream* upb_ChunkedInputStream_New(const void* data,
size_t size, size_t limit,
upb_Arena* arena);
#ifdef __cplusplus
} /* extern "C" */
#endif
#include "upb/port_undef.inc"
#endif /* UPB_IO_CHUNKED_INPUT_STREAM_H_ */

@ -0,0 +1,94 @@
/*
* Copyright (c) 2009-2022, Google LLC
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Google LLC nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "upb/io/chunked_output_stream.h"
// Must be last.
#include "upb/port_def.inc"
typedef struct {
upb_ZeroCopyOutputStream base;
char* data;
size_t size;
size_t limit;
size_t position;
size_t last_returned_size;
} upb_ChunkedOutputStream;
static void* upb_ChunkedOutputStream_Next(upb_ZeroCopyOutputStream* z,
size_t* count, upb_Status* status) {
upb_ChunkedOutputStream* c = (upb_ChunkedOutputStream*)z;
UPB_ASSERT(c->position <= c->size);
char* out = c->data + c->position;
const size_t chunk = UPB_MIN(c->limit, c->size - c->position);
c->position += chunk;
c->last_returned_size = chunk;
*count = chunk;
return chunk ? out : NULL;
}
static void upb_ChunkedOutputStream_BackUp(upb_ZeroCopyOutputStream* z,
size_t count) {
upb_ChunkedOutputStream* c = (upb_ChunkedOutputStream*)z;
UPB_ASSERT(c->last_returned_size >= count);
c->position -= count;
c->last_returned_size -= count;
}
static size_t upb_ChunkedOutputStream_ByteCount(
const upb_ZeroCopyOutputStream* z) {
const upb_ChunkedOutputStream* c = (const upb_ChunkedOutputStream*)z;
return c->position;
}
static const _upb_ZeroCopyOutputStream_VTable upb_ChunkedOutputStream_vtable = {
upb_ChunkedOutputStream_Next,
upb_ChunkedOutputStream_BackUp,
upb_ChunkedOutputStream_ByteCount,
};
upb_ZeroCopyOutputStream* upb_ChunkedOutputStream_New(void* data, size_t size,
size_t limit,
upb_Arena* arena) {
upb_ChunkedOutputStream* c = upb_Arena_Malloc(arena, sizeof(*c));
if (!c || !limit) return NULL;
c->base.vtable = &upb_ChunkedOutputStream_vtable;
c->data = data;
c->size = size;
c->limit = limit;
c->position = 0;
c->last_returned_size = 0;
return (upb_ZeroCopyOutputStream*)c;
}

@ -0,0 +1,53 @@
/*
* Copyright (c) 2009-2022, Google LLC
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Google LLC nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef UPB_IO_CHUNKED_OUTPUT_STREAM_H_
#define UPB_IO_CHUNKED_OUTPUT_STREAM_H_
#include "upb/arena.h"
#include "upb/io/zero_copy_output_stream.h"
// Must be last.
#include "upb/port_def.inc"
#ifdef __cplusplus
extern "C" {
#endif
// A ZeroCopyOutputStream which wraps a flat buffer and limits the number of
// bytes that can be returned by a single call to Next().
upb_ZeroCopyOutputStream* upb_ChunkedOutputStream_New(void* data, size_t size,
size_t limit,
upb_Arena* arena);
#ifdef __cplusplus
} /* extern "C" */
#endif
#include "upb/port_undef.inc"
#endif /* UPB_IO_CHUNKED_OUTPUT_STREAM_H_ */

@ -0,0 +1,129 @@
/*
* Copyright (c) 2009-2022, Google LLC
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Google LLC nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef UPB_IO_ZERO_COPY_INPUT_STREAM_H_
#define UPB_IO_ZERO_COPY_INPUT_STREAM_H_
#include "upb/status.h"
// Must be last.
#include "upb/port_def.inc"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct upb_ZeroCopyInputStream upb_ZeroCopyInputStream;
typedef struct {
// Obtains a chunk of data from the stream.
//
// Preconditions:
// "count" and "status" are not NULL.
//
// Postconditions:
// All errors are permanent. If an error occurs then:
// - NULL will be returned to the caller.
// - *count will be set to zero.
// - *status will be set to the error.
// EOF is permanent. If EOF is reached then:
// - NULL will be returned to the caller.
// - *count will be set to zero.
// - *status will not be touched.
// Otherwise:
// - The returned value will point to a buffer containing the bytes read.
// - *count will be set to the number of bytes read.
// - *status will not be touched.
//
// Ownership of this buffer remains with the stream, and the buffer
// remains valid only until some other method of the stream is called
// or the stream is destroyed.
const void* (*Next)(struct upb_ZeroCopyInputStream* z, size_t* count,
upb_Status* status);
// Backs up a number of bytes, so that the next call to Next() returns
// data again that was already returned by the last call to Next(). This
// is useful when writing procedures that are only supposed to read up
// to a certain point in the input, then return. If Next() returns a
// buffer that goes beyond what you wanted to read, you can use BackUp()
// to return to the point where you intended to finish.
//
// Preconditions:
// * The last method called must have been Next().
// * count must be less than or equal to the size of the last buffer
// returned by Next().
//
// Postconditions:
// * The last "count" bytes of the last buffer returned by Next() will be
// pushed back into the stream. Subsequent calls to Next() will return
// the same data again before producing new data.
void (*BackUp)(struct upb_ZeroCopyInputStream* z, size_t count);
// Skips a number of bytes. Returns false if the end of the stream is
// reached or some input error occurred. In the end-of-stream case, the
// stream is advanced to the end of the stream (so ByteCount() will return
// the total size of the stream).
bool (*Skip)(struct upb_ZeroCopyInputStream* z, size_t count);
// Returns the total number of bytes read since this object was created.
size_t (*ByteCount)(const struct upb_ZeroCopyInputStream* z);
} _upb_ZeroCopyInputStream_VTable;
struct upb_ZeroCopyInputStream {
const _upb_ZeroCopyInputStream_VTable* vtable;
};
UPB_INLINE const void* upb_ZeroCopyInputStream_Next(upb_ZeroCopyInputStream* z,
size_t* count,
upb_Status* status) {
const void* out = z->vtable->Next(z, count, status);
UPB_ASSERT((out == NULL) == (*count == 0));
return out;
}
UPB_INLINE void upb_ZeroCopyInputStream_BackUp(upb_ZeroCopyInputStream* z,
size_t count) {
return z->vtable->BackUp(z, count);
}
UPB_INLINE bool upb_ZeroCopyInputStream_Skip(upb_ZeroCopyInputStream* z,
size_t count) {
return z->vtable->Skip(z, count);
}
UPB_INLINE size_t
upb_ZeroCopyInputStream_ByteCount(const upb_ZeroCopyInputStream* z) {
return z->vtable->ByteCount(z);
}
#ifdef __cplusplus
} /* extern "C" */
#endif
#include "upb/port_undef.inc"
#endif /* UPB_IO_ZERO_COPY_INPUT_STREAM_H_ */

@ -0,0 +1,130 @@
/*
* Copyright (c) 2009-2022, Google LLC
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Google LLC nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef UPB_IO_ZERO_COPY_OUTPUT_STREAM_H_
#define UPB_IO_ZERO_COPY_OUTPUT_STREAM_H_
#include "upb/status.h"
// Must be last.
#include "upb/port_def.inc"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct upb_ZeroCopyOutputStream upb_ZeroCopyOutputStream;
typedef struct {
// Obtains a buffer into which data can be written. Any data written
// into this buffer will eventually (maybe instantly, maybe later on)
// be written to the output.
//
// Preconditions:
// "count" and "status" are not NULL.
//
// Postconditions:
// All errors are permanent. If an error occurs then:
// - NULL will be returned to the caller.
// - *count will be set to zero.
// - *status will be set to the error.
// EOF is permanent. If EOF is reached then:
// - NULL will be returned to the caller.
// - *count will be set to zero.
// - *status will not be touched.
// Otherwise:
// - The returned value will point to a buffer containing the bytes read.
// - *count will be set to the number of bytes read.
// - *status will not be touched.
//
// Ownership of this buffer remains with the stream, and the buffer
// remains valid only until some other method of the stream is called
// or the stream is destroyed.
//
// Any data which the caller stores in this buffer will eventually be
// written to the output (unless BackUp() is called).
void* (*Next)(struct upb_ZeroCopyOutputStream* z, size_t* count,
upb_Status* status);
// Backs up a number of bytes, so that the end of the last buffer returned
// by Next() is not actually written. This is needed when you finish
// writing all the data you want to write, but the last buffer was bigger
// than you needed. You don't want to write a bunch of garbage after the
// end of your data, so you use BackUp() to back up.
//
// Preconditions:
// * The last method called must have been Next().
// * count must be less than or equal to the size of the last buffer
// returned by Next().
// * The caller must not have written anything to the last "count" bytes
// of that buffer.
//
// Postconditions:
// * The last "count" bytes of the last buffer returned by Next() will be
// ignored.
//
// This method can be called with `count = 0` to finalize (flush) any
// previously returned buffer. For example, a file output stream can
// flush buffers returned from a previous call to Next() upon such
// BackUp(0) invocations. ZeroCopyOutputStream callers should always
// invoke BackUp() after a final Next() call, even if there is no
// excess buffer data to be backed up to indicate a flush point.
void (*BackUp)(struct upb_ZeroCopyOutputStream* z, size_t count);
// Returns the total number of bytes written since this object was created.
size_t (*ByteCount)(const struct upb_ZeroCopyOutputStream* z);
} _upb_ZeroCopyOutputStream_VTable;
struct upb_ZeroCopyOutputStream {
const _upb_ZeroCopyOutputStream_VTable* vtable;
};
UPB_INLINE void* upb_ZeroCopyOutputStream_Next(upb_ZeroCopyOutputStream* z,
size_t* count,
upb_Status* status) {
void* out = z->vtable->Next(z, count, status);
UPB_ASSERT((out == NULL) == (*count == 0));
return out;
}
UPB_INLINE void upb_ZeroCopyOutputStream_BackUp(upb_ZeroCopyOutputStream* z,
size_t count) {
return z->vtable->BackUp(z, count);
}
UPB_INLINE size_t
upb_ZeroCopyOutputStream_ByteCount(const upb_ZeroCopyOutputStream* z) {
return z->vtable->ByteCount(z);
}
#ifdef __cplusplus
} /* extern "C" */
#endif
#include "upb/port_undef.inc"
#endif /* UPB_IO_ZERO_COPY_OUTPUT_STREAM_H_ */

@ -0,0 +1,290 @@
/*
* Copyright (c) 2009-2022, Google LLC
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Google LLC nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
// Testing strategy: For each type of I/O (array, string, file, etc.) we
// create an output stream and write some data to it, then create a
// corresponding input stream to read the same data back and expect it to
// match. When the data is written, it is written in several small chunks
// of varying sizes, with a BackUp() after each chunk. It is read back
// similarly, but with chunks separated at different points. The whole
// process is run with a variety of block sizes for both the input and
// the output.
#include "gtest/gtest.h"
#include "upb/io/chunked_input_stream.h"
#include "upb/io/chunked_output_stream.h"
#include "upb/upb.hpp"
namespace upb {
namespace {
class IoTest : public testing::Test {
protected:
// Test helpers.
// Helper to write an array of data to an output stream.
bool WriteToOutput(upb_ZeroCopyOutputStream* output, const void* data,
int size);
// Helper to read a fixed-length array of data from an input stream.
int ReadFromInput(upb_ZeroCopyInputStream* input, void* data, int size);
// Write a string to the output stream.
void WriteString(upb_ZeroCopyOutputStream* output, const std::string& str);
// Read a number of bytes equal to the size of the given string and checks
// that it matches the string.
void ReadString(upb_ZeroCopyInputStream* input, const std::string& str);
// Writes some text to the output stream in a particular order. Returns
// the number of bytes written, in case the caller needs that to set up an
// input stream.
int WriteStuff(upb_ZeroCopyOutputStream* output);
// Reads text from an input stream and expects it to match what
// WriteStuff() writes.
void ReadStuff(upb_ZeroCopyInputStream* input, bool read_eof = true);
// Similar to WriteStuff, but performs more sophisticated testing.
int WriteStuffLarge(upb_ZeroCopyOutputStream* output);
// Reads and tests a stream that should have been written to
// via WriteStuffLarge().
void ReadStuffLarge(upb_ZeroCopyInputStream* input);
static const int kBlockSizes[];
static const int kBlockSizeCount;
};
const int IoTest::kBlockSizes[] = {1, 2, 5, 7, 10, 23, 64};
const int IoTest::kBlockSizeCount = sizeof(IoTest::kBlockSizes) / sizeof(int);
bool IoTest::WriteToOutput(upb_ZeroCopyOutputStream* output, const void* data,
int size) {
const uint8_t* in = reinterpret_cast<const uint8_t*>(data);
size_t in_size = size;
size_t out_size;
while (true) {
upb::Status status;
void* out = upb_ZeroCopyOutputStream_Next(output, &out_size, status.ptr());
if (out_size == 0) return false;
if (in_size <= out_size) {
memcpy(out, in, in_size);
upb_ZeroCopyOutputStream_BackUp(output, out_size - in_size);
return true;
}
memcpy(out, in, out_size);
in += out_size;
in_size -= out_size;
}
}
int IoTest::ReadFromInput(upb_ZeroCopyInputStream* input, void* data,
int size) {
uint8_t* out = reinterpret_cast<uint8_t*>(data);
size_t out_size = size;
const void* in;
size_t in_size = 0;
while (true) {
upb::Status status;
in = upb_ZeroCopyInputStream_Next(input, &in_size, status.ptr());
if (in_size == 0) {
return size - out_size;
}
if (out_size <= in_size) {
memcpy(out, in, out_size);
if (in_size > out_size) {
upb_ZeroCopyInputStream_BackUp(input, in_size - out_size);
}
return size; // Copied all of it.
}
memcpy(out, in, in_size);
out += in_size;
out_size -= in_size;
}
}
void IoTest::WriteString(upb_ZeroCopyOutputStream* output,
const std::string& str) {
EXPECT_TRUE(WriteToOutput(output, str.c_str(), str.size()));
}
void IoTest::ReadString(upb_ZeroCopyInputStream* input,
const std::string& str) {
std::unique_ptr<char[]> buffer(new char[str.size() + 1]);
buffer[str.size()] = '\0';
EXPECT_EQ(ReadFromInput(input, buffer.get(), str.size()), str.size());
EXPECT_STREQ(str.c_str(), buffer.get());
}
int IoTest::WriteStuff(upb_ZeroCopyOutputStream* output) {
WriteString(output, "Hello world!\n");
WriteString(output, "Some te");
WriteString(output, "xt. Blah blah.");
WriteString(output, "abcdefg");
WriteString(output, "01234567890123456789");
WriteString(output, "foobar");
const int result = upb_ZeroCopyOutputStream_ByteCount(output);
EXPECT_EQ(result, 68);
return result;
}
// Reads text from an input stream and expects it to match what WriteStuff()
// writes.
void IoTest::ReadStuff(upb_ZeroCopyInputStream* input, bool read_eof) {
ReadString(input, "Hello world!\n");
ReadString(input, "Some text. ");
ReadString(input, "Blah ");
ReadString(input, "blah.");
ReadString(input, "abcdefg");
EXPECT_TRUE(upb_ZeroCopyInputStream_Skip(input, 20));
ReadString(input, "foo");
ReadString(input, "bar");
EXPECT_EQ(upb_ZeroCopyInputStream_ByteCount(input), 68);
if (read_eof) {
uint8_t byte;
EXPECT_EQ(ReadFromInput(input, &byte, 1), 0);
}
}
int IoTest::WriteStuffLarge(upb_ZeroCopyOutputStream* output) {
WriteString(output, "Hello world!\n");
WriteString(output, "Some te");
WriteString(output, "xt. Blah blah.");
WriteString(output, std::string(100000, 'x')); // A very long string
WriteString(output, std::string(100000, 'y')); // A very long string
WriteString(output, "01234567890123456789");
const int result = upb_ZeroCopyOutputStream_ByteCount(output);
EXPECT_EQ(result, 200055);
return result;
}
// Reads text from an input stream and expects it to match what WriteStuff()
// writes.
void IoTest::ReadStuffLarge(upb_ZeroCopyInputStream* input) {
ReadString(input, "Hello world!\nSome text. ");
EXPECT_TRUE(upb_ZeroCopyInputStream_Skip(input, 5));
ReadString(input, "blah.");
EXPECT_TRUE(upb_ZeroCopyInputStream_Skip(input, 100000 - 10));
ReadString(input, std::string(10, 'x') + std::string(100000 - 20000, 'y'));
EXPECT_TRUE(upb_ZeroCopyInputStream_Skip(input, 20000 - 10));
ReadString(input, "yyyyyyyyyy01234567890123456789");
EXPECT_EQ(upb_ZeroCopyInputStream_ByteCount(input), 200055);
uint8_t byte;
EXPECT_EQ(ReadFromInput(input, &byte, 1), 0);
}
// ===================================================================
TEST_F(IoTest, ArrayIo) {
const int kBufferSize = 256;
uint8_t buffer[kBufferSize];
upb::Arena arena;
for (int i = 0; i < kBlockSizeCount; i++) {
for (int j = 0; j < kBlockSizeCount; j++) {
auto output = upb_ChunkedOutputStream_New(buffer, kBufferSize,
kBlockSizes[j], arena.ptr());
int size = WriteStuff(output);
auto input =
upb_ChunkedInputStream_New(buffer, size, kBlockSizes[j], arena.ptr());
ReadStuff(input);
}
}
}
TEST(ChunkedStream, SingleInput) {
const int kBufferSize = 256;
uint8_t buffer[kBufferSize];
upb::Arena arena;
auto input =
upb_ChunkedInputStream_New(buffer, kBufferSize, kBufferSize, arena.ptr());
const void* data;
size_t size;
upb::Status status;
data = upb_ZeroCopyInputStream_Next(input, &size, status.ptr());
EXPECT_EQ(size, kBufferSize);
data = upb_ZeroCopyInputStream_Next(input, &size, status.ptr());
EXPECT_EQ(data, nullptr);
EXPECT_EQ(size, 0);
EXPECT_TRUE(upb_Status_IsOk(status.ptr()));
}
TEST(ChunkedStream, SingleOutput) {
const int kBufferSize = 256;
uint8_t buffer[kBufferSize];
upb::Arena arena;
auto output = upb_ChunkedOutputStream_New(buffer, kBufferSize, kBufferSize,
arena.ptr());
size_t size;
upb::Status status;
void* data = upb_ZeroCopyOutputStream_Next(output, &size, status.ptr());
EXPECT_EQ(size, kBufferSize);
data = upb_ZeroCopyOutputStream_Next(output, &size, status.ptr());
EXPECT_EQ(data, nullptr);
EXPECT_EQ(size, 0);
EXPECT_TRUE(upb_Status_IsOk(status.ptr()));
}
// Check that a zero-size input array doesn't confuse the code.
TEST(ChunkedStream, InputEOF) {
upb::Arena arena;
char buf;
auto input = upb_ChunkedInputStream_New(&buf, 0, 1, arena.ptr());
size_t size;
upb::Status status;
const void* data = upb_ZeroCopyInputStream_Next(input, &size, status.ptr());
EXPECT_EQ(data, nullptr);
EXPECT_EQ(size, 0);
EXPECT_TRUE(upb_Status_IsOk(status.ptr()));
}
// Check that a zero-size output array doesn't confuse the code.
TEST(ChunkedStream, OutputEOF) {
upb::Arena arena;
char buf;
auto output = upb_ChunkedOutputStream_New(&buf, 0, 1, arena.ptr());
size_t size;
upb::Status status;
void* data = upb_ZeroCopyOutputStream_Next(output, &size, status.ptr());
EXPECT_EQ(data, nullptr);
EXPECT_EQ(size, 0);
EXPECT_TRUE(upb_Status_IsOk(status.ptr()));
}
} // namespace
} // namespace upb
Loading…
Cancel
Save