Add CordInputStream and CordOutputStream providing stream support directly from/to Cord data

PiperOrigin-RevId: 493150162
pull/11160/head
Martijn Vels 2 years ago committed by Copybara-Service
parent 474152de0f
commit 8afd1b670a
  1. 213
      src/google/protobuf/io/zero_copy_stream_impl_lite.cc
  2. 128
      src/google/protobuf/io/zero_copy_stream_impl_lite.h
  3. 569
      src/google/protobuf/io/zero_copy_stream_unittest.cc
  4. 4
      src/google/protobuf/lite_unittest.cc

@ -489,6 +489,219 @@ bool LimitingInputStream::ReadCord(absl::Cord* cord, int count) {
// ===================================================================
CordInputStream::CordInputStream(const absl::Cord* cord)
: it_(cord->char_begin()),
length_(cord->size()),
bytes_remaining_(length_) {
LoadChunkData();
}
bool CordInputStream::LoadChunkData() {
if (bytes_remaining_ != 0) {
absl::string_view sv = absl::Cord::ChunkRemaining(it_);
data_ = sv.data();
size_ = available_ = sv.size();
return true;
}
size_ = available_ = 0;
return false;
}
bool CordInputStream::NextChunk(size_t skip) {
// `size_ == 0` indicates we're at EOF.
if (size_ == 0) return false;
// The caller consumed 'size_ - available_' bytes that are not yet accounted
// for in the iterator position to get to the start of the next chunk.
const size_t distance = size_ - available_ + skip;
absl::Cord::Advance(&it_, distance);
bytes_remaining_ -= skip;
return LoadChunkData();
}
bool CordInputStream::Next(const void** data, int* size) {
if (available_ > 0 || NextChunk(0)) {
*data = data_ + size_ - available_;
*size = available_;
bytes_remaining_ -= available_;
available_ = 0;
return true;
}
return false;
}
void CordInputStream::BackUp(int count) {
// Backup is only allowed on last returned chunk from `Next()`.
GOOGLE_CHECK_LE(static_cast<size_t>(count), size_ - available_);
available_ += count;
bytes_remaining_ += count;
}
bool CordInputStream::Skip(int count) {
// Short circuit if we stay inside the current chunk.
if (static_cast<size_t>(count) <= available_) {
available_ -= count;
bytes_remaining_ -= count;
return true;
}
// Sanity check the skip count.
if (static_cast<size_t>(count) <= bytes_remaining_) {
// Skip to end: do not return EOF condition: skipping into EOF is ok.
NextChunk(count);
return true;
}
NextChunk(bytes_remaining_);
return false;
}
int64_t CordInputStream::ByteCount() const {
return length_ - bytes_remaining_;
}
bool CordInputStream::ReadCord(absl::Cord* cord, int count) {
// Advance the iterator to the current position
const size_t used = size_ - available_;
absl::Cord::Advance(&it_, used);
// Read the cord, adjusting the iterator position.
// Make sure to cap at available bytes to avoid hard crashes.
const size_t n = std::min(static_cast<size_t>(count), bytes_remaining_);
cord->Append(absl::Cord::AdvanceAndRead(&it_, n));
// Update current chunk data.
bytes_remaining_ -= n;
LoadChunkData();
return n == static_cast<size_t>(count);
}
CordOutputStream::CordOutputStream(size_t size_hint) : size_hint_(size_hint) {}
CordOutputStream::CordOutputStream(absl::Cord cord, size_t size_hint)
: cord_(std::move(cord)),
size_hint_(size_hint),
state_(cord_.empty() ? State::kEmpty : State::kSteal) {}
CordOutputStream::CordOutputStream(absl::CordBuffer buffer, size_t size_hint)
: size_hint_(size_hint),
state_(buffer.length() < buffer.capacity() ? State::kPartial
: State::kFull),
buffer_(std::move(buffer)) {}
CordOutputStream::CordOutputStream(absl::Cord cord, absl::CordBuffer buffer,
size_t size_hint)
: cord_(std::move(cord)),
size_hint_(size_hint),
state_(buffer.length() < buffer.capacity() ? State::kPartial
: State::kFull),
buffer_(std::move(buffer)) {}
bool CordOutputStream::Next(void** data, int* size) {
// Use 128 bytes as a minimum buffer size if we don't have any application
// provided size hints. This number is picked somewhat arbitrary as 'small
// enough to avoid excessive waste on small data, and large enough to not
// waste CPU and memory on tiny buffer overhead'.
// It is worth noting that absent size hints, we pick 'current size' as
// the default buffer size (capped at max flat size), which means we quickly
// double the buffer size. This is in contrast to `Cord::Append()` functions
// accepting strings which use a conservative 10% growth.
static const size_t kMinBlockSize = 128;
size_t desired_size, max_size;
const size_t cord_size = cord_.size() + buffer_.length();
if (size_hint_ > cord_size) {
// Try to hit size_hint_ exactly so the caller doesn't receive a larger
// buffer than indicated, requiring a non-zero call to BackUp() to undo
// the buffer capacity we returned beyond the indicated size hint.
desired_size = size_hint_ - cord_size;
max_size = desired_size;
} else {
// We're past the size hint or don't have a size hint. Try to allocate a
// block as large as what we have so far, or at least kMinBlockSize bytes.
// CordBuffer will truncate this to an appropriate size if it is too large.
desired_size = std::max(cord_size, kMinBlockSize);
max_size = std::numeric_limits<size_t>::max();
}
switch (state_) {
case State::kSteal:
// Steal last buffer from Cord if available.
assert(buffer_.length() == 0);
buffer_ = cord_.GetAppendBuffer(desired_size);
break;
case State::kPartial:
// Use existing capacity in 'buffer_`
assert(buffer_.length() < buffer_.capacity());
break;
case State::kFull:
assert(buffer_.length() > 0);
cord_.Append(std::move(buffer_));
PROTOBUF_FALLTHROUGH_INTENDED;
case State::kEmpty:
assert(buffer_.length() == 0);
buffer_ = absl::CordBuffer::CreateWithDefaultLimit(desired_size);
break;
}
// Get all available capacity from the buffer.
absl::Span<char> span = buffer_.available();
assert(!span.empty());
*data = span.data();
// Only hand out up to 'max_size', which is limited if there is a size hint
// specified, and we have more available than the size hint.
if (span.size() > max_size) {
*size = static_cast<int>(max_size);
buffer_.IncreaseLengthBy(max_size);
state_ = State::kPartial;
} else {
*size = static_cast<int>(span.size());
buffer_.IncreaseLengthBy(span.size());
state_ = State::kFull;
}
return true;
}
void CordOutputStream::BackUp(int count) {
// Check if something to do, else state remains unchanged.
assert(0 <= count && count <= ByteCount());
if (count == 0) return;
// Backup() is not supposed to backup beyond last Next() call
const int buffer_length = static_cast<int>(buffer_.length());
assert(count <= buffer_length);
if (count <= buffer_length) {
buffer_.SetLength(static_cast<size_t>(buffer_length - count));
state_ = State::kPartial;
} else {
buffer_ = {};
cord_.RemoveSuffix(static_cast<size_t>(count));
state_ = State::kSteal;
}
}
int64_t CordOutputStream::ByteCount() const {
return static_cast<int64_t>(cord_.size() + buffer_.length());
}
bool CordOutputStream::WriteCord(const absl::Cord& cord) {
cord_.Append(std::move(buffer_));
cord_.Append(cord);
state_ = State::kSteal; // Attempt to utilize existing capacity in `cord'
return true;
}
absl::Cord CordOutputStream::Consume() {
cord_.Append(std::move(buffer_));
state_ = State::kEmpty;
return std::move(cord_);
}
} // namespace io
} // namespace protobuf

