mirror of https://github.com/grpc/grpc.git
Reland (again) bytestream removal (#29987)
* Revert "Revert "Reland bytestream removal (#29911)" (#29964)"
This reverts commit e6c6840db3
.
* initial fc fuzzer
* fixes
* add rq to fc fuzzer
* fleshing things out
* Automated change: Fix sanity tests
* cleanup
* send with payload
* ensure if no reader no flow control tokens are granted
* remove some public methods
* remove bogus benchmarks
* account for pending size
* Automated change: Fix sanity tests
* Automated change: Fix sanity tests
* better logic
* Automated change: Fix sanity tests
* fix
* fixes
* fuzz pending size
* Automated change: Fix sanity tests
* fix
* Automated change: Fix sanity tests
* huh
* increase too short timeout
* review feedback
* review feedback
* fix u32 overflow
* fix
* robustness fixes for channelz_servicer_test
* fix
* Automated change: Fix sanity tests
* fix
* fix
* fix
* Automated change: Fix sanity tests
* dont send window updates if read closed
* Automated change: Fix sanity tests
Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30132/head
parent
17811f67de
commit
eb5ae61470
87 changed files with 2171 additions and 3873 deletions
@ -1,167 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/transport/byte_stream.h" |
||||
|
||||
#include <memory> |
||||
#include <utility> |
||||
|
||||
#include "absl/status/status.h" |
||||
|
||||
#include <grpc/slice_buffer.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "src/core/lib/slice/slice_refcount.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
//
|
||||
// SliceBufferByteStream
|
||||
//
|
||||
|
||||
SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer, |
||||
uint32_t flags) |
||||
: ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) { |
||||
GPR_ASSERT(slice_buffer->length <= UINT32_MAX); |
||||
grpc_slice_buffer_init(&backing_buffer_); |
||||
grpc_slice_buffer_swap(slice_buffer, &backing_buffer_); |
||||
if (backing_buffer_.count == 0) { |
||||
grpc_slice_buffer_add_indexed(&backing_buffer_, grpc_empty_slice()); |
||||
GPR_ASSERT(backing_buffer_.count > 0); |
||||
} |
||||
} |
||||
|
||||
SliceBufferByteStream::~SliceBufferByteStream() {} |
||||
|
||||
void SliceBufferByteStream::Orphan() { |
||||
grpc_slice_buffer_destroy_internal(&backing_buffer_); |
||||
GRPC_ERROR_UNREF(shutdown_error_); |
||||
shutdown_error_ = GRPC_ERROR_NONE; |
||||
// Note: We do not actually delete the object here, since
|
||||
// SliceBufferByteStream is usually allocated as part of a larger
|
||||
// object and has an OrphanablePtr of itself passed down through the
|
||||
// filter stack.
|
||||
} |
||||
|
||||
bool SliceBufferByteStream::Next(size_t /*max_size_hint*/, |
||||
grpc_closure* /*on_complete*/) { |
||||
GPR_DEBUG_ASSERT(backing_buffer_.count > 0); |
||||
return true; |
||||
} |
||||
|
||||
grpc_error_handle SliceBufferByteStream::Pull(grpc_slice* slice) { |
||||
if (GPR_UNLIKELY(shutdown_error_ != GRPC_ERROR_NONE)) { |
||||
return GRPC_ERROR_REF(shutdown_error_); |
||||
} |
||||
*slice = grpc_slice_buffer_take_first(&backing_buffer_); |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
|
||||
void SliceBufferByteStream::Shutdown(grpc_error_handle error) { |
||||
GRPC_ERROR_UNREF(shutdown_error_); |
||||
shutdown_error_ = error; |
||||
} |
||||
|
||||
//
|
||||
// ByteStreamCache
|
||||
//
|
||||
|
||||
ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream) |
||||
: underlying_stream_(std::move(underlying_stream)), |
||||
length_(underlying_stream_->length()), |
||||
flags_(underlying_stream_->flags()) { |
||||
grpc_slice_buffer_init(&cache_buffer_); |
||||
} |
||||
|
||||
ByteStreamCache::~ByteStreamCache() { Destroy(); } |
||||
|
||||
void ByteStreamCache::Destroy() { |
||||
underlying_stream_.reset(); |
||||
if (cache_buffer_.length > 0) { |
||||
grpc_slice_buffer_destroy_internal(&cache_buffer_); |
||||
} |
||||
} |
||||
|
||||
//
|
||||
// ByteStreamCache::CachingByteStream
|
||||
//
|
||||
|
||||
ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache) |
||||
: ByteStream(cache->length_, cache->flags_), cache_(cache) {} |
||||
|
||||
ByteStreamCache::CachingByteStream::~CachingByteStream() {} |
||||
|
||||
void ByteStreamCache::CachingByteStream::Orphan() { |
||||
GRPC_ERROR_UNREF(shutdown_error_); |
||||
shutdown_error_ = GRPC_ERROR_NONE; |
||||
// Note: We do not actually delete the object here, since
|
||||
// CachingByteStream is usually allocated as part of a larger
|
||||
// object and has an OrphanablePtr of itself passed down through the
|
||||
// filter stack.
|
||||
} |
||||
|
||||
bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint, |
||||
grpc_closure* on_complete) { |
||||
if (shutdown_error_ != GRPC_ERROR_NONE) return true; |
||||
if (cursor_ < cache_->cache_buffer_.count) return true; |
||||
GPR_ASSERT(cache_->underlying_stream_ != nullptr); |
||||
return cache_->underlying_stream_->Next(max_size_hint, on_complete); |
||||
} |
||||
|
||||
grpc_error_handle ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) { |
||||
if (shutdown_error_ != GRPC_ERROR_NONE) { |
||||
return GRPC_ERROR_REF(shutdown_error_); |
||||
} |
||||
if (cursor_ < cache_->cache_buffer_.count) { |
||||
*slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]); |
||||
++cursor_; |
||||
offset_ += GRPC_SLICE_LENGTH(*slice); |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
GPR_ASSERT(cache_->underlying_stream_ != nullptr); |
||||
grpc_error_handle error = cache_->underlying_stream_->Pull(slice); |
||||
if (error == GRPC_ERROR_NONE) { |
||||
grpc_slice_buffer_add(&cache_->cache_buffer_, |
||||
grpc_slice_ref_internal(*slice)); |
||||
++cursor_; |
||||
offset_ += GRPC_SLICE_LENGTH(*slice); |
||||
// Orphan the underlying stream if it's been drained.
|
||||
if (offset_ == cache_->underlying_stream_->length()) { |
||||
cache_->underlying_stream_.reset(); |
||||
} |
||||
} |
||||
return error; |
||||
} |
||||
|
||||
void ByteStreamCache::CachingByteStream::Shutdown(grpc_error_handle error) { |
||||
GRPC_ERROR_UNREF(shutdown_error_); |
||||
shutdown_error_ = GRPC_ERROR_REF(error); |
||||
if (cache_->underlying_stream_ != nullptr) { |
||||
cache_->underlying_stream_->Shutdown(error); |
||||
} |
||||
} |
||||
|
||||
void ByteStreamCache::CachingByteStream::Reset() { |
||||
cursor_ = 0; |
||||
offset_ = 0; |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -1,170 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H |
||||
#define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
#include <stdint.h> |
||||
|
||||
#include <grpc/slice.h> |
||||
|
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
|
||||
/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
|
||||
* compression for the message. (Does not apply for stream compression.) */ |
||||
#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) |
||||
/** Internal bit flag for determining whether the message was compressed and had
|
||||
* to be decompressed by the message_decompress filter. (Does not apply for |
||||
* stream compression.) */ |
||||
#define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u) |
||||
/** Mask of all valid internal flags. */ |
||||
#define GRPC_WRITE_INTERNAL_USED_MASK \ |
||||
(GRPC_WRITE_INTERNAL_COMPRESS | GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED) |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class ByteStream : public Orphanable { |
||||
public: |
||||
~ByteStream() override {} |
||||
|
||||
// Returns true if the bytes are available immediately (in which case
|
||||
// on_complete will not be called), or false if the bytes will be available
|
||||
// asynchronously (in which case on_complete will be called when they
|
||||
// are available). Should not be called if there is no data left on the
|
||||
// stream.
|
||||
//
|
||||
// max_size_hint can be set as a hint as to the maximum number
|
||||
// of bytes that would be acceptable to read.
|
||||
virtual bool Next(size_t max_size_hint, grpc_closure* on_complete) = 0; |
||||
|
||||
// Returns the next slice in the byte stream when it is available, as
|
||||
// indicated by Next().
|
||||
//
|
||||
// Once a slice is returned into *slice, it is owned by the caller.
|
||||
virtual grpc_error_handle Pull(grpc_slice* slice) = 0; |
||||
|
||||
// Shuts down the byte stream.
|
||||
//
|
||||
// If there is a pending call to on_complete from Next(), it will be
|
||||
// invoked with the error passed to Shutdown().
|
||||
//
|
||||
// The next call to Pull() (if any) will return the error passed to
|
||||
// Shutdown().
|
||||
virtual void Shutdown(grpc_error_handle error) = 0; |
||||
|
||||
uint32_t length() const { return length_; } |
||||
uint32_t flags() const { return flags_; } |
||||
|
||||
void set_flags(uint32_t flags) { flags_ = flags; } |
||||
|
||||
protected: |
||||
ByteStream(uint32_t length, uint32_t flags) |
||||
: length_(length), flags_(flags) {} |
||||
|
||||
private: |
||||
const uint32_t length_; |
||||
uint32_t flags_; |
||||
}; |
||||
|
||||
//
|
||||
// SliceBufferByteStream
|
||||
//
|
||||
// A ByteStream that wraps a slice buffer.
|
||||
//
|
||||
|
||||
class SliceBufferByteStream : public ByteStream { |
||||
public: |
||||
// Removes all slices in slice_buffer, leaving it empty.
|
||||
SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags); |
||||
|
||||
~SliceBufferByteStream() override; |
||||
|
||||
void Orphan() override; |
||||
|
||||
bool Next(size_t max_size_hint, grpc_closure* on_complete) override; |
||||
grpc_error_handle Pull(grpc_slice* slice) override; |
||||
void Shutdown(grpc_error_handle error) override; |
||||
|
||||
private: |
||||
grpc_error_handle shutdown_error_ = GRPC_ERROR_NONE; |
||||
grpc_slice_buffer backing_buffer_; |
||||
}; |
||||
|
||||
//
|
||||
// CachingByteStream
|
||||
//
|
||||
// A ByteStream that that wraps an underlying byte stream but caches
|
||||
// the resulting slices in a slice buffer. If an initial attempt fails
|
||||
// without fully draining the underlying stream, a new caching stream
|
||||
// can be created from the same underlying cache, in which case it will
|
||||
// return whatever is in the backing buffer before continuing to read the
|
||||
// underlying stream.
|
||||
//
|
||||
// NOTE: No synchronization is done, so it is not safe to have multiple
|
||||
// CachingByteStreams simultaneously drawing from the same underlying
|
||||
// ByteStreamCache at the same time.
|
||||
//
|
||||
|
||||
class ByteStreamCache { |
||||
public: |
||||
class CachingByteStream : public ByteStream { |
||||
public: |
||||
explicit CachingByteStream(ByteStreamCache* cache); |
||||
|
||||
~CachingByteStream() override; |
||||
|
||||
void Orphan() override; |
||||
|
||||
bool Next(size_t max_size_hint, grpc_closure* on_complete) override; |
||||
grpc_error_handle Pull(grpc_slice* slice) override; |
||||
void Shutdown(grpc_error_handle error) override; |
||||
|
||||
// Resets the byte stream to the start of the underlying stream.
|
||||
void Reset(); |
||||
|
||||
private: |
||||
ByteStreamCache* cache_; |
||||
size_t cursor_ = 0; |
||||
size_t offset_ = 0; |
||||
grpc_error_handle shutdown_error_ = GRPC_ERROR_NONE; |
||||
}; |
||||
|
||||
explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream); |
||||
|
||||
~ByteStreamCache(); |
||||
|
||||
// Must not be destroyed while still in use by a CachingByteStream.
|
||||
void Destroy(); |
||||
|
||||
grpc_slice_buffer* cache_buffer() { return &cache_buffer_; } |
||||
|
||||
private: |
||||
OrphanablePtr<ByteStream> underlying_stream_; |
||||
uint32_t length_; |
||||
uint32_t flags_; |
||||
grpc_slice_buffer cache_buffer_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ |
@ -0,0 +1,118 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <grpc/grpc_security.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/sync.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/client_channel.h" |
||||
#include "src/core/ext/filters/http/server/http_server_filter.h" |
||||
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" |
||||
#include "src/core/lib/channel/connected_channel.h" |
||||
#include "src/core/lib/gprpp/host_port.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "src/core/lib/surface/server.h" |
||||
#include "test/core/end2end/end2end_tests.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
struct fullstack_fixture_data { |
||||
std::string localaddr; |
||||
}; |
||||
|
||||
static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( |
||||
const grpc_channel_args* /*client_args*/, |
||||
const grpc_channel_args* /*server_args*/) { |
||||
grpc_end2end_test_fixture f; |
||||
int port = grpc_pick_unused_port_or_die(); |
||||
fullstack_fixture_data* ffd = new fullstack_fixture_data(); |
||||
memset(&f, 0, sizeof(f)); |
||||
|
||||
ffd->localaddr = grpc_core::JoinHostPort("localhost", port); |
||||
|
||||
f.fixture_data = ffd; |
||||
f.cq = grpc_completion_queue_create_for_next(nullptr); |
||||
|
||||
return f; |
||||
} |
||||
|
||||
void chttp2_init_client_fullstack(grpc_end2end_test_fixture* f, |
||||
const grpc_channel_args* client_args) { |
||||
fullstack_fixture_data* ffd = |
||||
static_cast<fullstack_fixture_data*>(f->fixture_data); |
||||
grpc_channel_credentials* creds = grpc_insecure_credentials_create(); |
||||
grpc_arg no_retry = grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0); |
||||
client_args = grpc_channel_args_copy_and_add(client_args, &no_retry, 1); |
||||
f->client = grpc_channel_create(ffd->localaddr.c_str(), creds, client_args); |
||||
grpc_channel_args_destroy(client_args); |
||||
grpc_channel_credentials_release(creds); |
||||
GPR_ASSERT(f->client); |
||||
} |
||||
|
||||
void chttp2_init_server_fullstack(grpc_end2end_test_fixture* f, |
||||
const grpc_channel_args* server_args) { |
||||
fullstack_fixture_data* ffd = |
||||
static_cast<fullstack_fixture_data*>(f->fixture_data); |
||||
if (f->server) { |
||||
grpc_server_destroy(f->server); |
||||
} |
||||
f->server = grpc_server_create(server_args, nullptr); |
||||
grpc_server_register_completion_queue(f->server, f->cq, nullptr); |
||||
grpc_server_credentials* server_creds = |
||||
grpc_insecure_server_credentials_create(); |
||||
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr.c_str(), |
||||
server_creds)); |
||||
grpc_server_credentials_release(server_creds); |
||||
grpc_server_start(f->server); |
||||
} |
||||
|
||||
void chttp2_tear_down_fullstack(grpc_end2end_test_fixture* f) { |
||||
fullstack_fixture_data* ffd = |
||||
static_cast<fullstack_fixture_data*>(f->fixture_data); |
||||
delete ffd; |
||||
} |
||||
|
||||
/* All test configurations */ |
||||
static grpc_end2end_test_config configs[] = { |
||||
{"chttp2/fullstack", |
||||
FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | |
||||
FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | |
||||
FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, |
||||
nullptr, chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, |
||||
chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, |
||||
}; |
||||
|
||||
int main(int argc, char** argv) { |
||||
size_t i; |
||||
|
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
grpc_end2end_tests_pre_init(); |
||||
grpc_init(); |
||||
|
||||
for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { |
||||
grpc_end2end_tests(argc, argv, configs[i]); |
||||
} |
||||
|
||||
grpc_shutdown(); |
||||
|
||||
return 0; |
||||
} |
@ -1,100 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
/* Test of gpr synchronization support. */ |
||||
|
||||
#include "src/core/lib/gprpp/manual_constructor.h" |
||||
|
||||
#include <stdio.h> |
||||
#include <stdlib.h> |
||||
|
||||
#include <cstring> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/sync.h> |
||||
|
||||
#include "test/core/util/test_config.h" |
||||
|
||||
class A { |
||||
public: |
||||
A() {} |
||||
virtual ~A() {} |
||||
virtual const char* foo() { return "A_foo"; } |
||||
virtual const char* bar() { return "A_bar"; } |
||||
}; |
||||
|
||||
class B : public A { |
||||
public: |
||||
B() {} |
||||
~B() override {} |
||||
const char* foo() override { return "B_foo"; } |
||||
char get_junk() { return junk[0]; } |
||||
|
||||
private: |
||||
char junk[1000]; |
||||
}; |
||||
|
||||
class C : public B { |
||||
public: |
||||
C() {} |
||||
~C() override {} |
||||
const char* bar() override { return "C_bar"; } |
||||
char get_more_junk() { return more_junk[0]; } |
||||
|
||||
private: |
||||
char more_junk[1000]; |
||||
}; |
||||
|
||||
class D : public A { |
||||
public: |
||||
const char* bar() override { return "D_bar"; } |
||||
}; |
||||
|
||||
static void basic_test() { |
||||
grpc_core::PolymorphicManualConstructor<A, B> poly; |
||||
poly.Init<B>(); |
||||
GPR_ASSERT(!strcmp(poly->foo(), "B_foo")); |
||||
GPR_ASSERT(!strcmp(poly->bar(), "A_bar")); |
||||
} |
||||
|
||||
static void complex_test() { |
||||
grpc_core::PolymorphicManualConstructor<A, B, C, D> polyB; |
||||
polyB.Init<B>(); |
||||
GPR_ASSERT(!strcmp(polyB->foo(), "B_foo")); |
||||
GPR_ASSERT(!strcmp(polyB->bar(), "A_bar")); |
||||
|
||||
grpc_core::PolymorphicManualConstructor<A, B, C, D> polyC; |
||||
polyC.Init<C>(); |
||||
GPR_ASSERT(!strcmp(polyC->foo(), "B_foo")); |
||||
GPR_ASSERT(!strcmp(polyC->bar(), "C_bar")); |
||||
|
||||
grpc_core::PolymorphicManualConstructor<A, B, C, D> polyD; |
||||
polyD.Init<D>(); |
||||
GPR_ASSERT(!strcmp(polyD->foo(), "A_foo")); |
||||
GPR_ASSERT(!strcmp(polyD->bar(), "D_bar")); |
||||
} |
||||
|
||||
/* ------------------------------------------------- */ |
||||
|
||||
int main(int argc, char* argv[]) { |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
basic_test(); |
||||
complex_test(); |
||||
return 0; |
||||
} |
@ -1,254 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/lib/transport/byte_stream.h" |
||||
|
||||
#include <gtest/gtest.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/gpr/useful.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace { |
||||
|
||||
//
|
||||
// SliceBufferByteStream tests
|
||||
//
|
||||
|
||||
void NotCalledClosure(void* /*arg*/, grpc_error_handle /*error*/) { |
||||
GPR_ASSERT(false); |
||||
} |
||||
|
||||
TEST(SliceBufferByteStream, Basic) { |
||||
ExecCtx exec_ctx; |
||||
// Create and populate slice buffer.
|
||||
grpc_slice_buffer buffer; |
||||
grpc_slice_buffer_init(&buffer); |
||||
grpc_slice input[] = { |
||||
grpc_slice_from_static_string("foo"), |
||||
grpc_slice_from_static_string("bar"), |
||||
}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
grpc_slice_buffer_add(&buffer, input[i]); |
||||
} |
||||
// Create byte stream.
|
||||
SliceBufferByteStream stream(&buffer, 0); |
||||
grpc_slice_buffer_destroy_internal(&buffer); |
||||
EXPECT_EQ(6U, stream.length()); |
||||
grpc_closure closure; |
||||
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Read each slice. Note that Next() always returns synchronously.
|
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
grpc_slice output; |
||||
grpc_error_handle error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||
grpc_slice_unref_internal(output); |
||||
} |
||||
// Clean up.
|
||||
stream.Orphan(); |
||||
} |
||||
|
||||
TEST(SliceBufferByteStream, Shutdown) { |
||||
ExecCtx exec_ctx; |
||||
// Create and populate slice buffer.
|
||||
grpc_slice_buffer buffer; |
||||
grpc_slice_buffer_init(&buffer); |
||||
grpc_slice input[] = { |
||||
grpc_slice_from_static_string("foo"), |
||||
grpc_slice_from_static_string("bar"), |
||||
}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
grpc_slice_buffer_add(&buffer, input[i]); |
||||
} |
||||
// Create byte stream.
|
||||
SliceBufferByteStream stream(&buffer, 0); |
||||
grpc_slice_buffer_destroy_internal(&buffer); |
||||
EXPECT_EQ(6U, stream.length()); |
||||
grpc_closure closure; |
||||
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Read the first slice.
|
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
grpc_slice output; |
||||
grpc_error_handle error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[0], output)); |
||||
grpc_slice_unref_internal(output); |
||||
// Now shutdown.
|
||||
grpc_error_handle shutdown_error = |
||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error"); |
||||
stream.Shutdown(GRPC_ERROR_REF(shutdown_error)); |
||||
// After shutdown, the next pull() should return the error.
|
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == shutdown_error); |
||||
GRPC_ERROR_UNREF(error); |
||||
GRPC_ERROR_UNREF(shutdown_error); |
||||
// Clean up.
|
||||
stream.Orphan(); |
||||
} |
||||
|
||||
//
|
||||
// CachingByteStream tests
|
||||
//
|
||||
|
||||
TEST(CachingByteStream, Basic) { |
||||
ExecCtx exec_ctx; |
||||
// Create and populate slice buffer byte stream.
|
||||
grpc_slice_buffer buffer; |
||||
grpc_slice_buffer_init(&buffer); |
||||
grpc_slice input[] = { |
||||
grpc_slice_from_static_string("foo"), |
||||
grpc_slice_from_static_string("bar"), |
||||
}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
grpc_slice_buffer_add(&buffer, input[i]); |
||||
} |
||||
SliceBufferByteStream underlying_stream(&buffer, 0); |
||||
grpc_slice_buffer_destroy_internal(&buffer); |
||||
// Create cache and caching stream.
|
||||
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream))); |
||||
ByteStreamCache::CachingByteStream stream(&cache); |
||||
grpc_closure closure; |
||||
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Read each slice. Note that next() always returns synchronously,
|
||||
// because the underlying byte stream always does.
|
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
grpc_slice output; |
||||
grpc_error_handle error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||
grpc_slice_unref_internal(output); |
||||
} |
||||
// Clean up.
|
||||
stream.Orphan(); |
||||
cache.Destroy(); |
||||
} |
||||
|
||||
TEST(CachingByteStream, Reset) { |
||||
ExecCtx exec_ctx; |
||||
// Create and populate slice buffer byte stream.
|
||||
grpc_slice_buffer buffer; |
||||
grpc_slice_buffer_init(&buffer); |
||||
grpc_slice input[] = { |
||||
grpc_slice_from_static_string("foo"), |
||||
grpc_slice_from_static_string("bar"), |
||||
}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
grpc_slice_buffer_add(&buffer, input[i]); |
||||
} |
||||
SliceBufferByteStream underlying_stream(&buffer, 0); |
||||
grpc_slice_buffer_destroy_internal(&buffer); |
||||
// Create cache and caching stream.
|
||||
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream))); |
||||
ByteStreamCache::CachingByteStream stream(&cache); |
||||
grpc_closure closure; |
||||
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Read one slice.
|
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
grpc_slice output; |
||||
grpc_error_handle error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[0], output)); |
||||
grpc_slice_unref_internal(output); |
||||
// Reset the caching stream. The reads should start over from the
|
||||
// first slice.
|
||||
stream.Reset(); |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||
grpc_slice_unref_internal(output); |
||||
} |
||||
// Clean up.
|
||||
stream.Orphan(); |
||||
cache.Destroy(); |
||||
} |
||||
|
||||
TEST(CachingByteStream, SharedCache) { |
||||
ExecCtx exec_ctx; |
||||
// Create and populate slice buffer byte stream.
|
||||
grpc_slice_buffer buffer; |
||||
grpc_slice_buffer_init(&buffer); |
||||
grpc_slice input[] = { |
||||
grpc_slice_from_static_string("foo"), |
||||
grpc_slice_from_static_string("bar"), |
||||
}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
grpc_slice_buffer_add(&buffer, input[i]); |
||||
} |
||||
SliceBufferByteStream underlying_stream(&buffer, 0); |
||||
grpc_slice_buffer_destroy_internal(&buffer); |
||||
// Create cache and two caching streams.
|
||||
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream))); |
||||
ByteStreamCache::CachingByteStream stream1(&cache); |
||||
ByteStreamCache::CachingByteStream stream2(&cache); |
||||
grpc_closure closure; |
||||
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Read one slice from stream1.
|
||||
EXPECT_TRUE(stream1.Next(~(size_t)0, &closure)); |
||||
grpc_slice output; |
||||
grpc_error_handle error = stream1.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[0], output)); |
||||
grpc_slice_unref_internal(output); |
||||
// Read all slices from stream2.
|
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
EXPECT_TRUE(stream2.Next(~(size_t)0, &closure)); |
||||
error = stream2.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||
grpc_slice_unref_internal(output); |
||||
} |
||||
// Now read the second slice from stream1.
|
||||
EXPECT_TRUE(stream1.Next(~(size_t)0, &closure)); |
||||
error = stream1.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[1], output)); |
||||
grpc_slice_unref_internal(output); |
||||
// Clean up.
|
||||
stream1.Orphan(); |
||||
stream2.Orphan(); |
||||
cache.Destroy(); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
grpc_init(); |
||||
int retval = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return retval; |
||||
} |
@ -0,0 +1,408 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <limits> |
||||
#include <queue> |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/ext/transport/chttp2/transport/flow_control.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/libfuzzer/libfuzzer_macro.h" |
||||
#include "test/core/transport/chttp2/flow_control_fuzzer.pb.h" |
||||
|
||||
bool squelch = true; |
||||
|
||||
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type); |
||||
|
||||
namespace grpc_core { |
||||
namespace chttp2 { |
||||
namespace { |
||||
|
||||
constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000; |
||||
|
||||
gpr_timespec g_now; |
||||
gpr_timespec now_impl(gpr_clock_type clock_type) { |
||||
GPR_ASSERT(clock_type != GPR_TIMESPAN); |
||||
gpr_timespec ts = g_now; |
||||
ts.clock_type = clock_type; |
||||
return ts; |
||||
} |
||||
|
||||
void InitGlobals() { |
||||
g_now = {1, 0, GPR_CLOCK_MONOTONIC}; |
||||
TestOnlySetProcessEpoch(g_now); |
||||
gpr_now_impl = now_impl; |
||||
} |
||||
|
||||
class FlowControlFuzzer { |
||||
public: |
||||
explicit FlowControlFuzzer(bool enable_bdp) { |
||||
ExecCtx exec_ctx; |
||||
tfc_ = absl::make_unique<TransportFlowControl>("fuzzer", enable_bdp, |
||||
&memory_owner_); |
||||
} |
||||
|
||||
~FlowControlFuzzer() { |
||||
ExecCtx exec_ctx; |
||||
streams_.clear(); |
||||
tfc_.reset(); |
||||
memory_owner_.Release(allocated_memory_); |
||||
} |
||||
|
||||
void Perform(const flow_control_fuzzer::Action& action); |
||||
void AssertNoneStuck() const; |
||||
void AssertAnnouncedOverInitialWindowSizeCorrect() const; |
||||
|
||||
private: |
||||
struct StreamPayload { |
||||
uint32_t id; |
||||
uint64_t size; |
||||
}; |
||||
|
||||
struct SendToRemote { |
||||
bool bdp_ping = false; |
||||
absl::optional<uint32_t> initial_window_size; |
||||
uint32_t transport_window_update; |
||||
std::vector<StreamPayload> stream_window_updates; |
||||
}; |
||||
|
||||
struct SendFromRemote { |
||||
bool bdp_pong = false; |
||||
absl::optional<uint32_t> ack_initial_window_size; |
||||
std::vector<StreamPayload> stream_writes; |
||||
}; |
||||
|
||||
struct Stream { |
||||
explicit Stream(uint32_t id, TransportFlowControl* tfc) : id(id), fc(tfc) {} |
||||
uint32_t id; |
||||
StreamFlowControl fc; |
||||
int64_t queued_writes = 0; |
||||
int64_t window_delta = 0; |
||||
}; |
||||
|
||||
void PerformAction(FlowControlAction action, Stream* stream); |
||||
Stream* GetStream(uint32_t id) { |
||||
auto it = streams_.find(id); |
||||
if (it == streams_.end()) { |
||||
it = streams_.emplace(id, Stream{id, tfc_.get()}).first; |
||||
} |
||||
return &it->second; |
||||
} |
||||
|
||||
MemoryQuotaRefPtr memory_quota_ = MakeMemoryQuota("fuzzer"); |
||||
MemoryOwner memory_owner_ = memory_quota_->CreateMemoryOwner("owner"); |
||||
std::unique_ptr<TransportFlowControl> tfc_; |
||||
absl::optional<uint32_t> queued_initial_window_size_; |
||||
absl::optional<uint32_t> queued_send_max_frame_size_; |
||||
bool scheduled_write_ = false; |
||||
bool sending_initial_window_size_ = false; |
||||
std::deque<SendToRemote> send_to_remote_; |
||||
std::deque<SendFromRemote> send_from_remote_; |
||||
uint32_t remote_initial_window_size_ = kDefaultWindow; |
||||
int64_t remote_transport_window_size_ = kDefaultWindow; |
||||
std::map<uint32_t, Stream> streams_; |
||||
std::queue<uint32_t> streams_to_update_; |
||||
uint64_t allocated_memory_ = 0; |
||||
Timestamp next_bdp_ping_ = Timestamp::ProcessEpoch(); |
||||
}; |
||||
|
||||
void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) { |
||||
ExecCtx exec_ctx; |
||||
bool sending_payload = false; |
||||
switch (action.action_case()) { |
||||
case flow_control_fuzzer::Action::ACTION_NOT_SET: |
||||
break; |
||||
case flow_control_fuzzer::Action::kSetMemoryQuota: { |
||||
memory_quota_->SetSize( |
||||
Clamp(action.set_memory_quota(), uint64_t(1), |
||||
uint64_t(std::numeric_limits<int64_t>::max()))); |
||||
} break; |
||||
case flow_control_fuzzer::Action::kStepTimeMs: { |
||||
g_now = gpr_time_add( |
||||
g_now, gpr_time_from_millis(Clamp(action.step_time_ms(), uint64_t(1), |
||||
kMaxAdvanceTimeMillis), |
||||
GPR_TIMESPAN)); |
||||
exec_ctx.InvalidateNow(); |
||||
if (exec_ctx.Now() >= next_bdp_ping_) { |
||||
scheduled_write_ = true; |
||||
} |
||||
} break; |
||||
case flow_control_fuzzer::Action::kPeriodicUpdate: { |
||||
PerformAction(tfc_->PeriodicUpdate(), nullptr); |
||||
} break; |
||||
case flow_control_fuzzer::Action::kPerformSendToRemote: { |
||||
scheduled_write_ = true; |
||||
} break; |
||||
case flow_control_fuzzer::Action::kPerformSendToRemoteWithPayload: { |
||||
scheduled_write_ = true; |
||||
sending_payload = true; |
||||
} break; |
||||
case flow_control_fuzzer::Action::kReadSendToRemote: { |
||||
if (send_to_remote_.empty()) break; |
||||
auto sent_to_remote = send_to_remote_.front(); |
||||
if (sent_to_remote.initial_window_size.has_value()) { |
||||
if (!squelch) { |
||||
fprintf(stderr, "Setting initial window size to %d\n", |
||||
sent_to_remote.initial_window_size.value()); |
||||
} |
||||
SendFromRemote send_from_remote; |
||||
send_from_remote.ack_initial_window_size = |
||||
sent_to_remote.initial_window_size; |
||||
for (const auto& id_stream : streams_) { |
||||
GPR_ASSERT(id_stream.second.window_delta + |
||||
*sent_to_remote.initial_window_size <= |
||||
(1u << 31) - 1); |
||||
} |
||||
remote_initial_window_size_ = *sent_to_remote.initial_window_size; |
||||
send_from_remote_.push_back(send_from_remote); |
||||
} |
||||
if (sent_to_remote.bdp_ping) { |
||||
SendFromRemote send_from_remote; |
||||
send_from_remote.bdp_pong = true; |
||||
send_from_remote_.push_back(send_from_remote); |
||||
} |
||||
for (auto stream_update : sent_to_remote.stream_window_updates) { |
||||
Stream* s = GetStream(stream_update.id); |
||||
if (!squelch) { |
||||
fprintf(stderr, |
||||
"[%" PRIu32 "]: increase window delta by %" PRIu64 |
||||
" from %" PRId64 "\n", |
||||
stream_update.id, stream_update.size, s->window_delta); |
||||
} |
||||
s->window_delta += stream_update.size; |
||||
GPR_ASSERT(s->window_delta <= chttp2::kMaxWindowDelta); |
||||
} |
||||
remote_transport_window_size_ += sent_to_remote.transport_window_update; |
||||
send_to_remote_.pop_front(); |
||||
} break; |
||||
case flow_control_fuzzer::Action::kReadSendFromRemote: { |
||||
if (send_from_remote_.empty()) break; |
||||
auto sent_from_remote = send_from_remote_.front(); |
||||
if (sent_from_remote.ack_initial_window_size.has_value()) { |
||||
if (!squelch) { |
||||
fprintf(stderr, "Received ACK for initial window size %d\n", |
||||
*sent_from_remote.ack_initial_window_size); |
||||
} |
||||
tfc_->SetAckedInitialWindow(*sent_from_remote.ack_initial_window_size); |
||||
sending_initial_window_size_ = false; |
||||
} |
||||
if (sent_from_remote.bdp_pong) { |
||||
next_bdp_ping_ = tfc_->bdp_estimator()->CompletePing(); |
||||
} |
||||
for (const auto& stream_write : sent_from_remote.stream_writes) { |
||||
Stream* stream = GetStream(stream_write.id); |
||||
if (!squelch) { |
||||
fprintf(stderr, "[%" PRIu32 "]: recv write of %" PRIu64 "\n", |
||||
stream_write.id, stream_write.size); |
||||
} |
||||
if (auto* bdp = tfc_->bdp_estimator()) { |
||||
bdp->AddIncomingBytes(stream_write.size); |
||||
} |
||||
StreamFlowControl::IncomingUpdateContext upd(&stream->fc); |
||||
GPR_ASSERT(upd.RecvData(stream_write.size).ok()); |
||||
PerformAction(upd.MakeAction(), stream); |
||||
} |
||||
send_from_remote_.pop_front(); |
||||
} break; |
||||
case flow_control_fuzzer::Action::kStreamWrite: { |
||||
Stream* s = GetStream(action.stream_write().id()); |
||||
s->queued_writes += action.stream_write().size(); |
||||
} break; |
||||
case flow_control_fuzzer::Action::kPerformSendFromRemote: { |
||||
SendFromRemote send; |
||||
for (auto& id_stream : streams_) { |
||||
auto send_amount = std::min( |
||||
{id_stream.second.queued_writes, remote_transport_window_size_, |
||||
remote_initial_window_size_ + id_stream.second.window_delta}); |
||||
if (send_amount <= 0) continue; |
||||
send.stream_writes.push_back({id_stream.first, uint64_t(send_amount)}); |
||||
id_stream.second.queued_writes -= send_amount; |
||||
id_stream.second.window_delta -= send_amount; |
||||
remote_transport_window_size_ -= send_amount; |
||||
} |
||||
send_from_remote_.push_back(send); |
||||
} break; |
||||
case flow_control_fuzzer::Action::kSetMinProgressSize: { |
||||
Stream* s = GetStream(action.set_min_progress_size().id()); |
||||
StreamFlowControl::IncomingUpdateContext upd(&s->fc); |
||||
upd.SetMinProgressSize(action.set_min_progress_size().size()); |
||||
PerformAction(upd.MakeAction(), s); |
||||
} break; |
||||
case flow_control_fuzzer::Action::kAllocateMemory: { |
||||
auto allocate = std::min( |
||||
size_t(action.allocate_memory()), |
||||
grpc_event_engine::experimental::MemoryRequest::max_allowed_size()); |
||||
allocated_memory_ += allocate; |
||||
memory_owner_.Reserve(allocate); |
||||
} break; |
||||
case flow_control_fuzzer::Action::kDeallocateMemory: { |
||||
auto deallocate = |
||||
std::min(uint64_t(action.deallocate_memory()), allocated_memory_); |
||||
allocated_memory_ -= deallocate; |
||||
memory_owner_.Release(deallocate); |
||||
} break; |
||||
case flow_control_fuzzer::Action::kSetPendingSize: { |
||||
Stream* s = GetStream(action.set_min_progress_size().id()); |
||||
StreamFlowControl::IncomingUpdateContext upd(&s->fc); |
||||
upd.SetPendingSize(action.set_pending_size().size()); |
||||
PerformAction(upd.MakeAction(), s); |
||||
} break; |
||||
} |
||||
if (scheduled_write_) { |
||||
SendToRemote send; |
||||
if (exec_ctx.Now() >= next_bdp_ping_) { |
||||
if (auto* bdp = tfc_->bdp_estimator()) { |
||||
bdp->SchedulePing(); |
||||
bdp->StartPing(); |
||||
next_bdp_ping_ = Timestamp::InfFuture(); |
||||
send.bdp_ping = true; |
||||
} |
||||
} |
||||
if (!sending_initial_window_size_ && |
||||
queued_initial_window_size_.has_value()) { |
||||
sending_initial_window_size_ = true; |
||||
send.initial_window_size = |
||||
absl::exchange(queued_initial_window_size_, absl::nullopt); |
||||
} |
||||
while (!streams_to_update_.empty()) { |
||||
auto* stream = GetStream(streams_to_update_.front()); |
||||
streams_to_update_.pop(); |
||||
send.stream_window_updates.push_back( |
||||
{stream->id, stream->fc.MaybeSendUpdate()}); |
||||
} |
||||
send.transport_window_update = tfc_->MaybeSendUpdate(sending_payload); |
||||
queued_send_max_frame_size_.reset(); |
||||
send_to_remote_.emplace_back(std::move(send)); |
||||
scheduled_write_ = false; |
||||
} |
||||
} |
||||
|
||||
void FlowControlFuzzer::PerformAction(FlowControlAction action, |
||||
Stream* stream) { |
||||
if (!squelch) { |
||||
fprintf(stderr, "[%" PRId64 "]: ACTION: %s\n", |
||||
stream == nullptr ? int64_t(-1) : int64_t(stream->id), |
||||
action.DebugString().c_str()); |
||||
} |
||||
|
||||
auto with_urgency = [this](FlowControlAction::Urgency urgency, |
||||
std::function<void()> f) { |
||||
switch (urgency) { |
||||
case FlowControlAction::Urgency::NO_ACTION_NEEDED: |
||||
break; |
||||
case FlowControlAction::Urgency::UPDATE_IMMEDIATELY: |
||||
scheduled_write_ = true; |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case FlowControlAction::Urgency::QUEUE_UPDATE: |
||||
f(); |
||||
break; |
||||
} |
||||
}; |
||||
with_urgency(action.send_stream_update(), |
||||
[this, stream]() { streams_to_update_.push(stream->id); }); |
||||
with_urgency(action.send_transport_update(), []() {}); |
||||
with_urgency(action.send_initial_window_update(), [this, &action]() { |
||||
GPR_ASSERT(action.initial_window_size() >= chttp2::kMinInitialWindowSize); |
||||
GPR_ASSERT(action.initial_window_size() <= chttp2::kMaxInitialWindowSize); |
||||
queued_initial_window_size_ = action.initial_window_size(); |
||||
}); |
||||
with_urgency(action.send_max_frame_size_update(), [this, &action]() { |
||||
queued_send_max_frame_size_ = action.max_frame_size(); |
||||
}); |
||||
} |
||||
|
||||
void FlowControlFuzzer::AssertNoneStuck() const { |
||||
GPR_ASSERT(!scheduled_write_); |
||||
|
||||
// Reconcile all the values to get the view of the remote that is knowable to
|
||||
// the flow control system.
|
||||
std::map<uint32_t, int64_t> reconciled_stream_deltas; |
||||
int64_t reconciled_transport_window = remote_transport_window_size_; |
||||
int64_t reconciled_initial_window = remote_initial_window_size_; |
||||
for (const auto& id_stream : streams_) { |
||||
reconciled_stream_deltas[id_stream.first] = id_stream.second.window_delta; |
||||
} |
||||
|
||||
// Anything that's been sent from flow control -> remote needs to be added to
|
||||
// the remote.
|
||||
for (const auto& send_to_remote : send_to_remote_) { |
||||
if (send_to_remote.initial_window_size.has_value()) { |
||||
reconciled_initial_window = *send_to_remote.initial_window_size; |
||||
} |
||||
reconciled_transport_window += send_to_remote.transport_window_update; |
||||
for (const auto& stream_update : send_to_remote.stream_window_updates) { |
||||
reconciled_stream_deltas[stream_update.id] += stream_update.size; |
||||
} |
||||
} |
||||
|
||||
// Anything that's been sent from remote -> flow control needs to be wound
|
||||
// back into the remote.
|
||||
for (const auto& send_from_remote : send_from_remote_) { |
||||
for (const auto& stream_write : send_from_remote.stream_writes) { |
||||
reconciled_stream_deltas[stream_write.id] += stream_write.size; |
||||
reconciled_transport_window += stream_write.size; |
||||
} |
||||
} |
||||
|
||||
// Finally, if a stream has indicated it's willing to read, the reconciled
|
||||
// remote *MUST* be in a state where it could send at least one byte.
|
||||
for (const auto& id_stream : streams_) { |
||||
if (id_stream.second.fc.min_progress_size() == 0) continue; |
||||
int64_t stream_window = |
||||
reconciled_stream_deltas[id_stream.first] + reconciled_initial_window; |
||||
if (stream_window <= 0 || reconciled_transport_window <= 0) { |
||||
fprintf(stderr, |
||||
"FAILED: stream %d has stream_window=%" PRId64 |
||||
", transport_window=%" PRId64 ", delta=%" PRId64 |
||||
", init_window_size=%" PRId64 ", min_progress_size=%" PRId64 "\n", |
||||
id_stream.first, stream_window, reconciled_transport_window, |
||||
reconciled_stream_deltas[id_stream.first], |
||||
reconciled_initial_window, |
||||
int64_t(id_stream.second.fc.min_progress_size())); |
||||
abort(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
void FlowControlFuzzer::AssertAnnouncedOverInitialWindowSizeCorrect() const { |
||||
uint64_t value_from_streams = 0; |
||||
|
||||
for (const auto& id_stream : streams_) { |
||||
const auto& stream = id_stream.second; |
||||
if (stream.fc.announced_window_delta() > 0) { |
||||
value_from_streams += stream.fc.announced_window_delta(); |
||||
} |
||||
} |
||||
|
||||
GPR_ASSERT(value_from_streams == |
||||
tfc_->announced_stream_total_over_incoming_window()); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace chttp2
|
||||
} // namespace grpc_core
|
||||
|
||||
DEFINE_PROTO_FUZZER(const flow_control_fuzzer::Msg& msg) { |
||||
grpc_core::chttp2::InitGlobals(); |
||||
grpc_core::chttp2::FlowControlFuzzer fuzzer(msg.enable_bdp()); |
||||
for (const auto& action : msg.actions()) { |
||||
if (!squelch) { |
||||
fprintf(stderr, "%s\n", action.DebugString().c_str()); |
||||
} |
||||
fuzzer.Perform(action); |
||||
fuzzer.AssertNoneStuck(); |
||||
fuzzer.AssertAnnouncedOverInitialWindowSizeCorrect(); |
||||
} |
||||
} |
@ -0,0 +1,47 @@ |
||||
// Copyright 2022 gRPC authors. |
||||
// |
||||
// Licensed under the Apache License, Version 2.0 (the "License"); |
||||
// you may not use this file except in compliance with the License. |
||||
// You may obtain a copy of the License at |
||||
// |
||||
// http://www.apache.org/licenses/LICENSE-2.0 |
||||
// |
||||
// Unless required by applicable law or agreed to in writing, software |
||||
// distributed under the License is distributed on an "AS IS" BASIS, |
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
// See the License for the specific language governing permissions and |
||||
// limitations under the License. |
||||
|
||||
syntax = "proto3"; |
||||
|
||||
package flow_control_fuzzer; |
||||
|
||||
message Empty {}; |
||||
|
||||
message StreamWrite { |
||||
uint32 id = 1; |
||||
uint32 size = 2; |
||||
} |
||||
|
||||
message Action { |
||||
oneof action { |
||||
uint64 set_memory_quota = 1; |
||||
uint64 step_time_ms = 2; |
||||
Empty periodic_update = 3; |
||||
Empty perform_send_to_remote = 4; |
||||
Empty read_send_to_remote = 5; |
||||
Empty read_send_from_remote = 6; |
||||
StreamWrite stream_write = 7; |
||||
Empty perform_send_from_remote = 8; |
||||
StreamWrite set_min_progress_size = 9; |
||||
uint32 allocate_memory = 10; |
||||
uint32 deallocate_memory = 11; |
||||
Empty perform_send_to_remote_with_payload = 12; |
||||
StreamWrite set_pending_size = 13; |
||||
} |
||||
} |
||||
|
||||
message Msg { |
||||
bool enable_bdp = 1; |
||||
repeated Action actions = 2; |
||||
} |
@ -0,0 +1,90 @@ |
||||
enable_bdp: true |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
set_min_progress_size { |
||||
id: 11008 |
||||
size: 2883584 |
||||
} |
||||
} |
||||
actions { |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
stream_write { |
||||
id: 11008 |
||||
size: 2883584 |
||||
} |
||||
} |
||||
actions { |
||||
read_send_to_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_to_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_to_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_to_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_to_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
set_memory_quota: 0 |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
} |
@ -0,0 +1,14 @@ |
||||
enable_bdp: true |
||||
actions { |
||||
set_min_progress_size { |
||||
size: 11 |
||||
} |
||||
} |
||||
actions { |
||||
periodic_update { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
@ -0,0 +1,90 @@ |
||||
enable_bdp: true |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
stream_write { |
||||
id: 11008 |
||||
size: 2883584 |
||||
} |
||||
} |
||||
actions { |
||||
set_min_progress_size { |
||||
id: 11008 |
||||
size: 2883584 |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_to_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_to_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_to_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
set_memory_quota: 0 |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
} |
@ -0,0 +1,40 @@ |
||||
actions { |
||||
set_min_progress_size { |
||||
size: 33554432 |
||||
} |
||||
} |
||||
actions { |
||||
stream_write { |
||||
size: 33554432 |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_to_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
periodic_update { |
||||
} |
||||
} |
||||
actions { |
||||
read_send_from_remote { |
||||
} |
||||
} |
||||
actions { |
||||
perform_send_from_remote { |
||||
} |
||||
} |
@ -0,0 +1,14 @@ |
||||
enable_bdp: true |
||||
actions { |
||||
allocate_memory: 1593844738 |
||||
} |
||||
actions { |
||||
perform_send_to_remote { |
||||
} |
||||
} |
||||
actions { |
||||
set_memory_quota: 0 |
||||
} |
||||
actions { |
||||
deallocate_memory: 1593844738 |
||||
} |
@ -1,380 +1,171 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2021 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/ext/transport/chttp2/transport/flow_control.h" |
||||
|
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <functional> |
||||
#include <set> |
||||
#include <thread> |
||||
|
||||
#include <gmock/gmock.h> |
||||
#include <gtest/gtest.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/grpc_security.h> |
||||
#include <grpc/impl/codegen/grpc_types.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/string_util.h> |
||||
#include <grpc/support/time.h> |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
|
||||
#include "src/core/ext/filters/client_channel/backup_poller.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gprpp/host_port.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "test/core/end2end/cq_verifier.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
namespace grpc_core { |
||||
namespace chttp2 { |
||||
|
||||
namespace { |
||||
|
||||
class TransportTargetWindowSizeMocker |
||||
: public grpc_core::chttp2::TestOnlyTransportTargetWindowEstimatesMocker { |
||||
public: |
||||
static constexpr uint32_t kLargeInitialWindowSize = 1u << 31; |
||||
static constexpr uint32_t kSmallInitialWindowSize = 0; |
||||
|
||||
double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate( |
||||
double /* current_target */) override { |
||||
// Protecting access to variable window_size_ shared between client and
|
||||
// server.
|
||||
grpc_core::MutexLock lock(&mu_); |
||||
if (alternating_initial_window_sizes_) { |
||||
window_size_ = (window_size_ == kLargeInitialWindowSize) |
||||
? kSmallInitialWindowSize |
||||
: kLargeInitialWindowSize; |
||||
} |
||||
return window_size_; |
||||
} |
||||
|
||||
// Alternates the initial window size targets. Computes a low values if it was
|
||||
// previously high, or a high value if it was previously low.
|
||||
void AlternateTargetInitialWindowSizes() { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
alternating_initial_window_sizes_ = true; |
||||
} |
||||
|
||||
void Reset() { |
||||
// Protecting access to variable window_size_ shared between client and
|
||||
// server.
|
||||
grpc_core::MutexLock lock(&mu_); |
||||
alternating_initial_window_sizes_ = false; |
||||
window_size_ = kLargeInitialWindowSize; |
||||
} |
||||
|
||||
private: |
||||
grpc_core::Mutex mu_; |
||||
bool alternating_initial_window_sizes_ ABSL_GUARDED_BY(mu_) = false; |
||||
double window_size_ ABSL_GUARDED_BY(mu_) = kLargeInitialWindowSize; |
||||
}; |
||||
|
||||
TransportTargetWindowSizeMocker* g_target_initial_window_size_mocker; |
||||
|
||||
void* tag(intptr_t t) { return reinterpret_cast<void*>(t); } |
||||
|
||||
void VerifyChannelReady(grpc_channel* channel, grpc_completion_queue* cq) { |
||||
grpc_connectivity_state state = |
||||
grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */); |
||||
while (state != GRPC_CHANNEL_READY) { |
||||
grpc_channel_watch_connectivity_state( |
||||
channel, state, grpc_timeout_seconds_to_deadline(5), cq, nullptr); |
||||
grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5), |
||||
nullptr); |
||||
state = grpc_channel_check_connectivity_state(channel, 0); |
||||
} |
||||
auto* g_memory_owner = new MemoryOwner( |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryOwner("test")); |
||||
} |
||||
|
||||
void VerifyChannelConnected(grpc_channel* channel, grpc_completion_queue* cq) { |
||||
// Verify channel is connected. Use a ping to make sure that clients
|
||||
// tries sending/receiving bytes if the channel is connected.
|
||||
grpc_channel_ping(channel, cq, reinterpret_cast<void*>(2000), nullptr); |
||||
grpc_event ev = grpc_completion_queue_next( |
||||
cq, grpc_timeout_seconds_to_deadline(5), nullptr); |
||||
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); |
||||
GPR_ASSERT(ev.tag == reinterpret_cast<void*>(2000)); |
||||
GPR_ASSERT(ev.success == 1); |
||||
GPR_ASSERT(grpc_channel_check_connectivity_state(channel, 0) == |
||||
GRPC_CHANNEL_READY); |
||||
TEST(FlowControl, NoOp) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
// Check initial values are per http2 spec
|
||||
EXPECT_EQ(tfc.acked_init_window(), 65535); |
||||
EXPECT_EQ(tfc.remote_window(), 65535); |
||||
EXPECT_EQ(tfc.target_frame_size(), 16384); |
||||
EXPECT_EQ(sfc.remote_window_delta(), 0); |
||||
EXPECT_EQ(sfc.min_progress_size(), 0); |
||||
EXPECT_EQ(sfc.announced_window_delta(), 0); |
||||
} |
||||
|
||||
// Shuts down and destroys the server.
|
||||
void ServerShutdownAndDestroy(grpc_server* server, grpc_completion_queue* cq) { |
||||
// Shutdown and destroy server
|
||||
grpc_server_shutdown_and_notify(server, cq, reinterpret_cast<void*>(1000)); |
||||
while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), |
||||
nullptr) |
||||
.tag != reinterpret_cast<void*>(1000)) { |
||||
TEST(FlowControl, SendData) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
{ |
||||
StreamFlowControl::OutgoingUpdateContext sfc_upd(&sfc); |
||||
sfc_upd.SentData(1024); |
||||
} |
||||
grpc_server_destroy(server); |
||||
EXPECT_EQ(sfc.remote_window_delta(), -1024); |
||||
EXPECT_EQ(tfc.remote_window(), 65535 - 1024); |
||||
} |
||||
|
||||
grpc_slice LargeSlice(void) { |
||||
grpc_slice slice = grpc_slice_malloc(10000000); // ~10MB
|
||||
memset(GRPC_SLICE_START_PTR(slice), 'x', GRPC_SLICE_LENGTH(slice)); |
||||
return slice; |
||||
TEST(FlowControl, InitialTransportUpdate) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
EXPECT_EQ(TransportFlowControl::IncomingUpdateContext(&tfc).MakeAction(), |
||||
FlowControlAction()); |
||||
} |
||||
|
||||
void PerformCallWithLargePayload(grpc_channel* channel, grpc_server* server, |
||||
grpc_completion_queue* cq) { |
||||
grpc_slice request_payload_slice = LargeSlice(); |
||||
grpc_slice response_payload_slice = LargeSlice(); |
||||
grpc_call* c; |
||||
grpc_call* s; |
||||
grpc_byte_buffer* request_payload = |
||||
grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
||||
grpc_byte_buffer* response_payload = |
||||
grpc_raw_byte_buffer_create(&response_payload_slice, 1); |
||||
cq_verifier* cqv = cq_verifier_create(cq); |
||||
grpc_op ops[6]; |
||||
grpc_op* op; |
||||
grpc_metadata_array initial_metadata_recv; |
||||
grpc_metadata_array trailing_metadata_recv; |
||||
grpc_metadata_array request_metadata_recv; |
||||
grpc_byte_buffer* request_payload_recv = nullptr; |
||||
grpc_byte_buffer* response_payload_recv = nullptr; |
||||
grpc_call_details call_details; |
||||
grpc_status_code status; |
||||
grpc_call_error error; |
||||
grpc_slice details; |
||||
int was_cancelled = 2; |
||||
|
||||
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30); |
||||
c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, |
||||
grpc_slice_from_static_string("/foo"), nullptr, |
||||
deadline, nullptr); |
||||
GPR_ASSERT(c); |
||||
|
||||
grpc_metadata_array_init(&initial_metadata_recv); |
||||
grpc_metadata_array_init(&trailing_metadata_recv); |
||||
grpc_metadata_array_init(&request_metadata_recv); |
||||
grpc_call_details_init(&call_details); |
||||
|
||||
memset(ops, 0, sizeof(ops)); |
||||
op = ops; |
||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||
op->data.send_initial_metadata.count = 0; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_MESSAGE; |
||||
op->data.send_message.send_message = request_payload; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_INITIAL_METADATA; |
||||
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_MESSAGE; |
||||
op->data.recv_message.recv_message = &response_payload_recv; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
||||
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; |
||||
op->data.recv_status_on_client.status = &status; |
||||
op->data.recv_status_on_client.status_details = &details; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1), |
||||
nullptr); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
|
||||
error = grpc_server_request_call(server, &s, &call_details, |
||||
&request_metadata_recv, cq, cq, tag(101)); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
CQ_EXPECT_COMPLETION(cqv, tag(101), 1); |
||||
cq_verify(cqv); |
||||
|
||||
memset(ops, 0, sizeof(ops)); |
||||
op = ops; |
||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||
op->data.send_initial_metadata.count = 0; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_MESSAGE; |
||||
op->data.recv_message.recv_message = &request_payload_recv; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102), |
||||
nullptr); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
|
||||
CQ_EXPECT_COMPLETION(cqv, tag(102), 1); |
||||
cq_verify(cqv); |
||||
|
||||
memset(ops, 0, sizeof(ops)); |
||||
op = ops; |
||||
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; |
||||
op->data.recv_close_on_server.cancelled = &was_cancelled; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_MESSAGE; |
||||
op->data.send_message.send_message = response_payload; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
||||
op->data.send_status_from_server.trailing_metadata_count = 0; |
||||
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; |
||||
grpc_slice status_details = grpc_slice_from_static_string("xyz"); |
||||
op->data.send_status_from_server.status_details = &status_details; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103), |
||||
nullptr); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
|
||||
CQ_EXPECT_COMPLETION(cqv, tag(103), 1); |
||||
CQ_EXPECT_COMPLETION(cqv, tag(1), 1); |
||||
cq_verify(cqv); |
||||
|
||||
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); |
||||
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); |
||||
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); |
||||
GPR_ASSERT(was_cancelled == 0); |
||||
|
||||
grpc_slice_unref(details); |
||||
grpc_metadata_array_destroy(&initial_metadata_recv); |
||||
grpc_metadata_array_destroy(&trailing_metadata_recv); |
||||
grpc_metadata_array_destroy(&request_metadata_recv); |
||||
grpc_call_details_destroy(&call_details); |
||||
|
||||
grpc_call_unref(c); |
||||
grpc_call_unref(s); |
||||
|
||||
cq_verifier_destroy(cqv); |
||||
TEST(FlowControl, InitialStreamUpdate) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
EXPECT_EQ(StreamFlowControl::IncomingUpdateContext(&sfc).MakeAction(), |
||||
FlowControlAction()); |
||||
} |
||||
|
||||
grpc_byte_buffer_destroy(request_payload); |
||||
grpc_byte_buffer_destroy(response_payload); |
||||
grpc_byte_buffer_destroy(request_payload_recv); |
||||
grpc_byte_buffer_destroy(response_payload_recv); |
||||
grpc_slice_unref(request_payload_slice); |
||||
grpc_slice_unref(response_payload_slice); |
||||
TEST(FlowControl, RecvData) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); |
||||
EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(1024)); |
||||
sfc_upd.MakeAction(); |
||||
EXPECT_EQ(tfc.announced_window(), 65535 - 1024); |
||||
EXPECT_EQ(sfc.announced_window_delta(), -1024); |
||||
} |
||||
|
||||
class FlowControlTest : public ::testing::Test { |
||||
protected: |
||||
void SetUp() override { |
||||
cq_ = grpc_completion_queue_create_for_next(nullptr); |
||||
// create the server
|
||||
std::string server_address = |
||||
grpc_core::JoinHostPort("localhost", grpc_pick_unused_port_or_die()); |
||||
grpc_arg server_args[] = { |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 0), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), -1), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH), -1)}; |
||||
grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), |
||||
server_args}; |
||||
server_ = grpc_server_create(&server_channel_args, nullptr); |
||||
grpc_server_register_completion_queue(server_, cq_, nullptr); |
||||
grpc_server_credentials* server_creds = |
||||
grpc_insecure_server_credentials_create(); |
||||
GPR_ASSERT(grpc_server_add_http2_port(server_, server_address.c_str(), |
||||
server_creds)); |
||||
grpc_server_credentials_release(server_creds); |
||||
grpc_server_start(server_); |
||||
// create the channel (bdp pings are enabled by default)
|
||||
grpc_arg client_args[] = { |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 1), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), -1), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH), -1)}; |
||||
grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), |
||||
client_args}; |
||||
grpc_channel_credentials* creds = grpc_insecure_credentials_create(); |
||||
channel_ = grpc_channel_create(server_address.c_str(), creds, |
||||
&client_channel_args); |
||||
grpc_channel_credentials_release(creds); |
||||
VerifyChannelReady(channel_, cq_); |
||||
g_target_initial_window_size_mocker->Reset(); |
||||
TEST(FlowControl, TrackMinProgressSize) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
{ |
||||
StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); |
||||
sfc_upd.SetMinProgressSize(5); |
||||
sfc_upd.MakeAction(); |
||||
} |
||||
|
||||
void TearDown() override { |
||||
// shutdown and destroy the client and server
|
||||
grpc_channel_destroy(channel_); |
||||
ServerShutdownAndDestroy(server_, cq_); |
||||
grpc_completion_queue_shutdown(cq_); |
||||
while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME), |
||||
nullptr) |
||||
.type != GRPC_QUEUE_SHUTDOWN) { |
||||
} |
||||
grpc_completion_queue_destroy(cq_); |
||||
EXPECT_EQ(sfc.min_progress_size(), 5); |
||||
{ |
||||
StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); |
||||
sfc_upd.SetMinProgressSize(10); |
||||
sfc_upd.MakeAction(); |
||||
} |
||||
EXPECT_EQ(sfc.min_progress_size(), 10); |
||||
{ |
||||
StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); |
||||
EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(5)); |
||||
sfc_upd.MakeAction(); |
||||
} |
||||
EXPECT_EQ(sfc.min_progress_size(), 5); |
||||
{ |
||||
StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); |
||||
EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(5)); |
||||
sfc_upd.MakeAction(); |
||||
} |
||||
EXPECT_EQ(sfc.min_progress_size(), 0); |
||||
{ |
||||
StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); |
||||
EXPECT_EQ(absl::OkStatus(), sfc_upd.RecvData(5)); |
||||
sfc_upd.MakeAction(); |
||||
} |
||||
EXPECT_EQ(sfc.min_progress_size(), 0); |
||||
} |
||||
|
||||
grpc_server* server_ = nullptr; |
||||
grpc_channel* channel_ = nullptr; |
||||
grpc_completion_queue* cq_ = nullptr; |
||||
}; |
||||
|
||||
TEST_F(FlowControlTest, |
||||
TestLargeWindowSizeUpdatesDoNotCauseIllegalFlowControlWindows) { |
||||
for (int i = 0; i < 10; ++i) { |
||||
PerformCallWithLargePayload(channel_, server_, cq_); |
||||
VerifyChannelConnected(channel_, cq_); |
||||
TEST(FlowControl, NoUpdateWithoutReader) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
for (int i = 0; i < 65535; i++) { |
||||
StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); |
||||
EXPECT_EQ(sfc_upd.RecvData(1), absl::OkStatus()); |
||||
EXPECT_EQ(sfc_upd.MakeAction().send_stream_update(), |
||||
FlowControlAction::Urgency::NO_ACTION_NEEDED); |
||||
} |
||||
// Empty window needing 1 byte to progress should trigger an immediate read.
|
||||
{ |
||||
StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); |
||||
sfc_upd.SetMinProgressSize(1); |
||||
EXPECT_EQ(sfc.min_progress_size(), 1); |
||||
EXPECT_EQ(sfc_upd.MakeAction().send_stream_update(), |
||||
FlowControlAction::Urgency::UPDATE_IMMEDIATELY); |
||||
} |
||||
EXPECT_GT(tfc.MaybeSendUpdate(false), 0); |
||||
EXPECT_GT(sfc.MaybeSendUpdate(), 0); |
||||
} |
||||
|
||||
TEST_F(FlowControlTest, TestWindowSizeUpdatesDoNotCauseStalledStreams) { |
||||
g_target_initial_window_size_mocker->AlternateTargetInitialWindowSizes(); |
||||
for (int i = 0; i < 100; ++i) { |
||||
PerformCallWithLargePayload(channel_, server_, cq_); |
||||
VerifyChannelConnected(channel_, cq_); |
||||
TEST(FlowControl, GradualReadsUpdate) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
int immediate_updates = 0; |
||||
int queued_updates = 0; |
||||
for (int i = 0; i < 65535; i++) { |
||||
StreamFlowControl::IncomingUpdateContext sfc_upd(&sfc); |
||||
EXPECT_EQ(sfc_upd.RecvData(1), absl::OkStatus()); |
||||
sfc_upd.SetPendingSize(0); |
||||
switch (sfc_upd.MakeAction().send_stream_update()) { |
||||
case FlowControlAction::Urgency::UPDATE_IMMEDIATELY: |
||||
immediate_updates++; |
||||
break; |
||||
case FlowControlAction::Urgency::QUEUE_UPDATE: |
||||
queued_updates++; |
||||
break; |
||||
case FlowControlAction::Urgency::NO_ACTION_NEEDED: |
||||
break; |
||||
} |
||||
} |
||||
EXPECT_GT(immediate_updates, 0); |
||||
EXPECT_GT(queued_updates, 0); |
||||
EXPECT_EQ(immediate_updates + queued_updates, 65535); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace chttp2
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
// Make sure that we will have an active poller on all client-side fd's that
|
||||
// are capable of sending and receiving even in the case that we don't have an
|
||||
// active RPC operation on the fd.
|
||||
GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1); |
||||
grpc_core::chttp2::g_test_only_transport_flow_control_window_check = true; |
||||
g_target_initial_window_size_mocker = new TransportTargetWindowSizeMocker(); |
||||
grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker = |
||||
g_target_initial_window_size_mocker; |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
grpc_init(); |
||||
auto result = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return result; |
||||
return RUN_ALL_TESTS(); |
||||
} |
||||
|
Loading…
Reference in new issue