diff --git a/src/google/protobuf/io/zero_copy_stream_impl_lite.cc b/src/google/protobuf/io/zero_copy_stream_impl_lite.cc index 31f0c232b3..7195a4d383 100644 --- a/src/google/protobuf/io/zero_copy_stream_impl_lite.cc +++ b/src/google/protobuf/io/zero_copy_stream_impl_lite.cc @@ -489,6 +489,219 @@ bool LimitingInputStream::ReadCord(absl::Cord* cord, int count) { // =================================================================== +CordInputStream::CordInputStream(const absl::Cord* cord) + : it_(cord->char_begin()), + length_(cord->size()), + bytes_remaining_(length_) { + LoadChunkData(); +} + +bool CordInputStream::LoadChunkData() { + if (bytes_remaining_ != 0) { + absl::string_view sv = absl::Cord::ChunkRemaining(it_); + data_ = sv.data(); + size_ = available_ = sv.size(); + return true; + } + size_ = available_ = 0; + return false; +} + +bool CordInputStream::NextChunk(size_t skip) { + // `size_ == 0` indicates we're at EOF. + if (size_ == 0) return false; + + // The caller consumed 'size_ - available_' bytes that are not yet accounted + // for in the iterator position to get to the start of the next chunk. + const size_t distance = size_ - available_ + skip; + absl::Cord::Advance(&it_, distance); + bytes_remaining_ -= skip; + + return LoadChunkData(); +} + +bool CordInputStream::Next(const void** data, int* size) { + if (available_ > 0 || NextChunk(0)) { + *data = data_ + size_ - available_; + *size = available_; + bytes_remaining_ -= available_; + available_ = 0; + return true; + } + return false; +} + +void CordInputStream::BackUp(int count) { + // Backup is only allowed on last returned chunk from `Next()`. + GOOGLE_CHECK_LE(static_cast(count), size_ - available_); + + available_ += count; + bytes_remaining_ += count; +} + +bool CordInputStream::Skip(int count) { + // Short circuit if we stay inside the current chunk. + if (static_cast(count) <= available_) { + available_ -= count; + bytes_remaining_ -= count; + return true; + } + + // Sanity check the skip count. + if (static_cast(count) <= bytes_remaining_) { + // Skip to end: do not return EOF condition: skipping into EOF is ok. + NextChunk(count); + return true; + } + NextChunk(bytes_remaining_); + return false; +} + +int64_t CordInputStream::ByteCount() const { + return length_ - bytes_remaining_; +} + +bool CordInputStream::ReadCord(absl::Cord* cord, int count) { + // Advance the iterator to the current position + const size_t used = size_ - available_; + absl::Cord::Advance(&it_, used); + + // Read the cord, adjusting the iterator position. + // Make sure to cap at available bytes to avoid hard crashes. + const size_t n = std::min(static_cast(count), bytes_remaining_); + cord->Append(absl::Cord::AdvanceAndRead(&it_, n)); + + // Update current chunk data. + bytes_remaining_ -= n; + LoadChunkData(); + + return n == static_cast(count); +} + + +CordOutputStream::CordOutputStream(size_t size_hint) : size_hint_(size_hint) {} + +CordOutputStream::CordOutputStream(absl::Cord cord, size_t size_hint) + : cord_(std::move(cord)), + size_hint_(size_hint), + state_(cord_.empty() ? State::kEmpty : State::kSteal) {} + +CordOutputStream::CordOutputStream(absl::CordBuffer buffer, size_t size_hint) + : size_hint_(size_hint), + state_(buffer.length() < buffer.capacity() ? State::kPartial + : State::kFull), + buffer_(std::move(buffer)) {} + +CordOutputStream::CordOutputStream(absl::Cord cord, absl::CordBuffer buffer, + size_t size_hint) + : cord_(std::move(cord)), + size_hint_(size_hint), + state_(buffer.length() < buffer.capacity() ? State::kPartial + : State::kFull), + buffer_(std::move(buffer)) {} + +bool CordOutputStream::Next(void** data, int* size) { + // Use 128 bytes as a minimum buffer size if we don't have any application + // provided size hints. This number is picked somewhat arbitrary as 'small + // enough to avoid excessive waste on small data, and large enough to not + // waste CPU and memory on tiny buffer overhead'. + // It is worth noting that absent size hints, we pick 'current size' as + // the default buffer size (capped at max flat size), which means we quickly + // double the buffer size. This is in contrast to `Cord::Append()` functions + // accepting strings which use a conservative 10% growth. + static const size_t kMinBlockSize = 128; + + size_t desired_size, max_size; + const size_t cord_size = cord_.size() + buffer_.length(); + if (size_hint_ > cord_size) { + // Try to hit size_hint_ exactly so the caller doesn't receive a larger + // buffer than indicated, requiring a non-zero call to BackUp() to undo + // the buffer capacity we returned beyond the indicated size hint. + desired_size = size_hint_ - cord_size; + max_size = desired_size; + } else { + // We're past the size hint or don't have a size hint. Try to allocate a + // block as large as what we have so far, or at least kMinBlockSize bytes. + // CordBuffer will truncate this to an appropriate size if it is too large. + desired_size = std::max(cord_size, kMinBlockSize); + max_size = std::numeric_limits::max(); + } + + switch (state_) { + case State::kSteal: + // Steal last buffer from Cord if available. + assert(buffer_.length() == 0); + buffer_ = cord_.GetAppendBuffer(desired_size); + break; + case State::kPartial: + // Use existing capacity in 'buffer_` + assert(buffer_.length() < buffer_.capacity()); + break; + case State::kFull: + assert(buffer_.length() > 0); + cord_.Append(std::move(buffer_)); + PROTOBUF_FALLTHROUGH_INTENDED; + case State::kEmpty: + assert(buffer_.length() == 0); + buffer_ = absl::CordBuffer::CreateWithDefaultLimit(desired_size); + break; + } + + // Get all available capacity from the buffer. + absl::Span span = buffer_.available(); + assert(!span.empty()); + *data = span.data(); + + // Only hand out up to 'max_size', which is limited if there is a size hint + // specified, and we have more available than the size hint. + if (span.size() > max_size) { + *size = static_cast(max_size); + buffer_.IncreaseLengthBy(max_size); + state_ = State::kPartial; + } else { + *size = static_cast(span.size()); + buffer_.IncreaseLengthBy(span.size()); + state_ = State::kFull; + } + + return true; +} + +void CordOutputStream::BackUp(int count) { + // Check if something to do, else state remains unchanged. + assert(0 <= count && count <= ByteCount()); + if (count == 0) return; + + // Backup() is not supposed to backup beyond last Next() call + const int buffer_length = static_cast(buffer_.length()); + assert(count <= buffer_length); + if (count <= buffer_length) { + buffer_.SetLength(static_cast(buffer_length - count)); + state_ = State::kPartial; + } else { + buffer_ = {}; + cord_.RemoveSuffix(static_cast(count)); + state_ = State::kSteal; + } +} + +int64_t CordOutputStream::ByteCount() const { + return static_cast(cord_.size() + buffer_.length()); +} + +bool CordOutputStream::WriteCord(const absl::Cord& cord) { + cord_.Append(std::move(buffer_)); + cord_.Append(cord); + state_ = State::kSteal; // Attempt to utilize existing capacity in `cord' + return true; +} + +absl::Cord CordOutputStream::Consume() { + cord_.Append(std::move(buffer_)); + state_ = State::kEmpty; + return std::move(cord_); +} + } // namespace io } // namespace protobuf diff --git a/src/google/protobuf/io/zero_copy_stream_impl_lite.h b/src/google/protobuf/io/zero_copy_stream_impl_lite.h index 3c2aa89df5..3e23514579 100644 --- a/src/google/protobuf/io/zero_copy_stream_impl_lite.h +++ b/src/google/protobuf/io/zero_copy_stream_impl_lite.h @@ -388,6 +388,134 @@ class PROTOBUF_EXPORT LimitingInputStream PROTOBUF_FUTURE_FINAL int64_t prior_bytes_read_; // Bytes read on underlying stream at construction }; +// =================================================================== + +// A ZeroCopyInputStream backed by a Cord. This stream implements ReadCord() +// in a way that can share memory between the source and destination cords +// rather than copying. +class PROTOBUF_EXPORT CordInputStream final : public ZeroCopyInputStream { + public: + // Creates an InputStream that reads from the given Cord. `cord` must + // not be null and must outlive this CordInputStream instance. `cord` must + // not be modified while this instance is actively being used: any change + // to `cord` will lead to undefined behavior on any subsequent call into + // this instance. + explicit CordInputStream( + const absl::Cord* cord ABSL_ATTRIBUTE_LIFETIME_BOUND); + + + // `CordInputStream` is neither copiable nor assignable + CordInputStream(const CordInputStream&) = delete; + CordInputStream& operator=(const CordInputStream&) = delete; + + // implements ZeroCopyInputStream ---------------------------------- + bool Next(const void** data, int* size) override; + void BackUp(int count) override; + bool Skip(int count) override; + int64_t ByteCount() const override; + bool ReadCord(absl::Cord* cord, int count) override; + + + private: + // Moves `it_` to the next available chunk skipping `skip` extra bytes + // and updates the chunk data pointers. + bool NextChunk(size_t skip); + + // Updates the current chunk data context `data_`, `size_` and `available_`. + // If `bytes_remaining_` is zero, sets `size_` and `available_` to zero. + // Returns true if more data is available, false otherwise. + bool LoadChunkData(); + + absl::Cord::CharIterator it_; + size_t length_; + size_t bytes_remaining_; + const char* data_; + size_t size_; + size_t available_; +}; + +// =================================================================== + +// A ZeroCopyOutputStream that writes to a Cord. This stream implements +// WriteCord() in a way that can share memory between the source and +// destination cords rather than copying. +class PROTOBUF_EXPORT CordOutputStream final : public ZeroCopyOutputStream { + public: + // Creates an OutputStream streaming serialized data into a Cord. `size_hint`, + // if given, is the expected total size of the resulting Cord. This is a hint + // only, used for optimization. Callers can obtain the generated Cord value by + // invoking `Consume()`. + explicit CordOutputStream(size_t size_hint = 0); + + // Creates an OutputStream with an initial Cord value. This constructor can be + // used by applications wanting to directly append serialization data to a + // given cord. In such cases, donating the existing value as in: + // + // CordOutputStream stream(std::move(cord)); + // message.SerializeToZeroCopyStream(&stream); + // cord = std::move(stream.Consume()); + // + // is more efficient then appending the serialized cord in application code: + // + // CordOutputStream stream; + // message.SerializeToZeroCopyStream(&stream); + // cord.Append(stream.Consume()); + // + // The former allows `CordOutputStream` to utilize pre-existing privately + // owned Cord buffers from the donated cord where the latter does not, which + // may lead to more memory usage when serialuzing data into existing cords. + explicit CordOutputStream(absl::Cord cord, size_t size_hint = 0); + + // Creates an OutputStream with an initial Cord value and initial buffer. + // This donates both the preexisting cord in `cord`, as well as any + // pre-existing data and additional capacity in `buffer`. + // This function is mainly intended to be used in internal serialization logic + // using eager buffer initialization in EpsCopyOutputStream. + // The donated buffer can be empty, partially empty or full: the outputstream + // will DTRT in all cases and preserve any pre-existing data. + explicit CordOutputStream(absl::Cord cord, absl::CordBuffer buffer, + size_t size_hint = 0); + + // Creates an OutputStream with an initial buffer. + // This method is logically identical to, but more efficient than: + // `CordOutputStream(absl::Cord(), std::move(buffer), size_hint)` + explicit CordOutputStream(absl::CordBuffer buffer, size_t size_hint = 0); + + // `CordOutputStream` is neither copiable nor assignable + CordOutputStream(const CordOutputStream&) = delete; + CordOutputStream& operator=(const CordOutputStream&) = delete; + + // implements `ZeroCopyOutputStream` --------------------------------- + bool Next(void** data, int* size) final; + void BackUp(int count) final; + int64_t ByteCount() const final; + bool WriteCord(const absl::Cord& cord) final; + + // Consumes the serialized data as a cord value. `Consume()` internally + // flushes any pending state 'as if' BackUp(0) was called. While a final call + // to BackUp() is generally required by the `ZeroCopyOutputStream` contract, + // applications using `CordOutputStream` directly can call `Consume()` without + // a preceding call to `BackUp()`. + // + // While it will rarely be useful in practice (and especially in the presence + // of size hints) an instance is safe to be used after a call to `Consume()`. + // The only logical change in state is that all serialized data is extracted, + // and any new serialization calls will serialize into new cord data. + absl::Cord Consume(); + + private: + // State of `buffer_` and 'cord_. As a default CordBuffer instance always has + // inlined capacity, we track state explicitly to avoid returning 'existing + // capacity' from the default or 'moved from' CordBuffer. 'kSteal' indicates + // we should (attempt to) steal the next buffer from the cord. + enum class State { kEmpty, kFull, kPartial, kSteal }; + + absl::Cord cord_; + size_t size_hint_; + State state_ = State::kEmpty; + absl::CordBuffer buffer_; +}; + // =================================================================== diff --git a/src/google/protobuf/io/zero_copy_stream_unittest.cc b/src/google/protobuf/io/zero_copy_stream_unittest.cc index 6964b06646..4235260b92 100644 --- a/src/google/protobuf/io/zero_copy_stream_unittest.cc +++ b/src/google/protobuf/io/zero_copy_stream_unittest.cc @@ -68,6 +68,8 @@ #include "google/protobuf/testing/file.h" #include "absl/strings/cord.h" +#include "absl/strings/cord_buffer.h" +#include "absl/strings/string_view.h" #include "google/protobuf/io/coded_stream.h" #include "google/protobuf/io/io_win32.h" #include "google/protobuf/io/zero_copy_stream_impl.h" @@ -82,6 +84,11 @@ #include "google/protobuf/testing/file.h" #include "google/protobuf/testing/googletest.h" #include +#include "absl/status/status.h" +#include "absl/strings/cord.h" +#include "absl/strings/cord_buffer.h" +#include "absl/strings/string_view.h" + // Must be included last. #include "google/protobuf/port_def.inc" @@ -874,6 +881,568 @@ TEST(DefaultWriteCordTest, WriteTooLargeCord) { EXPECT_EQ(buffer, source.Subcord(0, output.ByteCount())); } +TEST(CordInputStreamTest, SkipToEnd) { + absl::Cord source(std::string(10000, 'z')); + CordInputStream stream(&source); + EXPECT_TRUE(stream.Skip(10000)); + EXPECT_EQ(stream.ByteCount(), 10000); +} + +TEST_F(IoTest, CordIo) { + CordOutputStream output; + int size = WriteStuff(&output); + absl::Cord cord = output.Consume(); + EXPECT_EQ(size, cord.size()); + + { + CordInputStream input(&cord); + ReadStuff(&input); + } +} + +template +absl::Cord MakeFragmentedCord(const Container& c) { + absl::Cord result; + for (const auto& s : c) { + absl::string_view sv(s); + auto buffer = absl::CordBuffer::CreateWithDefaultLimit(sv.size()); + absl::Span out = buffer.available_up_to(sv.size()); + memcpy(out.data(), sv.data(), out.size()); + buffer.SetLength(out.size()); + result.Append(std::move(buffer)); + } + return result; +} + +// Test that we can read correctly from a fragmented Cord. +TEST_F(IoTest, FragmentedCordInput) { + std::string str; + { + StringOutputStream output(&str); + WriteStuff(&output); + } + + for (int i = 0; i < kBlockSizeCount; i++) { + int block_size = kBlockSizes[i]; + if (block_size < 0) { + // Skip the -1 case. + continue; + } + absl::string_view str_piece = str; + + // Create a fragmented cord by splitting the input into many cord + // functions. + std::vector fragments; + while (!str_piece.empty()) { + size_t n = std::min(str_piece.size(), block_size); + fragments.push_back(str_piece.substr(0, n)); + str_piece.remove_prefix(n); + } + absl::Cord fragmented_cord = MakeFragmentedCord(fragments); + + CordInputStream input(&fragmented_cord); + ReadStuff(&input); + } +} + +TEST_F(IoTest, ReadSmallCord) { + absl::Cord source; + source.Append("foo bar"); + + absl::Cord dest; + CordInputStream input(&source); + EXPECT_TRUE(input.Skip(1)); + EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2)); + + EXPECT_EQ(absl::Cord("oo ba"), dest); +} + +TEST_F(IoTest, ReadSmallCordAfterBackUp) { + absl::Cord source; + source.Append("foo bar"); + + absl::Cord dest; + CordInputStream input(&source); + + const void* buffer; + int size; + EXPECT_TRUE(input.Next(&buffer, &size)); + input.BackUp(size - 1); + + EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2)); + + EXPECT_EQ(absl::Cord("oo ba"), dest); +} + +TEST_F(IoTest, ReadLargeCord) { + absl::Cord source; + for (int i = 0; i < 1024; i++) { + source.Append("foo bar"); + } + + absl::Cord dest; + CordInputStream input(&source); + EXPECT_TRUE(input.Skip(1)); + EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2)); + + absl::Cord expected = source; + expected.RemovePrefix(1); + expected.RemoveSuffix(1); + + EXPECT_EQ(expected, dest); +} + +TEST_F(IoTest, ReadLargeCordAfterBackUp) { + absl::Cord source; + for (int i = 0; i < 1024; i++) { + source.Append("foo bar"); + } + + absl::Cord dest; + CordInputStream input(&source); + + const void* buffer; + int size; + EXPECT_TRUE(input.Next(&buffer, &size)); + input.BackUp(size - 1); + + EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2)); + + absl::Cord expected = source; + expected.RemovePrefix(1); + expected.RemoveSuffix(1); + + EXPECT_EQ(expected, dest); + + EXPECT_TRUE(input.Next(&buffer, &size)); + EXPECT_EQ("r", std::string(reinterpret_cast(buffer), size)); +} + +TEST_F(IoTest, ReadCordEof) { + absl::Cord source; + source.Append("foo bar"); + + absl::Cord dest; + CordInputStream input(&source); + input.Skip(1); + EXPECT_FALSE(input.ReadCord(&dest, source.size())); + + absl::Cord expected = source; + expected.RemovePrefix(1); + EXPECT_EQ(expected, dest); +} + +TEST(CordOutputStreamTest, Empty) { + CordOutputStream output; + EXPECT_TRUE(output.Consume().empty()); +} + +TEST(CordOutputStreamTest, ConsumesCordClearingState) { + CordOutputStream output(absl::Cord("abcdef")); + EXPECT_EQ(output.Consume(), "abcdef"); + EXPECT_TRUE(output.Consume().empty()); +} + +TEST(CordOutputStreamTest, DonateEmptyCordBuffer) { + absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500); + absl::Span available = buffer.available(); + void* available_data = available.data(); + CordOutputStream output(std::move(buffer)); + void* data; + int size; + EXPECT_TRUE(output.Next(&data, &size)); + EXPECT_EQ(data, available_data); + EXPECT_EQ(size, static_cast(available.size())); + memset(data, 'a', static_cast(size)); + + absl::Cord cord = output.Consume(); + ASSERT_TRUE(cord.TryFlat()); + absl::string_view flat = *cord.TryFlat(); + EXPECT_EQ(flat, std::string(static_cast(size), 'a')); + EXPECT_EQ(flat.data(), available_data); +} + +TEST(CordOutputStreamTest, DonatePartialCordBuffer) { + absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500); + absl::Span available = buffer.available(); + memset(available.data(), 'a', 100); + buffer.IncreaseLengthBy(100); + void* available_data = available.data(); + CordOutputStream output(std::move(buffer)); + + absl::Cord cord = output.Consume(); + ASSERT_TRUE(cord.TryFlat()); + absl::string_view flat = *cord.TryFlat(); + EXPECT_EQ(flat, std::string(100, 'a')); + EXPECT_EQ(flat.data(), available_data); +} + +TEST(CordOutputStreamTest, DonatePartialCordBufferAndUseExtraCapacity) { + absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500); + absl::Span available = buffer.available(); + memset(available.data(), 'a', 100); + buffer.IncreaseLengthBy(100); + void* available_data = available.data(); + void* next_available_data = available.data() + 100; + CordOutputStream output(std::move(buffer)); + void* data; + int size; + EXPECT_TRUE(output.Next(&data, &size)); + EXPECT_EQ(data, next_available_data); + EXPECT_EQ(size, static_cast(available.size() - 100)); + memset(data, 'b', static_cast(size)); + + absl::Cord cord = output.Consume(); + ASSERT_TRUE(cord.TryFlat()); + absl::string_view flat = *cord.TryFlat(); + EXPECT_EQ(flat, std::string(100, 'a') + + std::string(static_cast(size), 'b')); + EXPECT_EQ(flat.data(), available_data); +} + +TEST(CordOutputStreamTest, DonateCordAndPartialCordBufferAndUseExtraCapacity) { + absl::Cord cord(std::string(400, 'a')); + absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500); + absl::Span available = buffer.available(); + memset(available.data(), 'b', 100); + buffer.IncreaseLengthBy(100); + void* next_available_data = available.data() + 100; + CordOutputStream output(std::move(cord), std::move(buffer)); + void* data; + int size; + EXPECT_TRUE(output.Next(&data, &size)); + EXPECT_EQ(data, next_available_data); + EXPECT_EQ(size, static_cast(available.size() - 100)); + memset(data, 'c', static_cast(size)); + + cord = output.Consume(); + EXPECT_FALSE(cord.TryFlat()); + EXPECT_EQ(cord, std::string(400, 'a') + std::string(100, 'b') + + std::string(static_cast(size), 'c')); +} + +TEST(CordOutputStreamTest, DonateFullCordBuffer) { + absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500); + absl::Span available = buffer.available(); + memset(available.data(), 'a', available.size()); + buffer.IncreaseLengthBy(available.size()); + CordOutputStream output(std::move(buffer)); + void* data; + int size; + EXPECT_TRUE(output.Next(&data, &size)); + memset(data, 'b', static_cast(size)); + + absl::Cord cord = output.Consume(); + EXPECT_FALSE(cord.TryFlat()); + EXPECT_EQ(cord, std::string(available.size(), 'a') + + std::string(static_cast(size), 'b')); +} + +TEST(CordOutputStreamTest, DonateFullCordBufferAndCord) { + absl::Cord cord(std::string(400, 'a')); + absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500); + absl::Span available = buffer.available(); + memset(available.data(), 'b', available.size()); + buffer.IncreaseLengthBy(available.size()); + CordOutputStream output(std::move(cord), std::move(buffer)); + void* data; + int size; + EXPECT_TRUE(output.Next(&data, &size)); + memset(data, 'c', static_cast(size)); + + cord = output.Consume(); + EXPECT_FALSE(cord.TryFlat()); + EXPECT_EQ(cord, std::string(400, 'a') + + std::string(available.size(), 'b') + + std::string(static_cast(size), 'c')); +} + +TEST(CordOutputStreamTest, DonateFullCordBufferAndBackup) { + absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500); + absl::Span available = buffer.available(); + memset(available.data(), 'a', available.size()); + buffer.IncreaseLengthBy(available.size()); + + // We back up by 100 before calling Next() + void* available_data = available.data(); + void* next_available_data = available.data() + available.size() - 100; + CordOutputStream output(std::move(buffer)); + output.BackUp(100); + + void* data; + int size; + EXPECT_TRUE(output.Next(&data, &size)); + EXPECT_EQ(data, next_available_data); + EXPECT_EQ(size, 100); + memset(data, 'b', 100); + + absl::Cord cord = output.Consume(); + ASSERT_TRUE(cord.TryFlat()); + absl::string_view flat = *cord.TryFlat(); + EXPECT_EQ(flat, + std::string(available.size() - 100, 'a') + std::string(100, 'b')); + EXPECT_EQ(flat.data(), available_data); +} + +TEST(CordOutputStreamTest, DonateCordAndFullCordBufferAndBackup) { + absl::Cord cord(std::string(400, 'a')); + absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500); + absl::Span available = buffer.available(); + memset(available.data(), 'b', available.size()); + buffer.IncreaseLengthBy(available.size()); + + // We back up by 100 before calling Next() + void* next_available_data = available.data() + available.size() - 100; + CordOutputStream output(std::move(cord), std::move(buffer)); + output.BackUp(100); + + void* data; + int size; + EXPECT_TRUE(output.Next(&data, &size)); + EXPECT_EQ(data, next_available_data); + EXPECT_EQ(size, 100); + memset(data, 'c', 100); + + cord = output.Consume(); + EXPECT_FALSE(cord.TryFlat()); + EXPECT_EQ(cord, std::string(400, 'a') + + std::string(available.size() - 100, 'b') + + std::string(100, 'c')); +} + +TEST(CordOutputStreamTest, ProperHintCreatesSingleFlatCord) { + CordOutputStream output(2000); + void* data; + int size; + ASSERT_TRUE(output.Next(&data, &size)); + ASSERT_EQ(size, 2000); + memset(data, 'a', 2000); + + absl::Cord cord = output.Consume(); + ASSERT_TRUE(cord.TryFlat()); + absl::string_view flat = *cord.TryFlat(); + EXPECT_EQ(flat, std::string(2000, 'a')); +} + +TEST(CordOutputStreamTest, SizeHintDicatesTotalSize) { + absl::Cord cord(std::string(500, 'a')); + CordOutputStream output(std::move(cord), 2000); + void* data; + int size; + + int remaining = 1500; + while (remaining > 0) { + ASSERT_TRUE(output.Next(&data, &size)); + ASSERT_LE(size, remaining); + memset(data, 'b', static_cast(size)); + remaining -= size; + } + ASSERT_EQ(remaining, 0); + + cord = output.Consume(); + EXPECT_EQ(cord, absl::StrCat(std::string(500, 'a'), std::string(1500, 'b'))); +} + +TEST(CordOutputStreamTest, BackUpReusesPartialBuffer) { + CordOutputStream output(2000); + void* data; + int size; + + ASSERT_TRUE(output.Next(&data, &size)); + ASSERT_EQ(size, 2000); + memset(data, '1', 100); + output.BackUp(1900); + + ASSERT_TRUE(output.Next(&data, &size)); + ASSERT_EQ(size, 1900); + memset(data, '2', 200); + output.BackUp(1700); + + ASSERT_TRUE(output.Next(&data, &size)); + ASSERT_EQ(size, 1700); + memset(data, '3', 400); + output.BackUp(1300); + + ASSERT_TRUE(output.Next(&data, &size)); + ASSERT_EQ(size, 1300); + memset(data, '4', 1300); + + absl::Cord cord = output.Consume(); + ASSERT_TRUE(cord.TryFlat()); + absl::string_view flat = *cord.TryFlat(); + EXPECT_EQ(flat, absl::StrCat(std::string(100, '1'), std::string(200, '2'), + std::string(400, '3'), std::string(1300, '4'))); +} + +TEST(CordOutputStreamTest, UsesPrivateCapacityInDonatedCord) { + absl::Cord cord; + absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(2000); + memset(buffer.data(), 'a', 500); + buffer.SetLength(500); + cord.Append(std::move(buffer)); + + CordOutputStream output(std::move(cord), 2000); + void* data; + int size; + + ASSERT_TRUE(output.Next(&data, &size)); + ASSERT_EQ(size, 1500); + memset(data, 'b', 1500); + + cord = output.Consume(); + ASSERT_TRUE(cord.TryFlat()); + absl::string_view flat = *cord.TryFlat(); + EXPECT_EQ(flat, absl::StrCat(std::string(500, 'a'), std::string(1500, 'b'))); +} + +TEST(CordOutputStreamTest, UsesPrivateCapacityInAppendedCord) { + absl::Cord cord; + absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(2000); + memset(buffer.data(), 'a', 500); + buffer.SetLength(500); + cord.Append(std::move(buffer)); + + CordOutputStream output(2000); + void* data; + int size; + + // Add cord. Clearing it makes it privately owned by 'output' as it's non + // trivial size guarantees it is ref counted, not deep copied. + output.WriteCord(cord); + cord.Clear(); + + ASSERT_TRUE(output.Next(&data, &size)); + ASSERT_EQ(size, 1500); + memset(data, 'b', 1500); + + cord = output.Consume(); + ASSERT_TRUE(cord.TryFlat()); + absl::string_view flat = *cord.TryFlat(); + EXPECT_EQ(flat, absl::StrCat(std::string(500, 'a'), std::string(1500, 'b'))); +} + +TEST(CordOutputStreamTest, CapsSizeAtHintButUsesCapacityBeyondHint) { + // This tests verifies that when we provide a hint of 'x' bytes, that the + // returned size from Next() will be capped at 'size_hint', but that if we + // exceed size_hint, it will use the capacity in any internal buffer beyond + // the size hint. We test this by providing a hint that is too large to be + // inlined, but so small that we have a guarantee it's smaller than the + // minimum flat size so we will have a 'capped' larger buffer as state. + size_t size_hint = sizeof(absl::Cord) + 1; + CordOutputStream output(size_hint); + void* data; + int size; + + ASSERT_TRUE(output.Next(&data, &size)); + ASSERT_EQ(size, size_hint); + memset(data, 'a', static_cast(size)); + + ASSERT_TRUE(output.Next(&data, &size)); + memset(data, 'b', static_cast(size)); + + // We should have received the same buffer on each Next() call + absl::Cord cord = output.Consume(); + ASSERT_TRUE(cord.TryFlat()); + absl::string_view flat = *cord.TryFlat(); + EXPECT_EQ(flat, absl::StrCat(std::string(size_hint, 'a'), + std::string(static_cast(size), 'b'))); +} + +TEST(CordOutputStreamTest, SizeDoublesWithoutHint) { + CordOutputStream output; + void* data; + int size; + + // Whitebox: we are guaranteed at least 128 bytes initially. We also assume + // that the maximum size is roughly 4KiB - overhead without being precise. + int min_size = 128; + const int max_size = 4000; + ASSERT_TRUE(output.Next(&data, &size)); + memset(data, 0, static_cast(size)); + ASSERT_GE(size, min_size); + + for (int i = 0; i < 6; ++i) { + ASSERT_TRUE(output.Next(&data, &size)); + memset(data, 0, static_cast(size)); + ASSERT_GE(size, min_size); + min_size = (std::min)(min_size * 2, max_size); + } +} + +TEST_F(IoTest, WriteSmallCord) { + absl::Cord source; + source.Append("foo bar"); + + CordOutputStream output(absl::Cord("existing:")); + EXPECT_TRUE(output.WriteCord(source)); + absl::Cord cord = output.Consume(); + EXPECT_EQ(absl::Cord("existing:foo bar"), cord); +} + +TEST_F(IoTest, WriteLargeCord) { + absl::Cord source; + for (int i = 0; i < 1024; i++) { + source.Append("foo bar"); + } + + CordOutputStream output(absl::Cord("existing:")); + EXPECT_TRUE(output.WriteCord(source)); + absl::Cord cord = output.Consume(); + + absl::Cord expected = source; + expected.Prepend("existing:"); + EXPECT_EQ(expected, cord); +} + +// Test that large size hints lead to large block sizes. +TEST_F(IoTest, CordOutputSizeHint) { + CordOutputStream output1; + CordOutputStream output2(12345); + + void* data1; + void* data2; + int size1, size2; + ASSERT_TRUE(output1.Next(&data1, &size1)); + ASSERT_TRUE(output2.Next(&data2, &size2)); + + // Prevent 'unflushed output' debug checks and warnings + output1.BackUp(size1); + output2.BackUp(size2); + + EXPECT_GT(size2, size1); + + // Prevent any warnings on unused or unflushed data + output1.Consume(); + output2.Consume(); +} + +// Test that when we use a size hint, we get a buffer boundary exactly on that +// byte. +TEST_F(IoTest, CordOutputBufferEndsAtSizeHint) { + static const int kSizeHint = 12345; + + CordOutputStream output(kSizeHint); + + void* data; + int size; + int total_read = 0; + + while (total_read < kSizeHint) { + ASSERT_TRUE(output.Next(&data, &size)); + memset(data, 0, static_cast(size)); // Avoid uninitialized data UB + total_read += size; + } + + EXPECT_EQ(kSizeHint, total_read); + + // We should be able to keep going past the size hint. + ASSERT_TRUE(output.Next(&data, &size)); + EXPECT_GT(size, 0); + + // Prevent any warnings on unused or unflushed data + output.Consume(); +} + // To test files, we create a temporary file, write, read, truncate, repeat. TEST_F(IoTest, FileIo) { diff --git a/src/google/protobuf/lite_unittest.cc b/src/google/protobuf/lite_unittest.cc index d89a17d7f0..a9f02a9ab1 100644 --- a/src/google/protobuf/lite_unittest.cc +++ b/src/google/protobuf/lite_unittest.cc @@ -33,6 +33,7 @@ #include #include #include +#include #include "google/protobuf/stubs/logging.h" #include "google/protobuf/stubs/common.h" @@ -51,6 +52,9 @@ #include "google/protobuf/unittest_lite.pb.h" #include "google/protobuf/wire_format_lite.h" +// Must be included last +#include "google/protobuf/port_def.inc" + namespace google { namespace protobuf {