From 1c13fd068607361ab0118b2a6f90e919e510a26b Mon Sep 17 00:00:00 2001 From: Protobuf Team Bot Date: Tue, 5 Jul 2022 20:05:10 -0700 Subject: [PATCH] first stab at a ZeroCopyStream api. These functions are not yet part of the upb build but this is a good chunk of work so let's snapshot it now. PiperOrigin-RevId: 459156286 --- upb/io/BUILD | 45 +++++ upb/io/README.md | 4 + upb/io/chunked_input_stream.c | 110 ++++++++++++ upb/io/chunked_input_stream.h | 53 ++++++ upb/io/chunked_output_stream.c | 94 ++++++++++ upb/io/chunked_output_stream.h | 53 ++++++ upb/io/zero_copy_input_stream.h | 129 ++++++++++++++ upb/io/zero_copy_output_stream.h | 130 ++++++++++++++ upb/io/zero_copy_stream_test.cc | 290 +++++++++++++++++++++++++++++++ 9 files changed, 908 insertions(+) create mode 100644 upb/io/BUILD create mode 100644 upb/io/README.md create mode 100644 upb/io/chunked_input_stream.c create mode 100644 upb/io/chunked_input_stream.h create mode 100644 upb/io/chunked_output_stream.c create mode 100644 upb/io/chunked_output_stream.h create mode 100644 upb/io/zero_copy_input_stream.h create mode 100644 upb/io/zero_copy_output_stream.h create mode 100644 upb/io/zero_copy_stream_test.cc diff --git a/upb/io/BUILD b/upb/io/BUILD new file mode 100644 index 0000000000..bb65eef69c --- /dev/null +++ b/upb/io/BUILD @@ -0,0 +1,45 @@ +cc_library( + name = "zero_copy_stream", + hdrs = [ + "zero_copy_input_stream.h", + "zero_copy_output_stream.h", + ], + visibility = ["//upb/io:__pkg__"], + deps = [ + "//:upb", + "//:port", + ], +) + +cc_library( + name = "chunked_stream", + testonly = 1, + srcs = [ + "chunked_input_stream.c", + "chunked_output_stream.c", + ], + hdrs = [ + "chunked_input_stream.h", + "chunked_output_stream.h", + ], + visibility = ["//upb/io:__pkg__"], + deps = [ + ":zero_copy_stream", + "//:upb", + "//:port", + ], +) + +cc_test( + name = "zero_copy_stream_test", + size = "small", + srcs = [ + "zero_copy_stream_test.cc", + ], + deps = [ + ":chunked_stream", + 
":zero_copy_stream", + "//:upb", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/upb/io/README.md b/upb/io/README.md new file mode 100644 index 0000000000..e0a70113bc --- /dev/null +++ b/upb/io/README.md @@ -0,0 +1,4 @@ +This subdir originated as a best-effort C approximation of the C++ code in +third_party/protobuf/io/ but over time the two will inevitably diverge. +Comments have generally been copied verbatim and may therefore refer to C++ +symbol names instead of C symbol names. diff --git a/upb/io/chunked_input_stream.c b/upb/io/chunked_input_stream.c new file mode 100644 index 0000000000..c02df70f4f --- /dev/null +++ b/upb/io/chunked_input_stream.c @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2009-2022, Google LLC + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Google LLC nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "upb/io/chunked_input_stream.h"
+
+// Must be last.
+#include "upb/port_def.inc"
+
+typedef struct {
+  upb_ZeroCopyInputStream base;  // Kept first: code casts between the two.
+
+  const char* data;           // Start of the wrapped flat buffer.
+  size_t size;                // Total number of bytes in `data`.
+  size_t limit;               // Most bytes a single Next() may return.
+  size_t position;            // Offset of the next byte to hand out.
+  size_t last_returned_size;  // Bytes that BackUp() may still push back.
+} upb_ChunkedInputStream;
+
+static const void* upb_ChunkedInputStream_Next(upb_ZeroCopyInputStream* z,
+                                               size_t* count,
+                                               upb_Status* status) {
+  upb_ChunkedInputStream* stream = (upb_ChunkedInputStream*)z;
+  UPB_ASSERT(stream->position <= stream->size);
+
+  // Never hand out more than `limit` bytes, nor more than what is left.
+  const size_t remaining = stream->size - stream->position;
+  const size_t chunk = UPB_MIN(stream->limit, remaining);
+  const char* start = stream->data + stream->position;
+  stream->position += chunk;
+  stream->last_returned_size = chunk;
+  *count = chunk;
+  return chunk ? start : NULL;  // An empty chunk is reported as NULL (EOF).
+}
+
+static void upb_ChunkedInputStream_BackUp(upb_ZeroCopyInputStream* z,
+                                          size_t count) {
+  upb_ChunkedInputStream* stream = (upb_ChunkedInputStream*)z;
+  // Only bytes handed out by the most recent Next() may be pushed back.
+  UPB_ASSERT(count <= stream->last_returned_size);
+  stream->last_returned_size -= count;
+  stream->position -= count;
+}
+
+static bool upb_ChunkedInputStream_Skip(upb_ZeroCopyInputStream* z,
+                                        size_t count) {
+  upb_ChunkedInputStream* stream = (upb_ChunkedInputStream*)z;
+  const size_t remaining = stream->size - stream->position;
+
+  // A skip invalidates the previous chunk, so BackUp() must not undo it.
+  stream->last_returned_size = 0;
+
+  // Skipping past the end clamps the position to the end and reports failure.
+  const bool in_bounds = count <= remaining;
+  stream->position += in_bounds ? count : remaining;
+  return in_bounds;
+}
+
+static size_t upb_ChunkedInputStream_ByteCount(
+    const upb_ZeroCopyInputStream* z) {
+  // The read offset doubles as the number of bytes consumed so far.
+  const upb_ChunkedInputStream* stream = (const upb_ChunkedInputStream*)z;
+  return stream->position;
+}
+
+static const _upb_ZeroCopyInputStream_VTable upb_ChunkedInputStream_vtable = {
+    upb_ChunkedInputStream_Next,
+    upb_ChunkedInputStream_BackUp,
+    upb_ChunkedInputStream_Skip,
+    upb_ChunkedInputStream_ByteCount,
+};
+
+upb_ZeroCopyInputStream* upb_ChunkedInputStream_New(const void* data,
+                                                    size_t size, size_t limit,
+                                                    upb_Arena* arena) {
+  // A zero limit is rejected the same way as an allocation failure.
+  upb_ChunkedInputStream* stream = upb_Arena_Malloc(arena, sizeof(*stream));
+  if (limit == 0 || stream == NULL) return NULL;
+
+  stream->base.vtable = &upb_ChunkedInputStream_vtable;
+  stream->data = data;
+  stream->size = size;
+  stream->limit = limit;
+  stream->position = 0;
+  stream->last_returned_size = 0;
+  return &stream->base;
+}
diff --git a/upb/io/chunked_input_stream.h b/upb/io/chunked_input_stream.h
new file mode 100644
index 0000000000..40ce3621ed
--- /dev/null
+++ b/upb/io/chunked_input_stream.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2009-2022, Google LLC
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of Google LLC nor the
+ *       names of its contributors may be used to endorse or promote products
+ *       derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef UPB_IO_CHUNKED_INPUT_STREAM_H_
+#define UPB_IO_CHUNKED_INPUT_STREAM_H_
+
+#include "upb/arena.h"
+#include "upb/io/zero_copy_input_stream.h"
+
+// Must be last.
+#include "upb/port_def.inc"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Wraps the flat buffer `data` so each Next() call returns at most `limit`
+// bytes. Returns NULL if `limit` is zero or the arena allocation fails.
+upb_ZeroCopyInputStream* upb_ChunkedInputStream_New(const void* data,
+                                                    size_t size, size_t limit,
+                                                    upb_Arena* arena);
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#include "upb/port_undef.inc"
+
+#endif /* UPB_IO_CHUNKED_INPUT_STREAM_H_ */
diff --git a/upb/io/chunked_output_stream.c b/upb/io/chunked_output_stream.c
new file mode 100644
index 0000000000..fb5d064649
--- /dev/null
+++ b/upb/io/chunked_output_stream.c
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2009-2022, Google LLC
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of Google LLC nor the
+ *       names of its contributors may be used to endorse or promote products
+ *       derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "upb/io/chunked_output_stream.h"
+
+// Must be last.
+#include "upb/port_def.inc"
+
+typedef struct {
+  upb_ZeroCopyOutputStream base;  // Kept first: code casts between the two.
+
+  char* data;                 // Start of the wrapped flat buffer.
+  size_t size;                // Total capacity of `data` in bytes.
+  size_t limit;               // Most bytes a single Next() may return.
+  size_t position;            // Offset of the next byte to hand out.
+  size_t last_returned_size;  // Bytes that BackUp() may still take back.
+} upb_ChunkedOutputStream;
+
+static void* upb_ChunkedOutputStream_Next(upb_ZeroCopyOutputStream* z,
+                                          size_t* count, upb_Status* status) {
+  upb_ChunkedOutputStream* stream = (upb_ChunkedOutputStream*)z;
+  UPB_ASSERT(stream->position <= stream->size);
+
+  // Never hand out more than `limit` bytes, nor more than what is left.
+  const size_t remaining = stream->size - stream->position;
+  const size_t chunk = UPB_MIN(stream->limit, remaining);
+  char* start = stream->data + stream->position;
+  stream->position += chunk;
+  stream->last_returned_size = chunk;
+  *count = chunk;
+  return chunk ? start : NULL;  // An empty chunk is reported as NULL (EOF).
+}
+
+static void upb_ChunkedOutputStream_BackUp(upb_ZeroCopyOutputStream* z,
+                                           size_t count) {
+  upb_ChunkedOutputStream* stream = (upb_ChunkedOutputStream*)z;
+  // Only bytes handed out by the most recent Next() may be taken back.
+  UPB_ASSERT(count <= stream->last_returned_size);
+  stream->last_returned_size -= count;
+  stream->position -= count;
+}
+
+static size_t upb_ChunkedOutputStream_ByteCount(
+    const upb_ZeroCopyOutputStream* z) {
+  // The write offset doubles as the number of bytes produced so far.
+  const upb_ChunkedOutputStream* stream = (const upb_ChunkedOutputStream*)z;
+  return stream->position;
+}
+
+static const _upb_ZeroCopyOutputStream_VTable upb_ChunkedOutputStream_vtable = {
+    upb_ChunkedOutputStream_Next,
+    upb_ChunkedOutputStream_BackUp,
+    upb_ChunkedOutputStream_ByteCount,
+};
+
+upb_ZeroCopyOutputStream* upb_ChunkedOutputStream_New(void* data, size_t size,
+                                                      size_t limit,
+                                                      upb_Arena* arena) {
+  // A zero limit is rejected the same way as an allocation failure.
+  upb_ChunkedOutputStream* stream = upb_Arena_Malloc(arena, sizeof(*stream));
+  if (limit == 0 || stream == NULL) return NULL;
+
+  stream->base.vtable = &upb_ChunkedOutputStream_vtable;
+  stream->data = data;
+  stream->size = size;
+  stream->limit = limit;
+  stream->position = 0;
+  stream->last_returned_size = 0;
+  return &stream->base;
+}
diff --git a/upb/io/chunked_output_stream.h b/upb/io/chunked_output_stream.h
new file mode 100644
index 0000000000..1c0c47e8b8
--- /dev/null
+++ b/upb/io/chunked_output_stream.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2009-2022, Google LLC
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of Google LLC nor the
+ *       names of its contributors may be used to endorse or promote products
+ *       derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef UPB_IO_CHUNKED_OUTPUT_STREAM_H_
+#define UPB_IO_CHUNKED_OUTPUT_STREAM_H_
+
+#include "upb/arena.h"
+#include "upb/io/zero_copy_output_stream.h"
+
+// Must be last.
+#include "upb/port_def.inc"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Wraps the flat buffer `data` so each Next() call returns at most `limit`
+// bytes. Returns NULL if `limit` is zero or the arena allocation fails.
+upb_ZeroCopyOutputStream* upb_ChunkedOutputStream_New(void* data, size_t size,
+                                                      size_t limit,
+                                                      upb_Arena* arena);
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#include "upb/port_undef.inc"
+
+#endif /* UPB_IO_CHUNKED_OUTPUT_STREAM_H_ */
diff --git a/upb/io/zero_copy_input_stream.h b/upb/io/zero_copy_input_stream.h
new file mode 100644
index 0000000000..1abded08e8
--- /dev/null
+++ b/upb/io/zero_copy_input_stream.h
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2009-2022, Google LLC
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of Google LLC nor the
+ *       names of its contributors may be used to endorse or promote products
+ *       derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef UPB_IO_ZERO_COPY_INPUT_STREAM_H_
+#define UPB_IO_ZERO_COPY_INPUT_STREAM_H_
+
+#include "upb/status.h"
+
+// Must be last.
+#include "upb/port_def.inc"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct upb_ZeroCopyInputStream upb_ZeroCopyInputStream;
+
+typedef struct {
+  // Obtains a chunk of data from the stream.
+  //
+  // Preconditions:
+  //   "count" and "status" are not NULL.
+  //
+  // Postconditions:
+  //   All errors are permanent. If an error occurs then:
+  //     - NULL will be returned to the caller.
+  //     - *count will be set to zero.
+  //     - *status will be set to the error.
+  //   EOF is permanent. If EOF is reached then:
+  //     - NULL will be returned to the caller.
+  //     - *count will be set to zero.
+  //     - *status will not be touched.
+  //   Otherwise:
+  //     - The returned value will point to a buffer containing the bytes read.
+  //     - *count will be set to the number of bytes read.
+  //     - *status will not be touched.
+  //
+  // Ownership of this buffer remains with the stream, and the buffer
+  // remains valid only until some other method of the stream is called
+  // or the stream is destroyed.
+  const void* (*Next)(struct upb_ZeroCopyInputStream* z, size_t* count,
+                      upb_Status* status);
+
+  // Backs up a number of bytes, so that the next call to Next() returns
+  // data again that was already returned by the last call to Next(). This
+  // is useful when writing procedures that are only supposed to read up
+  // to a certain point in the input, then return. If Next() returns a
+  // buffer that goes beyond what you wanted to read, you can use BackUp()
+  // to return to the point where you intended to finish.
+  //
+  // Preconditions:
+  // * The last method called must have been Next().
+  // * count must be less than or equal to the size of the last buffer
+  //   returned by Next().
+  //
+  // Postconditions:
+  // * The last "count" bytes of the last buffer returned by Next() will be
+  //   pushed back into the stream. Subsequent calls to Next() will return
+  //   the same data again before producing new data.
+  void (*BackUp)(struct upb_ZeroCopyInputStream* z, size_t count);
+
+  // Skips a number of bytes. Returns false if the end of the stream is
+  // reached or some input error occurred. In the end-of-stream case, the
+  // stream is advanced to the end of the stream (so ByteCount() will return
+  // the total size of the stream).
+  bool (*Skip)(struct upb_ZeroCopyInputStream* z, size_t count);
+
+  // Returns the total number of bytes read since this object was created.
+  size_t (*ByteCount)(const struct upb_ZeroCopyInputStream* z);
+} _upb_ZeroCopyInputStream_VTable;
+
+struct upb_ZeroCopyInputStream {
+  const _upb_ZeroCopyInputStream_VTable* vtable;
+};
+
+UPB_INLINE const void* upb_ZeroCopyInputStream_Next(upb_ZeroCopyInputStream* z,
+                                                    size_t* count,
+                                                    upb_Status* status) {
+  const void* out = z->vtable->Next(z, count, status);
+  UPB_ASSERT((out == NULL) == (*count == 0));  // NULL iff nothing returned.
+  return out;
+}
+
+UPB_INLINE void upb_ZeroCopyInputStream_BackUp(upb_ZeroCopyInputStream* z,
+                                               size_t count) {
+  z->vtable->BackUp(z, count);  // ISO C forbids `return <void expr>;`.
+}
+
+UPB_INLINE bool upb_ZeroCopyInputStream_Skip(upb_ZeroCopyInputStream* z,
+                                             size_t count) {
+  return z->vtable->Skip(z, count);
+}
+
+UPB_INLINE size_t
+upb_ZeroCopyInputStream_ByteCount(const upb_ZeroCopyInputStream* z) {
+  return z->vtable->ByteCount(z);
+}
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#include "upb/port_undef.inc"
+
+#endif /* UPB_IO_ZERO_COPY_INPUT_STREAM_H_ */
diff --git a/upb/io/zero_copy_output_stream.h b/upb/io/zero_copy_output_stream.h
new file mode 100644
index 0000000000..61cf6952b2
--- /dev/null
+++ b/upb/io/zero_copy_output_stream.h
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2009-2022, Google LLC
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of Google LLC nor the
+ *       names of its contributors may be used to endorse or promote products
+ *       derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef UPB_IO_ZERO_COPY_OUTPUT_STREAM_H_
+#define UPB_IO_ZERO_COPY_OUTPUT_STREAM_H_
+
+#include "upb/status.h"
+
+// Must be last.
+#include "upb/port_def.inc"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct upb_ZeroCopyOutputStream upb_ZeroCopyOutputStream;
+
+typedef struct {
+  // Obtains a buffer into which data can be written. Any data written
+  // into this buffer will eventually (maybe instantly, maybe later on)
+  // be written to the output.
+  //
+  // Preconditions:
+  //   "count" and "status" are not NULL.
+  //
+  // Postconditions:
+  //   All errors are permanent. If an error occurs then:
+  //     - NULL will be returned to the caller.
+  //     - *count will be set to zero.
+  //     - *status will be set to the error.
+  //   EOF is permanent. If EOF is reached then:
+  //     - NULL will be returned to the caller.
+  //     - *count will be set to zero.
+  //     - *status will not be touched.
+  //   Otherwise:
+  //     - The returned value will point to a buffer that may be written to.
+  //     - *count will be set to the size of that buffer in bytes.
+  //     - *status will not be touched.
+  //
+  // Ownership of this buffer remains with the stream, and the buffer
+  // remains valid only until some other method of the stream is called
+  // or the stream is destroyed.
+  //
+  // Any data which the caller stores in this buffer will eventually be
+  // written to the output (unless BackUp() is called).
+  void* (*Next)(struct upb_ZeroCopyOutputStream* z, size_t* count,
+                upb_Status* status);
+
+  // Backs up a number of bytes, so that the end of the last buffer returned
+  // by Next() is not actually written. This is needed when you finish
+  // writing all the data you want to write, but the last buffer was bigger
+  // than you needed. You don't want to write a bunch of garbage after the
+  // end of your data, so you use BackUp() to back up.
+  //
+  // Preconditions:
+  // * The last method called must have been Next().
+  // * count must be less than or equal to the size of the last buffer
+  //   returned by Next().
+  // * The caller must not have written anything to the last "count" bytes
+  //   of that buffer.
+  //
+  // Postconditions:
+  // * The last "count" bytes of the last buffer returned by Next() will be
+  //   ignored.
+  //
+  // This method can be called with `count = 0` to finalize (flush) any
+  // previously returned buffer. For example, a file output stream can
+  // flush buffers returned from a previous call to Next() upon such
+  // BackUp(0) invocations. ZeroCopyOutputStream callers should always
+  // invoke BackUp() after a final Next() call, even if there is no
+  // excess buffer data to be backed up to indicate a flush point.
+  void (*BackUp)(struct upb_ZeroCopyOutputStream* z, size_t count);
+
+  // Returns the total number of bytes written since this object was created.
+  size_t (*ByteCount)(const struct upb_ZeroCopyOutputStream* z);
+} _upb_ZeroCopyOutputStream_VTable;
+
+struct upb_ZeroCopyOutputStream {
+  const _upb_ZeroCopyOutputStream_VTable* vtable;
+};
+
+UPB_INLINE void* upb_ZeroCopyOutputStream_Next(upb_ZeroCopyOutputStream* z,
+                                               size_t* count,
+                                               upb_Status* status) {
+  void* out = z->vtable->Next(z, count, status);
+  UPB_ASSERT((out == NULL) == (*count == 0));  // NULL iff nothing returned.
+  return out;
+}
+
+UPB_INLINE void upb_ZeroCopyOutputStream_BackUp(upb_ZeroCopyOutputStream* z,
+                                                size_t count) {
+  z->vtable->BackUp(z, count);  // ISO C forbids `return <void expr>;`.
+}
+
+UPB_INLINE size_t
+upb_ZeroCopyOutputStream_ByteCount(const upb_ZeroCopyOutputStream* z) {
+  return z->vtable->ByteCount(z);
+}
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#include "upb/port_undef.inc"
+
+#endif /* UPB_IO_ZERO_COPY_OUTPUT_STREAM_H_ */
diff --git a/upb/io/zero_copy_stream_test.cc b/upb/io/zero_copy_stream_test.cc
new file mode 100644
index 0000000000..6cdb4f2eae
--- /dev/null
+++ b/upb/io/zero_copy_stream_test.cc
@@ -0,0 +1,290 @@
+/*
+ * Copyright (c) 2009-2022, Google LLC
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of Google LLC nor the
+ *       names of its contributors may be used to endorse or promote products
+ *       derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL Google LLC BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// Testing strategy: For each type of I/O (array, string, file, etc.) we
+// create an output stream and write some data to it, then create a
+// corresponding input stream to read the same data back and expect it to
+// match. When the data is written, it is written in several small chunks
+// of varying sizes, with a BackUp() after each chunk. It is read back
+// similarly, but with chunks separated at different points. The whole
+// process is run with a variety of block sizes for both the input and
+// the output.
+
+#include "gtest/gtest.h"
+#include "upb/io/chunked_input_stream.h"
+#include "upb/io/chunked_output_stream.h"
+#include "upb/upb.hpp"
+
+namespace upb {
+namespace {
+
+class IoTest : public testing::Test {
+ protected:
+  // Test helpers.
+
+  // Helper to write an array of data to an output stream.
+  bool WriteToOutput(upb_ZeroCopyOutputStream* output, const void* data,
+                     int size);
+  // Helper to read a fixed-length array of data from an input stream.
+  int ReadFromInput(upb_ZeroCopyInputStream* input, void* data, int size);
+  // Write a string to the output stream.
+  void WriteString(upb_ZeroCopyOutputStream* output, const std::string& str);
+  // Read a number of bytes equal to the size of the given string and checks
+  // that it matches the string.
+  void ReadString(upb_ZeroCopyInputStream* input, const std::string& str);
+  // Writes some text to the output stream in a particular order. Returns
+  // the number of bytes written, in case the caller needs that to set up an
+  // input stream.
+  int WriteStuff(upb_ZeroCopyOutputStream* output);
+  // Reads text from an input stream and expects it to match what
+  // WriteStuff() writes.
+  void ReadStuff(upb_ZeroCopyInputStream* input, bool read_eof = true);
+
+  // Similar to WriteStuff, but performs more sophisticated testing.
+  int WriteStuffLarge(upb_ZeroCopyOutputStream* output);
+  // Reads and tests a stream that should have been written to
+  // via WriteStuffLarge().
+  void ReadStuffLarge(upb_ZeroCopyInputStream* input);
+
+  static const int kBlockSizes[];
+  static const int kBlockSizeCount;
+};
+
+const int IoTest::kBlockSizes[] = {1, 2, 5, 7, 10, 23, 64};
+const int IoTest::kBlockSizeCount = sizeof(IoTest::kBlockSizes) / sizeof(int);
+
+bool IoTest::WriteToOutput(upb_ZeroCopyOutputStream* output, const void* data,
+                           int size) {
+  const uint8_t* in = reinterpret_cast<const uint8_t*>(data);
+  size_t in_size = size;
+  size_t out_size;
+
+  while (true) {
+    upb::Status status;
+    void* out = upb_ZeroCopyOutputStream_Next(output, &out_size, status.ptr());
+    if (out_size == 0) return false;  // Output exhausted before all was written.
+
+    if (in_size <= out_size) {
+      memcpy(out, in, in_size);
+      upb_ZeroCopyOutputStream_BackUp(output, out_size - in_size);
+      return true;
+    }
+
+    memcpy(out, in, out_size);
+    in += out_size;
+    in_size -= out_size;
+  }
+}
+
+int IoTest::ReadFromInput(upb_ZeroCopyInputStream* input, void* data,
+                          int size) {
+  uint8_t* out = reinterpret_cast<uint8_t*>(data);
+  size_t out_size = size;
+
+  const void* in;
+  size_t in_size = 0;
+
+  while (true) {
+    upb::Status status;
+    in = upb_ZeroCopyInputStream_Next(input, &in_size, status.ptr());
+
+    if (in_size == 0) {
+      return size - out_size;  // Hit EOF: report how much was copied.
+    }
+
+    if (out_size <= in_size) {
+      memcpy(out, in, out_size);
+      if (in_size > out_size) {
+        upb_ZeroCopyInputStream_BackUp(input, in_size - out_size);
+      }
+      return size;  // Copied all of it.
+    }
+
+    memcpy(out, in, in_size);
+    out += in_size;
+    out_size -= in_size;
+  }
+}
+
+void IoTest::WriteString(upb_ZeroCopyOutputStream* output,
+                         const std::string& str) {
+  EXPECT_TRUE(WriteToOutput(output, str.c_str(), str.size()));
+}
+
+void IoTest::ReadString(upb_ZeroCopyInputStream* input,
+                        const std::string& str) {
+  std::unique_ptr<char[]> buffer(new char[str.size() + 1]);
+  buffer[str.size()] = '\0';
+  EXPECT_EQ(ReadFromInput(input, buffer.get(), str.size()), str.size());
+  EXPECT_STREQ(str.c_str(), buffer.get());
+}
+
+int IoTest::WriteStuff(upb_ZeroCopyOutputStream* output) {
+  WriteString(output, "Hello world!\n");
+  WriteString(output, "Some te");
+  WriteString(output, "xt.  Blah blah.");
+  WriteString(output, "abcdefg");
+  WriteString(output, "01234567890123456789");
+  WriteString(output, "foobar");
+
+  const int result = upb_ZeroCopyOutputStream_ByteCount(output);
+  EXPECT_EQ(result, 68);  // 13 + 7 + 15 + 7 + 20 + 6 bytes written above.
+  return result;
+}
+
+// Reads text from an input stream and expects it to match what WriteStuff()
+// writes.
+void IoTest::ReadStuff(upb_ZeroCopyInputStream* input, bool read_eof) {
+  ReadString(input, "Hello world!\n");
+  ReadString(input, "Some text.  ");
+  ReadString(input, "Blah ");
+  ReadString(input, "blah.");
+  ReadString(input, "abcdefg");
+  EXPECT_TRUE(upb_ZeroCopyInputStream_Skip(input, 20));
+  ReadString(input, "foo");
+  ReadString(input, "bar");
+
+  EXPECT_EQ(upb_ZeroCopyInputStream_ByteCount(input), 68);
+
+  if (read_eof) {
+    uint8_t byte;
+    EXPECT_EQ(ReadFromInput(input, &byte, 1), 0);
+  }
+}
+
+int IoTest::WriteStuffLarge(upb_ZeroCopyOutputStream* output) {
+  WriteString(output, "Hello world!\n");
+  WriteString(output, "Some te");
+  WriteString(output, "xt.  Blah blah.");
+  WriteString(output, std::string(100000, 'x'));  // A very long string
+  WriteString(output, std::string(100000, 'y'));  // A very long string
+  WriteString(output, "01234567890123456789");
+
+  const int result = upb_ZeroCopyOutputStream_ByteCount(output);
+  EXPECT_EQ(result, 200055);  // 13 + 7 + 15 + 100000 + 100000 + 20 bytes.
+  return result;
+}
+
+// Reads text from an input stream and expects it to match what
+// WriteStuffLarge() writes.
+void IoTest::ReadStuffLarge(upb_ZeroCopyInputStream* input) {
+  ReadString(input, "Hello world!\nSome text.  ");
+  EXPECT_TRUE(upb_ZeroCopyInputStream_Skip(input, 5));
+  ReadString(input, "blah.");
+  EXPECT_TRUE(upb_ZeroCopyInputStream_Skip(input, 100000 - 10));
+  ReadString(input, std::string(10, 'x') + std::string(100000 - 20000, 'y'));
+  EXPECT_TRUE(upb_ZeroCopyInputStream_Skip(input, 20000 - 10));
+  ReadString(input, "yyyyyyyyyy01234567890123456789");
+  EXPECT_EQ(upb_ZeroCopyInputStream_ByteCount(input), 200055);
+
+  uint8_t byte;
+  EXPECT_EQ(ReadFromInput(input, &byte, 1), 0);
+}
+
+// ===================================================================
+
+TEST_F(IoTest, ArrayIo) {
+  const int kBufferSize = 256;
+  uint8_t buffer[kBufferSize];
+
+  upb::Arena arena;
+  for (int i = 0; i < kBlockSizeCount; i++) {    // Output block size.
+    for (int j = 0; j < kBlockSizeCount; j++) {  // Input block size.
+      auto output = upb_ChunkedOutputStream_New(buffer, kBufferSize,
+                                                kBlockSizes[i], arena.ptr());
+      int size = WriteStuff(output);
+      auto input =
+          upb_ChunkedInputStream_New(buffer, size, kBlockSizes[j], arena.ptr());
+      ReadStuff(input);
+    }
+  }
+}
+
+TEST(ChunkedStream, SingleInput) {
+  const int kBufferSize = 256;
+  uint8_t buffer[kBufferSize];
+  upb::Arena arena;
+  auto input =
+      upb_ChunkedInputStream_New(buffer, kBufferSize, kBufferSize, arena.ptr());
+  const void* data;
+  size_t size;
+
+  upb::Status status;
+  data = upb_ZeroCopyInputStream_Next(input, &size, status.ptr());
+  EXPECT_EQ(size, kBufferSize);
+
+  data = upb_ZeroCopyInputStream_Next(input, &size, status.ptr());
+  EXPECT_EQ(data, nullptr);
+  EXPECT_EQ(size, 0);
+  EXPECT_TRUE(upb_Status_IsOk(status.ptr()));
+}
+
+TEST(ChunkedStream, SingleOutput) {
+  const int kBufferSize = 256;
+  uint8_t buffer[kBufferSize];
+  upb::Arena arena;
+  auto output = upb_ChunkedOutputStream_New(buffer, kBufferSize, kBufferSize,
+                                            arena.ptr());
+  size_t size;
+  upb::Status status;
+  void* data = upb_ZeroCopyOutputStream_Next(output, &size, status.ptr());
+  EXPECT_EQ(size, kBufferSize);
+
+  data = upb_ZeroCopyOutputStream_Next(output, &size, status.ptr());
+  EXPECT_EQ(data, nullptr);
+  EXPECT_EQ(size, 0);
+  EXPECT_TRUE(upb_Status_IsOk(status.ptr()));
+}
+
+// Check that a zero-size input array doesn't confuse the code.
+TEST(ChunkedStream, InputEOF) {
+  upb::Arena arena;
+  char buf;
+  auto input = upb_ChunkedInputStream_New(&buf, 0, 1, arena.ptr());
+  size_t size;
+  upb::Status status;
+  const void* data = upb_ZeroCopyInputStream_Next(input, &size, status.ptr());
+  EXPECT_EQ(data, nullptr);
+  EXPECT_EQ(size, 0);
+  EXPECT_TRUE(upb_Status_IsOk(status.ptr()));
+}
+
+// Check that a zero-size output array doesn't confuse the code.
+TEST(ChunkedStream, OutputEOF) {
+  upb::Arena arena;
+  char buf;
+  auto output = upb_ChunkedOutputStream_New(&buf, 0, 1, arena.ptr());
+  size_t size;
+  upb::Status status;
+  void* data = upb_ZeroCopyOutputStream_Next(output, &size, status.ptr());
+  EXPECT_EQ(data, nullptr);
+  EXPECT_EQ(size, 0);
+  EXPECT_TRUE(upb_Status_IsOk(status.ptr()));
+}
+
+}  // namespace
+}  // namespace upb