@ -388,6 +388,134 @@ class PROTOBUF_EXPORT LimitingInputStream PROTOBUF_FUTURE_FINAL
int64_t prior_bytes_read_; // Bytes read on underlying stream at construction
};
// ===================================================================
// A ZeroCopyInputStream backed by a Cord. This stream implements ReadCord()
// in a way that can share memory between the source and destination cords
// rather than copying.
class PROTOBUF_EXPORT CordInputStream final : public ZeroCopyInputStream {
public:
// Creates an InputStream that reads from the given Cord. `cord` must
// not be null and must outlive this CordInputStream instance. `cord` must
// not be modified while this instance is actively being used: any change
// to `cord` will lead to undefined behavior on any subsequent call into
// this instance.
explicit CordInputStream(
const absl::Cord* cord ABSL_ATTRIBUTE_LIFETIME_BOUND);
// `CordInputStream` is neither copiable nor assignable
CordInputStream(const CordInputStream&) = delete;
CordInputStream& operator=(const CordInputStream&) = delete;
// implements ZeroCopyInputStream ----------------------------------
bool Next(const void** data, int* size) override;
void BackUp(int count) override;
bool Skip(int count) override;
int64_t ByteCount() const override;
bool ReadCord(absl::Cord* cord, int count) override;
private:
// Moves `it_` to the next available chunk skipping `skip` extra bytes
// and updates the chunk data pointers.
bool NextChunk(size_t skip);
// Updates the current chunk data context `data_`, `size_` and `available_`.
// If `bytes_remaining_` is zero, sets `size_` and `available_` to zero.
// Returns true if more data is available, false otherwise.
bool LoadChunkData();
absl::Cord::CharIterator it_;
size_t length_;
size_t bytes_remaining_;
const char* data_;
size_t size_;
size_t available_;
};
// ===================================================================
// A ZeroCopyOutputStream that writes to a Cord. This stream implements
// WriteCord() in a way that can share memory between the source and
// destination cords rather than copying.
class PROTOBUF_EXPORT CordOutputStream final : public ZeroCopyOutputStream {
public:
// Creates an OutputStream streaming serialized data into a Cord. `size_hint`,
// if given, is the expected total size of the resulting Cord. This is a hint
// only, used for optimization. Callers can obtain the generated Cord value by
// invoking `Consume()`.
explicit CordOutputStream(size_t size_hint = 0);
// Creates an OutputStream with an initial Cord value. This constructor can be
// used by applications wanting to directly append serialization data to a
// given cord. In such cases, donating the existing value as in:
//
// CordOutputStream stream(std::move(cord));
// message.SerializeToZeroCopyStream(&stream);
// cord = std::move(stream.Consume());
//
// is more efficient then appending the serialized cord in application code:
//
// CordOutputStream stream;
// message.SerializeToZeroCopyStream(&stream);
// cord.Append(stream.Consume());
//
// The former allows `CordOutputStream` to utilize pre-existing privately
// owned Cord buffers from the donated cord where the latter does not, which
// may lead to more memory usage when serialuzing data into existing cords.
explicit CordOutputStream(absl::Cord cord, size_t size_hint = 0);
// Creates an OutputStream with an initial Cord value and initial buffer.
// This donates both the preexisting cord in `cord`, as well as any
// pre-existing data and additional capacity in `buffer`.
// This function is mainly intended to be used in internal serialization logic
// using eager buffer initialization in EpsCopyOutputStream.
// The donated buffer can be empty, partially empty or full: the outputstream
// will DTRT in all cases and preserve any pre-existing data.
explicit CordOutputStream(absl::Cord cord, absl::CordBuffer buffer,
size_t size_hint = 0);
// Creates an OutputStream with an initial buffer.
// This method is logically identical to, but more efficient than:
// `CordOutputStream(absl::Cord(), std::move(buffer), size_hint)`
explicit CordOutputStream(absl::CordBuffer buffer, size_t size_hint = 0);
// `CordOutputStream` is neither copiable nor assignable
CordOutputStream(const CordOutputStream&) = delete;
CordOutputStream& operator=(const CordOutputStream&) = delete;
// implements `ZeroCopyOutputStream` ---------------------------------
bool Next(void** data, int* size) final;
void BackUp(int count) final;
int64_t ByteCount() const final;
bool WriteCord(const absl::Cord& cord) final;
// Consumes the serialized data as a cord value. `Consume()` internally
// flushes any pending state 'as if' BackUp(0) was called. While a final call
// to BackUp() is generally required by the `ZeroCopyOutputStream` contract,
// applications using `CordOutputStream` directly can call `Consume()` without
// a preceding call to `BackUp()`.
//
// While it will rarely be useful in practice (and especially in the presence
// of size hints) an instance is safe to be used after a call to `Consume()`.
// The only logical change in state is that all serialized data is extracted,
// and any new serialization calls will serialize into new cord data.
absl::Cord Consume();
private:
// State of `buffer_` and 'cord_. As a default CordBuffer instance always has
// inlined capacity, we track state explicitly to avoid returning 'existing
// capacity' from the default or 'moved from' CordBuffer. 'kSteal' indicates
// we should (attempt to) steal the next buffer from the cord.
enum class State { kEmpty, kFull, kPartial, kSteal };
absl::Cord cord_;
size_t size_hint_;
State state_ = State::kEmpty;
absl::CordBuffer buffer_;
};
// ===================================================================

