Merge pull request #14686 from markdroth/caching_byte_stream_fix

Orphan underlying byte stream as soon as it's been drained.
pull/14700/head
Mark D. Roth 7 years ago committed by GitHub
commit b62d77f05b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 35
      src/core/lib/transport/byte_stream.cc
  2. 3
      src/core/lib/transport/byte_stream.h

@ -79,17 +79,19 @@ void SliceBufferByteStream::Shutdown(grpc_error* error) {
//
ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream)
: underlying_stream_(std::move(underlying_stream)) {
: underlying_stream_(std::move(underlying_stream)),
length_(underlying_stream_->length()),
flags_(underlying_stream_->flags()) {
grpc_slice_buffer_init(&cache_buffer_);
}
ByteStreamCache::~ByteStreamCache() {
if (underlying_stream_ != nullptr) Destroy();
}
ByteStreamCache::~ByteStreamCache() { Destroy(); }
void ByteStreamCache::Destroy() {
underlying_stream_.reset();
grpc_slice_buffer_destroy_internal(&cache_buffer_);
if (cache_buffer_.length > 0) {
grpc_slice_buffer_destroy_internal(&cache_buffer_);
}
}
//
@ -97,9 +99,7 @@ void ByteStreamCache::Destroy() {
//
ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache)
: ByteStream(cache->underlying_stream_->length(),
cache->underlying_stream_->flags()),
cache_(cache) {}
: ByteStream(cache->length_, cache->flags_), cache_(cache) {}
ByteStreamCache::CachingByteStream::~CachingByteStream() {}
@ -115,6 +115,7 @@ bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint,
grpc_closure* on_complete) {
if (shutdown_error_ != GRPC_ERROR_NONE) return true;
if (cursor_ < cache_->cache_buffer_.count) return true;
GPR_ASSERT(cache_->underlying_stream_ != nullptr);
return cache_->underlying_stream_->Next(max_size_hint, on_complete);
}
@ -125,13 +126,20 @@ grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) {
if (cursor_ < cache_->cache_buffer_.count) {
*slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]);
++cursor_;
offset_ += GRPC_SLICE_LENGTH(*slice);
return GRPC_ERROR_NONE;
}
GPR_ASSERT(cache_->underlying_stream_ != nullptr);
grpc_error* error = cache_->underlying_stream_->Pull(slice);
if (error == GRPC_ERROR_NONE) {
++cursor_;
grpc_slice_buffer_add(&cache_->cache_buffer_,
grpc_slice_ref_internal(*slice));
++cursor_;
offset_ += GRPC_SLICE_LENGTH(*slice);
// Orphan the underlying stream if it's been drained.
if (offset_ == cache_->underlying_stream_->length()) {
cache_->underlying_stream_.reset();
}
}
return error;
}
@ -139,9 +147,14 @@ grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) {
void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) {
GRPC_ERROR_UNREF(shutdown_error_);
shutdown_error_ = GRPC_ERROR_REF(error);
cache_->underlying_stream_->Shutdown(error);
if (cache_->underlying_stream_ != nullptr) {
cache_->underlying_stream_->Shutdown(error);
}
}
void ByteStreamCache::CachingByteStream::Reset() { cursor_ = 0; }
void ByteStreamCache::CachingByteStream::Reset() {
cursor_ = 0;
offset_ = 0;
}
} // namespace grpc_core

@ -139,6 +139,7 @@ class ByteStreamCache {
private:
ByteStreamCache* cache_;
size_t cursor_ = 0;
size_t offset_ = 0;
grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
};
@ -153,6 +154,8 @@ class ByteStreamCache {
private:
OrphanablePtr<ByteStream> underlying_stream_;
uint32_t length_;
uint32_t flags_;
grpc_slice_buffer cache_buffer_;
};

Loading…
Cancel
Save