@ -68,6 +68,8 @@
#include "google/protobuf/testing/file.h"
#include "absl/strings/cord.h"
#include "absl/strings/cord_buffer.h"
#include "absl/strings/string_view.h"
#include "google/protobuf/io/coded_stream.h"
#include "google/protobuf/io/io_win32.h"
#include "google/protobuf/io/zero_copy_stream_impl.h"
@ -82,6 +84,11 @@
#include "google/protobuf/testing/file.h"
#include "google/protobuf/testing/googletest.h"
#include <gtest/gtest.h>
#include "absl/status/status.h"
#include "absl/strings/cord.h"
#include "absl/strings/cord_buffer.h"
#include "absl/strings/string_view.h"
// Must be included last.
#include "google/protobuf/port_def.inc"
@ -874,6 +881,568 @@ TEST(DefaultWriteCordTest, WriteTooLargeCord) {
EXPECT_EQ(buffer, source.Subcord(0, output.ByteCount()));
}
TEST(CordInputStreamTest, SkipToEnd) {
absl::Cord source(std::string(10000, 'z'));
CordInputStream stream(&source);
EXPECT_TRUE(stream.Skip(10000));
EXPECT_EQ(stream.ByteCount(), 10000);
}
TEST_F(IoTest, CordIo) {
CordOutputStream output;
int size = WriteStuff(&output);
absl::Cord cord = output.Consume();
EXPECT_EQ(size, cord.size());
{
CordInputStream input(&cord);
ReadStuff(&input);
}
}
template <typename Container>
absl::Cord MakeFragmentedCord(const Container& c) {
absl::Cord result;
for (const auto& s : c) {
absl::string_view sv(s);
auto buffer = absl::CordBuffer::CreateWithDefaultLimit(sv.size());
absl::Span<char> out = buffer.available_up_to(sv.size());
memcpy(out.data(), sv.data(), out.size());
buffer.SetLength(out.size());
result.Append(std::move(buffer));
}
return result;
}
// Test that we can read correctly from a fragmented Cord.
TEST_F(IoTest, FragmentedCordInput) {
std::string str;
{
StringOutputStream output(&str);
WriteStuff(&output);
}
for (int i = 0; i < kBlockSizeCount; i++) {
int block_size = kBlockSizes[i];
if (block_size < 0) {
// Skip the -1 case.
continue;
}
absl::string_view str_piece = str;
// Create a fragmented cord by splitting the input into many cord
// functions.
std::vector<absl::string_view> fragments;
while (!str_piece.empty()) {
size_t n = std::min<size_t>(str_piece.size(), block_size);
fragments.push_back(str_piece.substr(0, n));
str_piece.remove_prefix(n);
}
absl::Cord fragmented_cord = MakeFragmentedCord(fragments);
CordInputStream input(&fragmented_cord);
ReadStuff(&input);
}
}
TEST_F(IoTest, ReadSmallCord) {
absl::Cord source;
source.Append("foo bar");
absl::Cord dest;
CordInputStream input(&source);
EXPECT_TRUE(input.Skip(1));
EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));
EXPECT_EQ(absl::Cord("oo ba"), dest);
}
TEST_F(IoTest, ReadSmallCordAfterBackUp) {
absl::Cord source;
source.Append("foo bar");
absl::Cord dest;
CordInputStream input(&source);
const void* buffer;
int size;
EXPECT_TRUE(input.Next(&buffer, &size));
input.BackUp(size - 1);
EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));
EXPECT_EQ(absl::Cord("oo ba"), dest);
}
TEST_F(IoTest, ReadLargeCord) {
absl::Cord source;
for (int i = 0; i < 1024; i++) {
source.Append("foo bar");
}
absl::Cord dest;
CordInputStream input(&source);
EXPECT_TRUE(input.Skip(1));
EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));
absl::Cord expected = source;
expected.RemovePrefix(1);
expected.RemoveSuffix(1);
EXPECT_EQ(expected, dest);
}
TEST_F(IoTest, ReadLargeCordAfterBackUp) {
absl::Cord source;
for (int i = 0; i < 1024; i++) {
source.Append("foo bar");
}
absl::Cord dest;
CordInputStream input(&source);
const void* buffer;
int size;
EXPECT_TRUE(input.Next(&buffer, &size));
input.BackUp(size - 1);
EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));
absl::Cord expected = source;
expected.RemovePrefix(1);
expected.RemoveSuffix(1);
EXPECT_EQ(expected, dest);
EXPECT_TRUE(input.Next(&buffer, &size));
EXPECT_EQ("r", std::string(reinterpret_cast<const char*>(buffer), size));
}
TEST_F(IoTest, ReadCordEof) {
absl::Cord source;
source.Append("foo bar");
absl::Cord dest;
CordInputStream input(&source);
input.Skip(1);
EXPECT_FALSE(input.ReadCord(&dest, source.size()));
absl::Cord expected = source;
expected.RemovePrefix(1);
EXPECT_EQ(expected, dest);
}
TEST(CordOutputStreamTest, Empty) {
CordOutputStream output;
EXPECT_TRUE(output.Consume().empty());
}
TEST(CordOutputStreamTest, ConsumesCordClearingState) {
CordOutputStream output(absl::Cord("abcdef"));
EXPECT_EQ(output.Consume(), "abcdef");
EXPECT_TRUE(output.Consume().empty());
}
TEST(CordOutputStreamTest, DonateEmptyCordBuffer) {
absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
absl::Span<char> available = buffer.available();
void* available_data = available.data();
CordOutputStream output(std::move(buffer));
void* data;
int size;
EXPECT_TRUE(output.Next(&data, &size));
EXPECT_EQ(data, available_data);
EXPECT_EQ(size, static_cast<int>(available.size()));
memset(data, 'a', static_cast<size_t>(size));
absl::Cord cord = output.Consume();
ASSERT_TRUE(cord.TryFlat());
absl::string_view flat = *cord.TryFlat();
EXPECT_EQ(flat, std::string(static_cast<size_t>(size), 'a'));
EXPECT_EQ(flat.data(), available_data);
}
TEST(CordOutputStreamTest, DonatePartialCordBuffer) {
absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
absl::Span<char> available = buffer.available();
memset(available.data(), 'a', 100);
buffer.IncreaseLengthBy(100);
void* available_data = available.data();
CordOutputStream output(std::move(buffer));
absl::Cord cord = output.Consume();
ASSERT_TRUE(cord.TryFlat());
absl::string_view flat = *cord.TryFlat();
EXPECT_EQ(flat, std::string(100, 'a'));
EXPECT_EQ(flat.data(), available_data);
}
TEST(CordOutputStreamTest, DonatePartialCordBufferAndUseExtraCapacity) {
absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
absl::Span<char> available = buffer.available();
memset(available.data(), 'a', 100);
buffer.IncreaseLengthBy(100);
void* available_data = available.data();
void* next_available_data = available.data() + 100;
CordOutputStream output(std::move(buffer));
void* data;
int size;
EXPECT_TRUE(output.Next(&data, &size));
EXPECT_EQ(data, next_available_data);
EXPECT_EQ(size, static_cast<int>(available.size() - 100));
memset(data, 'b', static_cast<size_t>(size));
absl::Cord cord = output.Consume();
ASSERT_TRUE(cord.TryFlat());
absl::string_view flat = *cord.TryFlat();
EXPECT_EQ(flat, std::string(100, 'a') +
std::string(static_cast<size_t>(size), 'b'));
EXPECT_EQ(flat.data(), available_data);
}
TEST(CordOutputStreamTest, DonateCordAndPartialCordBufferAndUseExtraCapacity) {
absl::Cord cord(std::string(400, 'a'));
absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
absl::Span<char> available = buffer.available();
memset(available.data(), 'b', 100);
buffer.IncreaseLengthBy(100);
void* next_available_data = available.data() + 100;
CordOutputStream output(std::move(cord), std::move(buffer));
void* data;
int size;
EXPECT_TRUE(output.Next(&data, &size));
EXPECT_EQ(data, next_available_data);
EXPECT_EQ(size, static_cast<int>(available.size() - 100));
memset(data, 'c', static_cast<size_t>(size));
cord = output.Consume();
EXPECT_FALSE(cord.TryFlat());
EXPECT_EQ(cord, std::string(400, 'a') + std::string(100, 'b') +
std::string(static_cast<size_t>(size), 'c'));
}
TEST(CordOutputStreamTest, DonateFullCordBuffer) {
absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
absl::Span<char> available = buffer.available();
memset(available.data(), 'a', available.size());
buffer.IncreaseLengthBy(available.size());
CordOutputStream output(std::move(buffer));
void* data;
int size;
EXPECT_TRUE(output.Next(&data, &size));
memset(data, 'b', static_cast<size_t>(size));
absl::Cord cord = output.Consume();
EXPECT_FALSE(cord.TryFlat());
EXPECT_EQ(cord, std::string(available.size(), 'a') +
std::string(static_cast<size_t>(size), 'b'));
}
TEST(CordOutputStreamTest, DonateFullCordBufferAndCord) {
absl::Cord cord(std::string(400, 'a'));
absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
absl::Span<char> available = buffer.available();
memset(available.data(), 'b', available.size());
buffer.IncreaseLengthBy(available.size());
CordOutputStream output(std::move(cord), std::move(buffer));
void* data;
int size;
EXPECT_TRUE(output.Next(&data, &size));
memset(data, 'c', static_cast<size_t>(size));
cord = output.Consume();
EXPECT_FALSE(cord.TryFlat());
EXPECT_EQ(cord, std::string(400, 'a') +
std::string(available.size(), 'b') +
std::string(static_cast<size_t>(size), 'c'));
}
TEST(CordOutputStreamTest, DonateFullCordBufferAndBackup) {
absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
absl::Span<char> available = buffer.available();
memset(available.data(), 'a', available.size());
buffer.IncreaseLengthBy(available.size());
// We back up by 100 before calling Next()
void* available_data = available.data();
void* next_available_data = available.data() + available.size() - 100;
CordOutputStream output(std::move(buffer));
output.BackUp(100);
void* data;
int size;
EXPECT_TRUE(output.Next(&data, &size));
EXPECT_EQ(data, next_available_data);
EXPECT_EQ(size, 100);
memset(data, 'b', 100);
absl::Cord cord = output.Consume();
ASSERT_TRUE(cord.TryFlat());
absl::string_view flat = *cord.TryFlat();
EXPECT_EQ(flat,
std::string(available.size() - 100, 'a') + std::string(100, 'b'));
EXPECT_EQ(flat.data(), available_data);
}
TEST(CordOutputStreamTest, DonateCordAndFullCordBufferAndBackup) {
absl::Cord cord(std::string(400, 'a'));
absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(500);
absl::Span<char> available = buffer.available();
memset(available.data(), 'b', available.size());
buffer.IncreaseLengthBy(available.size());
// We back up by 100 before calling Next()
void* next_available_data = available.data() + available.size() - 100;
CordOutputStream output(std::move(cord), std::move(buffer));
output.BackUp(100);
void* data;
int size;
EXPECT_TRUE(output.Next(&data, &size));
EXPECT_EQ(data, next_available_data);
EXPECT_EQ(size, 100);
memset(data, 'c', 100);
cord = output.Consume();
EXPECT_FALSE(cord.TryFlat());
EXPECT_EQ(cord, std::string(400, 'a') +
std::string(available.size() - 100, 'b') +
std::string(100, 'c'));
}
TEST(CordOutputStreamTest, ProperHintCreatesSingleFlatCord) {
CordOutputStream output(2000);
void* data;
int size;
ASSERT_TRUE(output.Next(&data, &size));
ASSERT_EQ(size, 2000);
memset(data, 'a', 2000);
absl::Cord cord = output.Consume();
ASSERT_TRUE(cord.TryFlat());
absl::string_view flat = *cord.TryFlat();
EXPECT_EQ(flat, std::string(2000, 'a'));
}
TEST(CordOutputStreamTest, SizeHintDicatesTotalSize) {
absl::Cord cord(std::string(500, 'a'));
CordOutputStream output(std::move(cord), 2000);
void* data;
int size;
int remaining = 1500;
while (remaining > 0) {
ASSERT_TRUE(output.Next(&data, &size));
ASSERT_LE(size, remaining);
memset(data, 'b', static_cast<size_t>(size));
remaining -= size;
}
ASSERT_EQ(remaining, 0);
cord = output.Consume();
EXPECT_EQ(cord, absl::StrCat(std::string(500, 'a'), std::string(1500, 'b')));
}
TEST(CordOutputStreamTest, BackUpReusesPartialBuffer) {
CordOutputStream output(2000);
void* data;
int size;
ASSERT_TRUE(output.Next(&data, &size));
ASSERT_EQ(size, 2000);
memset(data, '1', 100);
output.BackUp(1900);
ASSERT_TRUE(output.Next(&data, &size));
ASSERT_EQ(size, 1900);
memset(data, '2', 200);
output.BackUp(1700);
ASSERT_TRUE(output.Next(&data, &size));
ASSERT_EQ(size, 1700);
memset(data, '3', 400);
output.BackUp(1300);
ASSERT_TRUE(output.Next(&data, &size));
ASSERT_EQ(size, 1300);
memset(data, '4', 1300);
absl::Cord cord = output.Consume();
ASSERT_TRUE(cord.TryFlat());
absl::string_view flat = *cord.TryFlat();
EXPECT_EQ(flat, absl::StrCat(std::string(100, '1'), std::string(200, '2'),
std::string(400, '3'), std::string(1300, '4')));
}
TEST(CordOutputStreamTest, UsesPrivateCapacityInDonatedCord) {
absl::Cord cord;
absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(2000);
memset(buffer.data(), 'a', 500);
buffer.SetLength(500);
cord.Append(std::move(buffer));
CordOutputStream output(std::move(cord), 2000);
void* data;
int size;
ASSERT_TRUE(output.Next(&data, &size));
ASSERT_EQ(size, 1500);
memset(data, 'b', 1500);
cord = output.Consume();
ASSERT_TRUE(cord.TryFlat());
absl::string_view flat = *cord.TryFlat();
EXPECT_EQ(flat, absl::StrCat(std::string(500, 'a'), std::string(1500, 'b')));
}
TEST(CordOutputStreamTest, UsesPrivateCapacityInAppendedCord) {
absl::Cord cord;
absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(2000);
memset(buffer.data(), 'a', 500);
buffer.SetLength(500);
cord.Append(std::move(buffer));
CordOutputStream output(2000);
void* data;
int size;
// Add cord. Clearing it makes it privately owned by 'output' as it's non
// trivial size guarantees it is ref counted, not deep copied.
output.WriteCord(cord);
cord.Clear();
ASSERT_TRUE(output.Next(&data, &size));
ASSERT_EQ(size, 1500);
memset(data, 'b', 1500);
cord = output.Consume();
ASSERT_TRUE(cord.TryFlat());
absl::string_view flat = *cord.TryFlat();
EXPECT_EQ(flat, absl::StrCat(std::string(500, 'a'), std::string(1500, 'b')));
}
TEST(CordOutputStreamTest, CapsSizeAtHintButUsesCapacityBeyondHint) {
// This tests verifies that when we provide a hint of 'x' bytes, that the
// returned size from Next() will be capped at 'size_hint', but that if we
// exceed size_hint, it will use the capacity in any internal buffer beyond
// the size hint. We test this by providing a hint that is too large to be
// inlined, but so small that we have a guarantee it's smaller than the
// minimum flat size so we will have a 'capped' larger buffer as state.
size_t size_hint = sizeof(absl::Cord) + 1;
CordOutputStream output(size_hint);
void* data;
int size;
ASSERT_TRUE(output.Next(&data, &size));
ASSERT_EQ(size, size_hint);
memset(data, 'a', static_cast<size_t>(size));
ASSERT_TRUE(output.Next(&data, &size));
memset(data, 'b', static_cast<size_t>(size));
// We should have received the same buffer on each Next() call
absl::Cord cord = output.Consume();
ASSERT_TRUE(cord.TryFlat());
absl::string_view flat = *cord.TryFlat();
EXPECT_EQ(flat, absl::StrCat(std::string(size_hint, 'a'),
std::string(static_cast<size_t>(size), 'b')));
}
TEST(CordOutputStreamTest, SizeDoublesWithoutHint) {
CordOutputStream output;
void* data;
int size;
// Whitebox: we are guaranteed at least 128 bytes initially. We also assume
// that the maximum size is roughly 4KiB - overhead without being precise.
int min_size = 128;
const int max_size = 4000;
ASSERT_TRUE(output.Next(&data, &size));
memset(data, 0, static_cast<size_t>(size));
ASSERT_GE(size, min_size);
for (int i = 0; i < 6; ++i) {
ASSERT_TRUE(output.Next(&data, &size));
memset(data, 0, static_cast<size_t>(size));
ASSERT_GE(size, min_size);
min_size = (std::min)(min_size * 2, max_size);
}
}
TEST_F(IoTest, WriteSmallCord) {
absl::Cord source;
source.Append("foo bar");
CordOutputStream output(absl::Cord("existing:"));
EXPECT_TRUE(output.WriteCord(source));
absl::Cord cord = output.Consume();
EXPECT_EQ(absl::Cord("existing:foo bar"), cord);
}
TEST_F(IoTest, WriteLargeCord) {
absl::Cord source;
for (int i = 0; i < 1024; i++) {
source.Append("foo bar");
}
CordOutputStream output(absl::Cord("existing:"));
EXPECT_TRUE(output.WriteCord(source));
absl::Cord cord = output.Consume();
absl::Cord expected = source;
expected.Prepend("existing:");
EXPECT_EQ(expected, cord);
}
// Test that large size hints lead to large block sizes.
TEST_F(IoTest, CordOutputSizeHint) {
CordOutputStream output1;
CordOutputStream output2(12345);
void* data1;
void* data2;
int size1, size2;
ASSERT_TRUE(output1.Next(&data1, &size1));
ASSERT_TRUE(output2.Next(&data2, &size2));
// Prevent 'unflushed output' debug checks and warnings
output1.BackUp(size1);
output2.BackUp(size2);
EXPECT_GT(size2, size1);
// Prevent any warnings on unused or unflushed data
output1.Consume();
output2.Consume();
}
// Test that when we use a size hint, we get a buffer boundary exactly on that
// byte.
TEST_F(IoTest, CordOutputBufferEndsAtSizeHint) {
static const int kSizeHint = 12345;
CordOutputStream output(kSizeHint);
void* data;
int size;
int total_read = 0;
while (total_read < kSizeHint) {
ASSERT_TRUE(output.Next(&data, &size));
memset(data, 0, static_cast<size_t>(size)); // Avoid uninitialized data UB
total_read += size;
}
EXPECT_EQ(kSizeHint, total_read);
// We should be able to keep going past the size hint.
ASSERT_TRUE(output.Next(&data, &size));
EXPECT_GT(size, 0);
// Prevent any warnings on unused or unflushed data
output.Consume();
}
// To test files, we create a temporary file, write, read, truncate, repeat.
TEST_F(IoTest, FileIo) {

@ -33,6 +33,7 @@
#include <climits>
#include <iostream>
#include <string>
#include <utility>
#include "google/protobuf/stubs/logging.h"
#include "google/protobuf/stubs/common.h"
@ -51,6 +52,9 @@
#include "google/protobuf/unittest_lite.pb.h"
#include "google/protobuf/wire_format_lite.h"
// Must be included last
#include "google/protobuf/port_def.inc"
namespace google {
namespace protobuf {

Loading…
Cancel
Save