mirror of https://github.com/grpc/grpc.git
This reverts commit 93cdc8b77e
.
pull/29911/head
parent
a709faa868
commit
713a1581d5
77 changed files with 3532 additions and 1704 deletions
@ -0,0 +1,165 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/transport/byte_stream.h" |
||||
|
||||
#include <memory> |
||||
#include <utility> |
||||
|
||||
#include <grpc/slice_buffer.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "src/core/lib/slice/slice_refcount.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
//
|
||||
// SliceBufferByteStream
|
||||
//
|
||||
|
||||
SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer, |
||||
uint32_t flags) |
||||
: ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) { |
||||
GPR_ASSERT(slice_buffer->length <= UINT32_MAX); |
||||
grpc_slice_buffer_init(&backing_buffer_); |
||||
grpc_slice_buffer_swap(slice_buffer, &backing_buffer_); |
||||
if (backing_buffer_.count == 0) { |
||||
grpc_slice_buffer_add_indexed(&backing_buffer_, grpc_empty_slice()); |
||||
GPR_ASSERT(backing_buffer_.count > 0); |
||||
} |
||||
} |
||||
|
||||
SliceBufferByteStream::~SliceBufferByteStream() {} |
||||
|
||||
void SliceBufferByteStream::Orphan() { |
||||
grpc_slice_buffer_destroy_internal(&backing_buffer_); |
||||
GRPC_ERROR_UNREF(shutdown_error_); |
||||
shutdown_error_ = GRPC_ERROR_NONE; |
||||
// Note: We do not actually delete the object here, since
|
||||
// SliceBufferByteStream is usually allocated as part of a larger
|
||||
// object and has an OrphanablePtr of itself passed down through the
|
||||
// filter stack.
|
||||
} |
||||
|
||||
bool SliceBufferByteStream::Next(size_t /*max_size_hint*/, |
||||
grpc_closure* /*on_complete*/) { |
||||
GPR_DEBUG_ASSERT(backing_buffer_.count > 0); |
||||
return true; |
||||
} |
||||
|
||||
grpc_error_handle SliceBufferByteStream::Pull(grpc_slice* slice) { |
||||
if (GPR_UNLIKELY(shutdown_error_ != GRPC_ERROR_NONE)) { |
||||
return GRPC_ERROR_REF(shutdown_error_); |
||||
} |
||||
*slice = grpc_slice_buffer_take_first(&backing_buffer_); |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
|
||||
void SliceBufferByteStream::Shutdown(grpc_error_handle error) { |
||||
GRPC_ERROR_UNREF(shutdown_error_); |
||||
shutdown_error_ = error; |
||||
} |
||||
|
||||
//
|
||||
// ByteStreamCache
|
||||
//
|
||||
|
||||
ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream) |
||||
: underlying_stream_(std::move(underlying_stream)), |
||||
length_(underlying_stream_->length()), |
||||
flags_(underlying_stream_->flags()) { |
||||
grpc_slice_buffer_init(&cache_buffer_); |
||||
} |
||||
|
||||
ByteStreamCache::~ByteStreamCache() { Destroy(); } |
||||
|
||||
void ByteStreamCache::Destroy() { |
||||
underlying_stream_.reset(); |
||||
if (cache_buffer_.length > 0) { |
||||
grpc_slice_buffer_destroy_internal(&cache_buffer_); |
||||
} |
||||
} |
||||
|
||||
//
|
||||
// ByteStreamCache::CachingByteStream
|
||||
//
|
||||
|
||||
ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache) |
||||
: ByteStream(cache->length_, cache->flags_), cache_(cache) {} |
||||
|
||||
ByteStreamCache::CachingByteStream::~CachingByteStream() {} |
||||
|
||||
void ByteStreamCache::CachingByteStream::Orphan() { |
||||
GRPC_ERROR_UNREF(shutdown_error_); |
||||
shutdown_error_ = GRPC_ERROR_NONE; |
||||
// Note: We do not actually delete the object here, since
|
||||
// CachingByteStream is usually allocated as part of a larger
|
||||
// object and has an OrphanablePtr of itself passed down through the
|
||||
// filter stack.
|
||||
} |
||||
|
||||
bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint, |
||||
grpc_closure* on_complete) { |
||||
if (shutdown_error_ != GRPC_ERROR_NONE) return true; |
||||
if (cursor_ < cache_->cache_buffer_.count) return true; |
||||
GPR_ASSERT(cache_->underlying_stream_ != nullptr); |
||||
return cache_->underlying_stream_->Next(max_size_hint, on_complete); |
||||
} |
||||
|
||||
grpc_error_handle ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) { |
||||
if (shutdown_error_ != GRPC_ERROR_NONE) { |
||||
return GRPC_ERROR_REF(shutdown_error_); |
||||
} |
||||
if (cursor_ < cache_->cache_buffer_.count) { |
||||
*slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]); |
||||
++cursor_; |
||||
offset_ += GRPC_SLICE_LENGTH(*slice); |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
GPR_ASSERT(cache_->underlying_stream_ != nullptr); |
||||
grpc_error_handle error = cache_->underlying_stream_->Pull(slice); |
||||
if (error == GRPC_ERROR_NONE) { |
||||
grpc_slice_buffer_add(&cache_->cache_buffer_, |
||||
grpc_slice_ref_internal(*slice)); |
||||
++cursor_; |
||||
offset_ += GRPC_SLICE_LENGTH(*slice); |
||||
// Orphan the underlying stream if it's been drained.
|
||||
if (offset_ == cache_->underlying_stream_->length()) { |
||||
cache_->underlying_stream_.reset(); |
||||
} |
||||
} |
||||
return error; |
||||
} |
||||
|
||||
void ByteStreamCache::CachingByteStream::Shutdown(grpc_error_handle error) { |
||||
GRPC_ERROR_UNREF(shutdown_error_); |
||||
shutdown_error_ = GRPC_ERROR_REF(error); |
||||
if (cache_->underlying_stream_ != nullptr) { |
||||
cache_->underlying_stream_->Shutdown(error); |
||||
} |
||||
} |
||||
|
||||
void ByteStreamCache::CachingByteStream::Reset() { |
||||
cursor_ = 0; |
||||
offset_ = 0; |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,170 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H |
||||
#define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
#include <stdint.h> |
||||
|
||||
#include <grpc/slice.h> |
||||
|
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
|
||||
/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
|
||||
* compression for the message. (Does not apply for stream compression.) */ |
||||
#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) |
||||
/** Internal bit flag for determining whether the message was compressed and had
|
||||
* to be decompressed by the message_decompress filter. (Does not apply for |
||||
* stream compression.) */ |
||||
#define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u) |
||||
/** Mask of all valid internal flags. */ |
||||
#define GRPC_WRITE_INTERNAL_USED_MASK \ |
||||
(GRPC_WRITE_INTERNAL_COMPRESS | GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED) |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class ByteStream : public Orphanable { |
||||
public: |
||||
~ByteStream() override {} |
||||
|
||||
// Returns true if the bytes are available immediately (in which case
|
||||
// on_complete will not be called), or false if the bytes will be available
|
||||
// asynchronously (in which case on_complete will be called when they
|
||||
// are available). Should not be called if there is no data left on the
|
||||
// stream.
|
||||
//
|
||||
// max_size_hint can be set as a hint as to the maximum number
|
||||
// of bytes that would be acceptable to read.
|
||||
virtual bool Next(size_t max_size_hint, grpc_closure* on_complete) = 0; |
||||
|
||||
// Returns the next slice in the byte stream when it is available, as
|
||||
// indicated by Next().
|
||||
//
|
||||
// Once a slice is returned into *slice, it is owned by the caller.
|
||||
virtual grpc_error_handle Pull(grpc_slice* slice) = 0; |
||||
|
||||
// Shuts down the byte stream.
|
||||
//
|
||||
// If there is a pending call to on_complete from Next(), it will be
|
||||
// invoked with the error passed to Shutdown().
|
||||
//
|
||||
// The next call to Pull() (if any) will return the error passed to
|
||||
// Shutdown().
|
||||
virtual void Shutdown(grpc_error_handle error) = 0; |
||||
|
||||
uint32_t length() const { return length_; } |
||||
uint32_t flags() const { return flags_; } |
||||
|
||||
void set_flags(uint32_t flags) { flags_ = flags; } |
||||
|
||||
protected: |
||||
ByteStream(uint32_t length, uint32_t flags) |
||||
: length_(length), flags_(flags) {} |
||||
|
||||
private: |
||||
const uint32_t length_; |
||||
uint32_t flags_; |
||||
}; |
||||
|
||||
//
|
||||
// SliceBufferByteStream
|
||||
//
|
||||
// A ByteStream that wraps a slice buffer.
|
||||
//
|
||||
|
||||
class SliceBufferByteStream : public ByteStream { |
||||
public: |
||||
// Removes all slices in slice_buffer, leaving it empty.
|
||||
SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags); |
||||
|
||||
~SliceBufferByteStream() override; |
||||
|
||||
void Orphan() override; |
||||
|
||||
bool Next(size_t max_size_hint, grpc_closure* on_complete) override; |
||||
grpc_error_handle Pull(grpc_slice* slice) override; |
||||
void Shutdown(grpc_error_handle error) override; |
||||
|
||||
private: |
||||
grpc_error_handle shutdown_error_ = GRPC_ERROR_NONE; |
||||
grpc_slice_buffer backing_buffer_; |
||||
}; |
||||
|
||||
//
|
||||
// CachingByteStream
|
||||
//
|
||||
// A ByteStream that that wraps an underlying byte stream but caches
|
||||
// the resulting slices in a slice buffer. If an initial attempt fails
|
||||
// without fully draining the underlying stream, a new caching stream
|
||||
// can be created from the same underlying cache, in which case it will
|
||||
// return whatever is in the backing buffer before continuing to read the
|
||||
// underlying stream.
|
||||
//
|
||||
// NOTE: No synchronization is done, so it is not safe to have multiple
|
||||
// CachingByteStreams simultaneously drawing from the same underlying
|
||||
// ByteStreamCache at the same time.
|
||||
//
|
||||
|
||||
class ByteStreamCache { |
||||
public: |
||||
class CachingByteStream : public ByteStream { |
||||
public: |
||||
explicit CachingByteStream(ByteStreamCache* cache); |
||||
|
||||
~CachingByteStream() override; |
||||
|
||||
void Orphan() override; |
||||
|
||||
bool Next(size_t max_size_hint, grpc_closure* on_complete) override; |
||||
grpc_error_handle Pull(grpc_slice* slice) override; |
||||
void Shutdown(grpc_error_handle error) override; |
||||
|
||||
// Resets the byte stream to the start of the underlying stream.
|
||||
void Reset(); |
||||
|
||||
private: |
||||
ByteStreamCache* cache_; |
||||
size_t cursor_ = 0; |
||||
size_t offset_ = 0; |
||||
grpc_error_handle shutdown_error_ = GRPC_ERROR_NONE; |
||||
}; |
||||
|
||||
explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream); |
||||
|
||||
~ByteStreamCache(); |
||||
|
||||
// Must not be destroyed while still in use by a CachingByteStream.
|
||||
void Destroy(); |
||||
|
||||
grpc_slice_buffer* cache_buffer() { return &cache_buffer_; } |
||||
|
||||
private: |
||||
OrphanablePtr<ByteStream> underlying_stream_; |
||||
uint32_t length_; |
||||
uint32_t flags_; |
||||
grpc_slice_buffer cache_buffer_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ |
@ -1,118 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <grpc/grpc_security.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/sync.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/client_channel.h" |
||||
#include "src/core/ext/filters/http/server/http_server_filter.h" |
||||
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" |
||||
#include "src/core/lib/channel/connected_channel.h" |
||||
#include "src/core/lib/gprpp/host_port.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "src/core/lib/surface/server.h" |
||||
#include "test/core/end2end/end2end_tests.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
struct fullstack_fixture_data { |
||||
std::string localaddr; |
||||
}; |
||||
|
||||
static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( |
||||
const grpc_channel_args* /*client_args*/, |
||||
const grpc_channel_args* /*server_args*/) { |
||||
grpc_end2end_test_fixture f; |
||||
int port = grpc_pick_unused_port_or_die(); |
||||
fullstack_fixture_data* ffd = new fullstack_fixture_data(); |
||||
memset(&f, 0, sizeof(f)); |
||||
|
||||
ffd->localaddr = grpc_core::JoinHostPort("localhost", port); |
||||
|
||||
f.fixture_data = ffd; |
||||
f.cq = grpc_completion_queue_create_for_next(nullptr); |
||||
|
||||
return f; |
||||
} |
||||
|
||||
void chttp2_init_client_fullstack(grpc_end2end_test_fixture* f, |
||||
const grpc_channel_args* client_args) { |
||||
fullstack_fixture_data* ffd = |
||||
static_cast<fullstack_fixture_data*>(f->fixture_data); |
||||
grpc_channel_credentials* creds = grpc_insecure_credentials_create(); |
||||
grpc_arg no_retry = grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0); |
||||
client_args = grpc_channel_args_copy_and_add(client_args, &no_retry, 1); |
||||
f->client = grpc_channel_create(ffd->localaddr.c_str(), creds, client_args); |
||||
grpc_channel_args_destroy(client_args); |
||||
grpc_channel_credentials_release(creds); |
||||
GPR_ASSERT(f->client); |
||||
} |
||||
|
||||
void chttp2_init_server_fullstack(grpc_end2end_test_fixture* f, |
||||
const grpc_channel_args* server_args) { |
||||
fullstack_fixture_data* ffd = |
||||
static_cast<fullstack_fixture_data*>(f->fixture_data); |
||||
if (f->server) { |
||||
grpc_server_destroy(f->server); |
||||
} |
||||
f->server = grpc_server_create(server_args, nullptr); |
||||
grpc_server_register_completion_queue(f->server, f->cq, nullptr); |
||||
grpc_server_credentials* server_creds = |
||||
grpc_insecure_server_credentials_create(); |
||||
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr.c_str(), |
||||
server_creds)); |
||||
grpc_server_credentials_release(server_creds); |
||||
grpc_server_start(f->server); |
||||
} |
||||
|
||||
void chttp2_tear_down_fullstack(grpc_end2end_test_fixture* f) { |
||||
fullstack_fixture_data* ffd = |
||||
static_cast<fullstack_fixture_data*>(f->fixture_data); |
||||
delete ffd; |
||||
} |
||||
|
||||
/* All test configurations */ |
||||
static grpc_end2end_test_config configs[] = { |
||||
{"chttp2/fullstack", |
||||
FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | |
||||
FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | |
||||
FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, |
||||
nullptr, chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, |
||||
chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, |
||||
}; |
||||
|
||||
int main(int argc, char** argv) { |
||||
size_t i; |
||||
|
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
grpc_end2end_tests_pre_init(); |
||||
grpc_init(); |
||||
|
||||
for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { |
||||
grpc_end2end_tests(argc, argv, configs[i]); |
||||
} |
||||
|
||||
grpc_shutdown(); |
||||
|
||||
return 0; |
||||
} |
@ -0,0 +1,100 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
/* Test of gpr synchronization support. */ |
||||
|
||||
#include "src/core/lib/gprpp/manual_constructor.h" |
||||
|
||||
#include <stdio.h> |
||||
#include <stdlib.h> |
||||
|
||||
#include <cstring> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/sync.h> |
||||
|
||||
#include "test/core/util/test_config.h" |
||||
|
||||
class A { |
||||
public: |
||||
A() {} |
||||
virtual ~A() {} |
||||
virtual const char* foo() { return "A_foo"; } |
||||
virtual const char* bar() { return "A_bar"; } |
||||
}; |
||||
|
||||
class B : public A { |
||||
public: |
||||
B() {} |
||||
~B() override {} |
||||
const char* foo() override { return "B_foo"; } |
||||
char get_junk() { return junk[0]; } |
||||
|
||||
private: |
||||
char junk[1000]; |
||||
}; |
||||
|
||||
class C : public B { |
||||
public: |
||||
C() {} |
||||
~C() override {} |
||||
const char* bar() override { return "C_bar"; } |
||||
char get_more_junk() { return more_junk[0]; } |
||||
|
||||
private: |
||||
char more_junk[1000]; |
||||
}; |
||||
|
||||
class D : public A { |
||||
public: |
||||
const char* bar() override { return "D_bar"; } |
||||
}; |
||||
|
||||
static void basic_test() { |
||||
grpc_core::PolymorphicManualConstructor<A, B> poly; |
||||
poly.Init<B>(); |
||||
GPR_ASSERT(!strcmp(poly->foo(), "B_foo")); |
||||
GPR_ASSERT(!strcmp(poly->bar(), "A_bar")); |
||||
} |
||||
|
||||
static void complex_test() { |
||||
grpc_core::PolymorphicManualConstructor<A, B, C, D> polyB; |
||||
polyB.Init<B>(); |
||||
GPR_ASSERT(!strcmp(polyB->foo(), "B_foo")); |
||||
GPR_ASSERT(!strcmp(polyB->bar(), "A_bar")); |
||||
|
||||
grpc_core::PolymorphicManualConstructor<A, B, C, D> polyC; |
||||
polyC.Init<C>(); |
||||
GPR_ASSERT(!strcmp(polyC->foo(), "B_foo")); |
||||
GPR_ASSERT(!strcmp(polyC->bar(), "C_bar")); |
||||
|
||||
grpc_core::PolymorphicManualConstructor<A, B, C, D> polyD; |
||||
polyD.Init<D>(); |
||||
GPR_ASSERT(!strcmp(polyD->foo(), "A_foo")); |
||||
GPR_ASSERT(!strcmp(polyD->bar(), "D_bar")); |
||||
} |
||||
|
||||
/* ------------------------------------------------- */ |
||||
|
||||
int main(int argc, char* argv[]) { |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
basic_test(); |
||||
complex_test(); |
||||
return 0; |
||||
} |
@ -0,0 +1,254 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2017 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/lib/transport/byte_stream.h" |
||||
|
||||
#include <gtest/gtest.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/gpr/useful.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace { |
||||
|
||||
//
|
||||
// SliceBufferByteStream tests
|
||||
//
|
||||
|
||||
void NotCalledClosure(void* /*arg*/, grpc_error_handle /*error*/) { |
||||
GPR_ASSERT(false); |
||||
} |
||||
|
||||
TEST(SliceBufferByteStream, Basic) { |
||||
ExecCtx exec_ctx; |
||||
// Create and populate slice buffer.
|
||||
grpc_slice_buffer buffer; |
||||
grpc_slice_buffer_init(&buffer); |
||||
grpc_slice input[] = { |
||||
grpc_slice_from_static_string("foo"), |
||||
grpc_slice_from_static_string("bar"), |
||||
}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
grpc_slice_buffer_add(&buffer, input[i]); |
||||
} |
||||
// Create byte stream.
|
||||
SliceBufferByteStream stream(&buffer, 0); |
||||
grpc_slice_buffer_destroy_internal(&buffer); |
||||
EXPECT_EQ(6U, stream.length()); |
||||
grpc_closure closure; |
||||
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Read each slice. Note that Next() always returns synchronously.
|
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
grpc_slice output; |
||||
grpc_error_handle error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||
grpc_slice_unref_internal(output); |
||||
} |
||||
// Clean up.
|
||||
stream.Orphan(); |
||||
} |
||||
|
||||
TEST(SliceBufferByteStream, Shutdown) { |
||||
ExecCtx exec_ctx; |
||||
// Create and populate slice buffer.
|
||||
grpc_slice_buffer buffer; |
||||
grpc_slice_buffer_init(&buffer); |
||||
grpc_slice input[] = { |
||||
grpc_slice_from_static_string("foo"), |
||||
grpc_slice_from_static_string("bar"), |
||||
}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
grpc_slice_buffer_add(&buffer, input[i]); |
||||
} |
||||
// Create byte stream.
|
||||
SliceBufferByteStream stream(&buffer, 0); |
||||
grpc_slice_buffer_destroy_internal(&buffer); |
||||
EXPECT_EQ(6U, stream.length()); |
||||
grpc_closure closure; |
||||
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Read the first slice.
|
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
grpc_slice output; |
||||
grpc_error_handle error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[0], output)); |
||||
grpc_slice_unref_internal(output); |
||||
// Now shutdown.
|
||||
grpc_error_handle shutdown_error = |
||||
GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error"); |
||||
stream.Shutdown(GRPC_ERROR_REF(shutdown_error)); |
||||
// After shutdown, the next pull() should return the error.
|
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == shutdown_error); |
||||
GRPC_ERROR_UNREF(error); |
||||
GRPC_ERROR_UNREF(shutdown_error); |
||||
// Clean up.
|
||||
stream.Orphan(); |
||||
} |
||||
|
||||
//
|
||||
// CachingByteStream tests
|
||||
//
|
||||
|
||||
TEST(CachingByteStream, Basic) { |
||||
ExecCtx exec_ctx; |
||||
// Create and populate slice buffer byte stream.
|
||||
grpc_slice_buffer buffer; |
||||
grpc_slice_buffer_init(&buffer); |
||||
grpc_slice input[] = { |
||||
grpc_slice_from_static_string("foo"), |
||||
grpc_slice_from_static_string("bar"), |
||||
}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
grpc_slice_buffer_add(&buffer, input[i]); |
||||
} |
||||
SliceBufferByteStream underlying_stream(&buffer, 0); |
||||
grpc_slice_buffer_destroy_internal(&buffer); |
||||
// Create cache and caching stream.
|
||||
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream))); |
||||
ByteStreamCache::CachingByteStream stream(&cache); |
||||
grpc_closure closure; |
||||
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Read each slice. Note that next() always returns synchronously,
|
||||
// because the underlying byte stream always does.
|
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
grpc_slice output; |
||||
grpc_error_handle error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||
grpc_slice_unref_internal(output); |
||||
} |
||||
// Clean up.
|
||||
stream.Orphan(); |
||||
cache.Destroy(); |
||||
} |
||||
|
||||
TEST(CachingByteStream, Reset) { |
||||
ExecCtx exec_ctx; |
||||
// Create and populate slice buffer byte stream.
|
||||
grpc_slice_buffer buffer; |
||||
grpc_slice_buffer_init(&buffer); |
||||
grpc_slice input[] = { |
||||
grpc_slice_from_static_string("foo"), |
||||
grpc_slice_from_static_string("bar"), |
||||
}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
grpc_slice_buffer_add(&buffer, input[i]); |
||||
} |
||||
SliceBufferByteStream underlying_stream(&buffer, 0); |
||||
grpc_slice_buffer_destroy_internal(&buffer); |
||||
// Create cache and caching stream.
|
||||
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream))); |
||||
ByteStreamCache::CachingByteStream stream(&cache); |
||||
grpc_closure closure; |
||||
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Read one slice.
|
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
grpc_slice output; |
||||
grpc_error_handle error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[0], output)); |
||||
grpc_slice_unref_internal(output); |
||||
// Reset the caching stream. The reads should start over from the
|
||||
// first slice.
|
||||
stream.Reset(); |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||
error = stream.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||
grpc_slice_unref_internal(output); |
||||
} |
||||
// Clean up.
|
||||
stream.Orphan(); |
||||
cache.Destroy(); |
||||
} |
||||
|
||||
TEST(CachingByteStream, SharedCache) { |
||||
ExecCtx exec_ctx; |
||||
// Create and populate slice buffer byte stream.
|
||||
grpc_slice_buffer buffer; |
||||
grpc_slice_buffer_init(&buffer); |
||||
grpc_slice input[] = { |
||||
grpc_slice_from_static_string("foo"), |
||||
grpc_slice_from_static_string("bar"), |
||||
}; |
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
grpc_slice_buffer_add(&buffer, input[i]); |
||||
} |
||||
SliceBufferByteStream underlying_stream(&buffer, 0); |
||||
grpc_slice_buffer_destroy_internal(&buffer); |
||||
// Create cache and two caching streams.
|
||||
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream))); |
||||
ByteStreamCache::CachingByteStream stream1(&cache); |
||||
ByteStreamCache::CachingByteStream stream2(&cache); |
||||
grpc_closure closure; |
||||
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||
grpc_schedule_on_exec_ctx); |
||||
// Read one slice from stream1.
|
||||
EXPECT_TRUE(stream1.Next(~(size_t)0, &closure)); |
||||
grpc_slice output; |
||||
grpc_error_handle error = stream1.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[0], output)); |
||||
grpc_slice_unref_internal(output); |
||||
// Read all slices from stream2.
|
||||
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||
EXPECT_TRUE(stream2.Next(~(size_t)0, &closure)); |
||||
error = stream2.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||
grpc_slice_unref_internal(output); |
||||
} |
||||
// Now read the second slice from stream1.
|
||||
EXPECT_TRUE(stream1.Next(~(size_t)0, &closure)); |
||||
error = stream1.Pull(&output); |
||||
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||
EXPECT_TRUE(grpc_slice_eq(input[1], output)); |
||||
grpc_slice_unref_internal(output); |
||||
// Clean up.
|
||||
stream1.Orphan(); |
||||
stream2.Orphan(); |
||||
cache.Destroy(); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
grpc_init(); |
||||
int retval = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return retval; |
||||
} |
@ -1,379 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2021 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <functional> |
||||
#include <set> |
||||
#include <thread> |
||||
|
||||
#include <gmock/gmock.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/grpc_security.h> |
||||
#include <grpc/impl/codegen/grpc_types.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/string_util.h> |
||||
#include <grpc/support/time.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/backup_poller.h" |
||||
#include "src/core/ext/transport/chttp2/transport/flow_control.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gprpp/host_port.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "test/core/end2end/cq_verifier.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
namespace { |
||||
|
||||
class TransportTargetWindowSizeMocker |
||||
: public grpc_core::chttp2::TestOnlyTransportTargetWindowEstimatesMocker { |
||||
public: |
||||
static constexpr uint32_t kLargeInitialWindowSize = 1u << 31; |
||||
static constexpr uint32_t kSmallInitialWindowSize = 0; |
||||
|
||||
double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate( |
||||
double /* current_target */) override { |
||||
// Protecting access to variable window_size_ shared between client and
|
||||
// server.
|
||||
grpc_core::MutexLock lock(&mu_); |
||||
if (alternating_initial_window_sizes_) { |
||||
window_size_ = (window_size_ == kLargeInitialWindowSize) |
||||
? kSmallInitialWindowSize |
||||
: kLargeInitialWindowSize; |
||||
} |
||||
return window_size_; |
||||
} |
||||
|
||||
// Alternates the initial window size targets. Computes a low values if it was
|
||||
// previously high, or a high value if it was previously low.
|
||||
void AlternateTargetInitialWindowSizes() { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
alternating_initial_window_sizes_ = true; |
||||
} |
||||
|
||||
void Reset() { |
||||
// Protecting access to variable window_size_ shared between client and
|
||||
// server.
|
||||
grpc_core::MutexLock lock(&mu_); |
||||
alternating_initial_window_sizes_ = false; |
||||
window_size_ = kLargeInitialWindowSize; |
||||
} |
||||
|
||||
private: |
||||
grpc_core::Mutex mu_; |
||||
bool alternating_initial_window_sizes_ ABSL_GUARDED_BY(mu_) = false; |
||||
double window_size_ ABSL_GUARDED_BY(mu_) = kLargeInitialWindowSize; |
||||
}; |
||||
|
||||
TransportTargetWindowSizeMocker* g_target_initial_window_size_mocker; |
||||
|
||||
void* tag(intptr_t t) { return reinterpret_cast<void*>(t); } |
||||
|
||||
void VerifyChannelReady(grpc_channel* channel, grpc_completion_queue* cq) { |
||||
grpc_connectivity_state state = |
||||
grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */); |
||||
while (state != GRPC_CHANNEL_READY) { |
||||
grpc_channel_watch_connectivity_state( |
||||
channel, state, grpc_timeout_seconds_to_deadline(5), cq, nullptr); |
||||
grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5), |
||||
nullptr); |
||||
state = grpc_channel_check_connectivity_state(channel, 0); |
||||
} |
||||
} |
||||
|
||||
void VerifyChannelConnected(grpc_channel* channel, grpc_completion_queue* cq) { |
||||
// Verify channel is connected. Use a ping to make sure that clients
|
||||
// tries sending/receiving bytes if the channel is connected.
|
||||
grpc_channel_ping(channel, cq, reinterpret_cast<void*>(2000), nullptr); |
||||
grpc_event ev = grpc_completion_queue_next( |
||||
cq, grpc_timeout_seconds_to_deadline(5), nullptr); |
||||
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); |
||||
GPR_ASSERT(ev.tag == reinterpret_cast<void*>(2000)); |
||||
GPR_ASSERT(ev.success == 1); |
||||
GPR_ASSERT(grpc_channel_check_connectivity_state(channel, 0) == |
||||
GRPC_CHANNEL_READY); |
||||
} |
||||
|
||||
// Shuts down and destroys the server.
|
||||
void ServerShutdownAndDestroy(grpc_server* server, grpc_completion_queue* cq) { |
||||
// Shutdown and destroy server
|
||||
grpc_server_shutdown_and_notify(server, cq, reinterpret_cast<void*>(1000)); |
||||
while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), |
||||
nullptr) |
||||
.tag != reinterpret_cast<void*>(1000)) { |
||||
} |
||||
grpc_server_destroy(server); |
||||
} |
||||
|
||||
grpc_slice LargeSlice(void) { |
||||
grpc_slice slice = grpc_slice_malloc(10000000); // ~10MB
|
||||
memset(GRPC_SLICE_START_PTR(slice), 'x', GRPC_SLICE_LENGTH(slice)); |
||||
return slice; |
||||
} |
||||
|
||||
void PerformCallWithLargePayload(grpc_channel* channel, grpc_server* server, |
||||
grpc_completion_queue* cq) { |
||||
grpc_slice request_payload_slice = LargeSlice(); |
||||
grpc_slice response_payload_slice = LargeSlice(); |
||||
grpc_call* c; |
||||
grpc_call* s; |
||||
grpc_byte_buffer* request_payload = |
||||
grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
||||
grpc_byte_buffer* response_payload = |
||||
grpc_raw_byte_buffer_create(&response_payload_slice, 1); |
||||
cq_verifier* cqv = cq_verifier_create(cq); |
||||
grpc_op ops[6]; |
||||
grpc_op* op; |
||||
grpc_metadata_array initial_metadata_recv; |
||||
grpc_metadata_array trailing_metadata_recv; |
||||
grpc_metadata_array request_metadata_recv; |
||||
grpc_byte_buffer* request_payload_recv = nullptr; |
||||
grpc_byte_buffer* response_payload_recv = nullptr; |
||||
grpc_call_details call_details; |
||||
grpc_status_code status; |
||||
grpc_call_error error; |
||||
grpc_slice details; |
||||
int was_cancelled = 2; |
||||
|
||||
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30); |
||||
c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, |
||||
grpc_slice_from_static_string("/foo"), nullptr, |
||||
deadline, nullptr); |
||||
GPR_ASSERT(c); |
||||
|
||||
grpc_metadata_array_init(&initial_metadata_recv); |
||||
grpc_metadata_array_init(&trailing_metadata_recv); |
||||
grpc_metadata_array_init(&request_metadata_recv); |
||||
grpc_call_details_init(&call_details); |
||||
|
||||
memset(ops, 0, sizeof(ops)); |
||||
op = ops; |
||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||
op->data.send_initial_metadata.count = 0; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_MESSAGE; |
||||
op->data.send_message.send_message = request_payload; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_INITIAL_METADATA; |
||||
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_MESSAGE; |
||||
op->data.recv_message.recv_message = &response_payload_recv; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
||||
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; |
||||
op->data.recv_status_on_client.status = &status; |
||||
op->data.recv_status_on_client.status_details = &details; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1), |
||||
nullptr); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
|
||||
error = grpc_server_request_call(server, &s, &call_details, |
||||
&request_metadata_recv, cq, cq, tag(101)); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
CQ_EXPECT_COMPLETION(cqv, tag(101), 1); |
||||
cq_verify(cqv); |
||||
|
||||
memset(ops, 0, sizeof(ops)); |
||||
op = ops; |
||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||
op->data.send_initial_metadata.count = 0; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_MESSAGE; |
||||
op->data.recv_message.recv_message = &request_payload_recv; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102), |
||||
nullptr); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
|
||||
CQ_EXPECT_COMPLETION(cqv, tag(102), 1); |
||||
cq_verify(cqv); |
||||
|
||||
memset(ops, 0, sizeof(ops)); |
||||
op = ops; |
||||
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; |
||||
op->data.recv_close_on_server.cancelled = &was_cancelled; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_MESSAGE; |
||||
op->data.send_message.send_message = response_payload; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
||||
op->data.send_status_from_server.trailing_metadata_count = 0; |
||||
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; |
||||
grpc_slice status_details = grpc_slice_from_static_string("xyz"); |
||||
op->data.send_status_from_server.status_details = &status_details; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103), |
||||
nullptr); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
|
||||
CQ_EXPECT_COMPLETION(cqv, tag(103), 1); |
||||
CQ_EXPECT_COMPLETION(cqv, tag(1), 1); |
||||
cq_verify(cqv); |
||||
|
||||
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); |
||||
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); |
||||
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); |
||||
GPR_ASSERT(was_cancelled == 0); |
||||
|
||||
grpc_slice_unref(details); |
||||
grpc_metadata_array_destroy(&initial_metadata_recv); |
||||
grpc_metadata_array_destroy(&trailing_metadata_recv); |
||||
grpc_metadata_array_destroy(&request_metadata_recv); |
||||
grpc_call_details_destroy(&call_details); |
||||
|
||||
grpc_call_unref(c); |
||||
grpc_call_unref(s); |
||||
|
||||
cq_verifier_destroy(cqv); |
||||
|
||||
grpc_byte_buffer_destroy(request_payload); |
||||
grpc_byte_buffer_destroy(response_payload); |
||||
grpc_byte_buffer_destroy(request_payload_recv); |
||||
grpc_byte_buffer_destroy(response_payload_recv); |
||||
grpc_slice_unref(request_payload_slice); |
||||
grpc_slice_unref(response_payload_slice); |
||||
} |
||||
|
||||
class FlowControlTest : public ::testing::Test { |
||||
protected: |
||||
void SetUp() override { |
||||
cq_ = grpc_completion_queue_create_for_next(nullptr); |
||||
// create the server
|
||||
std::string server_address = |
||||
grpc_core::JoinHostPort("localhost", grpc_pick_unused_port_or_die()); |
||||
grpc_arg server_args[] = { |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 0), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), -1), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH), -1)}; |
||||
grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), |
||||
server_args}; |
||||
server_ = grpc_server_create(&server_channel_args, nullptr); |
||||
grpc_server_register_completion_queue(server_, cq_, nullptr); |
||||
grpc_server_credentials* server_creds = |
||||
grpc_insecure_server_credentials_create(); |
||||
GPR_ASSERT(grpc_server_add_http2_port(server_, server_address.c_str(), |
||||
server_creds)); |
||||
grpc_server_credentials_release(server_creds); |
||||
grpc_server_start(server_); |
||||
// create the channel (bdp pings are enabled by default)
|
||||
grpc_arg client_args[] = { |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 1), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), -1), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH), -1)}; |
||||
grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), |
||||
client_args}; |
||||
grpc_channel_credentials* creds = grpc_insecure_credentials_create(); |
||||
channel_ = grpc_channel_create(server_address.c_str(), creds, |
||||
&client_channel_args); |
||||
grpc_channel_credentials_release(creds); |
||||
VerifyChannelReady(channel_, cq_); |
||||
g_target_initial_window_size_mocker->Reset(); |
||||
} |
||||
|
||||
void TearDown() override { |
||||
// shutdown and destroy the client and server
|
||||
grpc_channel_destroy(channel_); |
||||
ServerShutdownAndDestroy(server_, cq_); |
||||
grpc_completion_queue_shutdown(cq_); |
||||
while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME), |
||||
nullptr) |
||||
.type != GRPC_QUEUE_SHUTDOWN) { |
||||
} |
||||
grpc_completion_queue_destroy(cq_); |
||||
} |
||||
|
||||
grpc_server* server_ = nullptr; |
||||
grpc_channel* channel_ = nullptr; |
||||
grpc_completion_queue* cq_ = nullptr; |
||||
}; |
||||
|
||||
TEST_F(FlowControlTest, |
||||
TestLargeWindowSizeUpdatesDoNotCauseIllegalFlowControlWindows) { |
||||
for (int i = 0; i < 10; ++i) { |
||||
PerformCallWithLargePayload(channel_, server_, cq_); |
||||
VerifyChannelConnected(channel_, cq_); |
||||
} |
||||
} |
||||
|
||||
TEST_F(FlowControlTest, TestWindowSizeUpdatesDoNotCauseStalledStreams) { |
||||
g_target_initial_window_size_mocker->AlternateTargetInitialWindowSizes(); |
||||
for (int i = 0; i < 100; ++i) { |
||||
PerformCallWithLargePayload(channel_, server_, cq_); |
||||
VerifyChannelConnected(channel_, cq_); |
||||
} |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
// Make sure that we will have an active poller on all client-side fd's that
|
||||
// are capable of sending and receiving even in the case that we don't have an
|
||||
// active RPC operation on the fd.
|
||||
GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1); |
||||
grpc_core::chttp2::g_test_only_transport_flow_control_window_check = true; |
||||
g_target_initial_window_size_mocker = new TransportTargetWindowSizeMocker(); |
||||
grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker = |
||||
g_target_initial_window_size_mocker; |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
grpc_init(); |
||||
auto result = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return result; |
||||
} |
@ -1,115 +1,380 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
/*
|
||||
* |
||||
* Copyright 2021 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/transport/chttp2/transport/flow_control.h" |
||||
|
||||
#include <gtest/gtest.h> |
||||
#include <stdlib.h> |
||||
#include <string.h> |
||||
|
||||
#include <functional> |
||||
#include <set> |
||||
#include <thread> |
||||
|
||||
#include <gmock/gmock.h> |
||||
|
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/grpc_security.h> |
||||
#include <grpc/impl/codegen/grpc_types.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/string_util.h> |
||||
#include <grpc/support/time.h> |
||||
|
||||
namespace grpc_core { |
||||
namespace chttp2 { |
||||
#include "src/core/ext/filters/client_channel/backup_poller.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gprpp/host_port.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "test/core/end2end/cq_verifier.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
namespace { |
||||
auto* g_memory_owner = new MemoryOwner( |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryOwner("test")); |
||||
} |
||||
|
||||
TEST(FlowControl, NoOp) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
// Check initial values are per http2 spec
|
||||
EXPECT_EQ(tfc.sent_init_window(), 65535); |
||||
EXPECT_EQ(tfc.acked_init_window(), 65535); |
||||
EXPECT_EQ(tfc.remote_window(), 65535); |
||||
EXPECT_EQ(tfc.target_frame_size(), 16384); |
||||
EXPECT_EQ(sfc.remote_window_delta(), 0); |
||||
EXPECT_EQ(sfc.min_progress_size(), 0); |
||||
EXPECT_EQ(sfc.local_window_delta(), 0); |
||||
EXPECT_EQ(sfc.announced_window_delta(), 0); |
||||
class TransportTargetWindowSizeMocker |
||||
: public grpc_core::chttp2::TestOnlyTransportTargetWindowEstimatesMocker { |
||||
public: |
||||
static constexpr uint32_t kLargeInitialWindowSize = 1u << 31; |
||||
static constexpr uint32_t kSmallInitialWindowSize = 0; |
||||
|
||||
double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate( |
||||
double /* current_target */) override { |
||||
// Protecting access to variable window_size_ shared between client and
|
||||
// server.
|
||||
grpc_core::MutexLock lock(&mu_); |
||||
if (alternating_initial_window_sizes_) { |
||||
window_size_ = (window_size_ == kLargeInitialWindowSize) |
||||
? kSmallInitialWindowSize |
||||
: kLargeInitialWindowSize; |
||||
} |
||||
return window_size_; |
||||
} |
||||
|
||||
// Alternates the initial window size targets. Computes a low values if it was
|
||||
// previously high, or a high value if it was previously low.
|
||||
void AlternateTargetInitialWindowSizes() { |
||||
grpc_core::MutexLock lock(&mu_); |
||||
alternating_initial_window_sizes_ = true; |
||||
} |
||||
|
||||
void Reset() { |
||||
// Protecting access to variable window_size_ shared between client and
|
||||
// server.
|
||||
grpc_core::MutexLock lock(&mu_); |
||||
alternating_initial_window_sizes_ = false; |
||||
window_size_ = kLargeInitialWindowSize; |
||||
} |
||||
|
||||
private: |
||||
grpc_core::Mutex mu_; |
||||
bool alternating_initial_window_sizes_ ABSL_GUARDED_BY(mu_) = false; |
||||
double window_size_ ABSL_GUARDED_BY(mu_) = kLargeInitialWindowSize; |
||||
}; |
||||
|
||||
TransportTargetWindowSizeMocker* g_target_initial_window_size_mocker; |
||||
|
||||
void* tag(intptr_t t) { return reinterpret_cast<void*>(t); } |
||||
|
||||
void VerifyChannelReady(grpc_channel* channel, grpc_completion_queue* cq) { |
||||
grpc_connectivity_state state = |
||||
grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */); |
||||
while (state != GRPC_CHANNEL_READY) { |
||||
grpc_channel_watch_connectivity_state( |
||||
channel, state, grpc_timeout_seconds_to_deadline(5), cq, nullptr); |
||||
grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5), |
||||
nullptr); |
||||
state = grpc_channel_check_connectivity_state(channel, 0); |
||||
} |
||||
} |
||||
|
||||
TEST(FlowControl, SendData) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
sfc.SentData(1024); |
||||
EXPECT_EQ(sfc.remote_window_delta(), -1024); |
||||
EXPECT_EQ(tfc.remote_window(), 65535 - 1024); |
||||
void VerifyChannelConnected(grpc_channel* channel, grpc_completion_queue* cq) { |
||||
// Verify channel is connected. Use a ping to make sure that clients
|
||||
// tries sending/receiving bytes if the channel is connected.
|
||||
grpc_channel_ping(channel, cq, reinterpret_cast<void*>(2000), nullptr); |
||||
grpc_event ev = grpc_completion_queue_next( |
||||
cq, grpc_timeout_seconds_to_deadline(5), nullptr); |
||||
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); |
||||
GPR_ASSERT(ev.tag == reinterpret_cast<void*>(2000)); |
||||
GPR_ASSERT(ev.success == 1); |
||||
GPR_ASSERT(grpc_channel_check_connectivity_state(channel, 0) == |
||||
GRPC_CHANNEL_READY); |
||||
} |
||||
|
||||
TEST(FlowControl, InitialTransportUpdate) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
EXPECT_EQ(tfc.MakeAction(), FlowControlAction()); |
||||
// Shuts down and destroys the server.
|
||||
void ServerShutdownAndDestroy(grpc_server* server, grpc_completion_queue* cq) { |
||||
// Shutdown and destroy server
|
||||
grpc_server_shutdown_and_notify(server, cq, reinterpret_cast<void*>(1000)); |
||||
while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), |
||||
nullptr) |
||||
.tag != reinterpret_cast<void*>(1000)) { |
||||
} |
||||
grpc_server_destroy(server); |
||||
} |
||||
|
||||
TEST(FlowControl, InitialStreamUpdate) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
EXPECT_EQ(sfc.MakeAction(), FlowControlAction()); |
||||
grpc_slice LargeSlice(void) { |
||||
grpc_slice slice = grpc_slice_malloc(10000000); // ~10MB
|
||||
memset(GRPC_SLICE_START_PTR(slice), 'x', GRPC_SLICE_LENGTH(slice)); |
||||
return slice; |
||||
} |
||||
|
||||
TEST(FlowControl, RecvData) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
EXPECT_EQ(absl::OkStatus(), sfc.RecvData(1024)); |
||||
EXPECT_EQ(tfc.announced_window(), 65535 - 1024); |
||||
EXPECT_EQ(sfc.local_window_delta(), -1024); |
||||
void PerformCallWithLargePayload(grpc_channel* channel, grpc_server* server, |
||||
grpc_completion_queue* cq) { |
||||
grpc_slice request_payload_slice = LargeSlice(); |
||||
grpc_slice response_payload_slice = LargeSlice(); |
||||
grpc_call* c; |
||||
grpc_call* s; |
||||
grpc_byte_buffer* request_payload = |
||||
grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
||||
grpc_byte_buffer* response_payload = |
||||
grpc_raw_byte_buffer_create(&response_payload_slice, 1); |
||||
cq_verifier* cqv = cq_verifier_create(cq); |
||||
grpc_op ops[6]; |
||||
grpc_op* op; |
||||
grpc_metadata_array initial_metadata_recv; |
||||
grpc_metadata_array trailing_metadata_recv; |
||||
grpc_metadata_array request_metadata_recv; |
||||
grpc_byte_buffer* request_payload_recv = nullptr; |
||||
grpc_byte_buffer* response_payload_recv = nullptr; |
||||
grpc_call_details call_details; |
||||
grpc_status_code status; |
||||
grpc_call_error error; |
||||
grpc_slice details; |
||||
int was_cancelled = 2; |
||||
|
||||
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30); |
||||
c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, |
||||
grpc_slice_from_static_string("/foo"), nullptr, |
||||
deadline, nullptr); |
||||
GPR_ASSERT(c); |
||||
|
||||
grpc_metadata_array_init(&initial_metadata_recv); |
||||
grpc_metadata_array_init(&trailing_metadata_recv); |
||||
grpc_metadata_array_init(&request_metadata_recv); |
||||
grpc_call_details_init(&call_details); |
||||
|
||||
memset(ops, 0, sizeof(ops)); |
||||
op = ops; |
||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||
op->data.send_initial_metadata.count = 0; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_MESSAGE; |
||||
op->data.send_message.send_message = request_payload; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_INITIAL_METADATA; |
||||
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_MESSAGE; |
||||
op->data.recv_message.recv_message = &response_payload_recv; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
||||
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; |
||||
op->data.recv_status_on_client.status = &status; |
||||
op->data.recv_status_on_client.status_details = &details; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1), |
||||
nullptr); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
|
||||
error = grpc_server_request_call(server, &s, &call_details, |
||||
&request_metadata_recv, cq, cq, tag(101)); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
CQ_EXPECT_COMPLETION(cqv, tag(101), 1); |
||||
cq_verify(cqv); |
||||
|
||||
memset(ops, 0, sizeof(ops)); |
||||
op = ops; |
||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||
op->data.send_initial_metadata.count = 0; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_MESSAGE; |
||||
op->data.recv_message.recv_message = &request_payload_recv; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102), |
||||
nullptr); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
|
||||
CQ_EXPECT_COMPLETION(cqv, tag(102), 1); |
||||
cq_verify(cqv); |
||||
|
||||
memset(ops, 0, sizeof(ops)); |
||||
op = ops; |
||||
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; |
||||
op->data.recv_close_on_server.cancelled = &was_cancelled; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_MESSAGE; |
||||
op->data.send_message.send_message = response_payload; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
||||
op->data.send_status_from_server.trailing_metadata_count = 0; |
||||
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; |
||||
grpc_slice status_details = grpc_slice_from_static_string("xyz"); |
||||
op->data.send_status_from_server.status_details = &status_details; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103), |
||||
nullptr); |
||||
GPR_ASSERT(GRPC_CALL_OK == error); |
||||
|
||||
CQ_EXPECT_COMPLETION(cqv, tag(103), 1); |
||||
CQ_EXPECT_COMPLETION(cqv, tag(1), 1); |
||||
cq_verify(cqv); |
||||
|
||||
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); |
||||
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); |
||||
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); |
||||
GPR_ASSERT(was_cancelled == 0); |
||||
|
||||
grpc_slice_unref(details); |
||||
grpc_metadata_array_destroy(&initial_metadata_recv); |
||||
grpc_metadata_array_destroy(&trailing_metadata_recv); |
||||
grpc_metadata_array_destroy(&request_metadata_recv); |
||||
grpc_call_details_destroy(&call_details); |
||||
|
||||
grpc_call_unref(c); |
||||
grpc_call_unref(s); |
||||
|
||||
cq_verifier_destroy(cqv); |
||||
|
||||
grpc_byte_buffer_destroy(request_payload); |
||||
grpc_byte_buffer_destroy(response_payload); |
||||
grpc_byte_buffer_destroy(request_payload_recv); |
||||
grpc_byte_buffer_destroy(response_payload_recv); |
||||
grpc_slice_unref(request_payload_slice); |
||||
grpc_slice_unref(response_payload_slice); |
||||
} |
||||
|
||||
TEST(FlowControl, TrackMinProgressSize) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
sfc.UpdateProgress(5); |
||||
EXPECT_EQ(sfc.min_progress_size(), 5); |
||||
sfc.UpdateProgress(10); |
||||
EXPECT_EQ(sfc.min_progress_size(), 10); |
||||
EXPECT_EQ(absl::OkStatus(), sfc.RecvData(5)); |
||||
EXPECT_EQ(sfc.min_progress_size(), 5); |
||||
EXPECT_EQ(absl::OkStatus(), sfc.RecvData(5)); |
||||
EXPECT_EQ(sfc.min_progress_size(), 0); |
||||
EXPECT_EQ(absl::OkStatus(), sfc.RecvData(5)); |
||||
EXPECT_EQ(sfc.min_progress_size(), 0); |
||||
class FlowControlTest : public ::testing::Test { |
||||
protected: |
||||
void SetUp() override { |
||||
cq_ = grpc_completion_queue_create_for_next(nullptr); |
||||
// create the server
|
||||
std::string server_address = |
||||
grpc_core::JoinHostPort("localhost", grpc_pick_unused_port_or_die()); |
||||
grpc_arg server_args[] = { |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 0), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), -1), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH), -1)}; |
||||
grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), |
||||
server_args}; |
||||
server_ = grpc_server_create(&server_channel_args, nullptr); |
||||
grpc_server_register_completion_queue(server_, cq_, nullptr); |
||||
grpc_server_credentials* server_creds = |
||||
grpc_insecure_server_credentials_create(); |
||||
GPR_ASSERT(grpc_server_add_http2_port(server_, server_address.c_str(), |
||||
server_creds)); |
||||
grpc_server_credentials_release(server_creds); |
||||
grpc_server_start(server_); |
||||
// create the channel (bdp pings are enabled by default)
|
||||
grpc_arg client_args[] = { |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 1), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), -1), |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH), -1)}; |
||||
grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), |
||||
client_args}; |
||||
grpc_channel_credentials* creds = grpc_insecure_credentials_create(); |
||||
channel_ = grpc_channel_create(server_address.c_str(), creds, |
||||
&client_channel_args); |
||||
grpc_channel_credentials_release(creds); |
||||
VerifyChannelReady(channel_, cq_); |
||||
g_target_initial_window_size_mocker->Reset(); |
||||
} |
||||
|
||||
void TearDown() override { |
||||
// shutdown and destroy the client and server
|
||||
grpc_channel_destroy(channel_); |
||||
ServerShutdownAndDestroy(server_, cq_); |
||||
grpc_completion_queue_shutdown(cq_); |
||||
while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME), |
||||
nullptr) |
||||
.type != GRPC_QUEUE_SHUTDOWN) { |
||||
} |
||||
grpc_completion_queue_destroy(cq_); |
||||
} |
||||
|
||||
grpc_server* server_ = nullptr; |
||||
grpc_channel* channel_ = nullptr; |
||||
grpc_completion_queue* cq_ = nullptr; |
||||
}; |
||||
|
||||
TEST_F(FlowControlTest, |
||||
TestLargeWindowSizeUpdatesDoNotCauseIllegalFlowControlWindows) { |
||||
for (int i = 0; i < 10; ++i) { |
||||
PerformCallWithLargePayload(channel_, server_, cq_); |
||||
VerifyChannelConnected(channel_, cq_); |
||||
} |
||||
} |
||||
|
||||
TEST(FlowControl, NoUpdateWithoutReader) { |
||||
ExecCtx exec_ctx; |
||||
TransportFlowControl tfc("test", true, g_memory_owner); |
||||
StreamFlowControl sfc(&tfc); |
||||
for (int i = 0; i < 65535; i++) { |
||||
EXPECT_EQ(sfc.RecvData(1), absl::OkStatus()); |
||||
EXPECT_EQ(sfc.MakeAction().send_stream_update(), |
||||
FlowControlAction::Urgency::NO_ACTION_NEEDED); |
||||
TEST_F(FlowControlTest, TestWindowSizeUpdatesDoNotCauseStalledStreams) { |
||||
g_target_initial_window_size_mocker->AlternateTargetInitialWindowSizes(); |
||||
for (int i = 0; i < 100; ++i) { |
||||
PerformCallWithLargePayload(channel_, server_, cq_); |
||||
VerifyChannelConnected(channel_, cq_); |
||||
} |
||||
// Empty window needing 1 byte to progress should trigger an immediate read.
|
||||
sfc.UpdateProgress(1); |
||||
EXPECT_EQ(sfc.min_progress_size(), 1); |
||||
EXPECT_EQ(sfc.MakeAction().send_stream_update(), |
||||
FlowControlAction::Urgency::UPDATE_IMMEDIATELY); |
||||
EXPECT_GT(sfc.MaybeSendUpdate(), 0); |
||||
} |
||||
|
||||
} // namespace chttp2
|
||||
} // namespace grpc_core
|
||||
} // namespace
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
// Make sure that we will have an active poller on all client-side fd's that
|
||||
// are capable of sending and receiving even in the case that we don't have an
|
||||
// active RPC operation on the fd.
|
||||
GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1); |
||||
grpc_core::chttp2::g_test_only_transport_flow_control_window_check = true; |
||||
g_target_initial_window_size_mocker = new TransportTargetWindowSizeMocker(); |
||||
grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker = |
||||
g_target_initial_window_size_mocker; |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
grpc_init(); |
||||
auto result = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return result; |
||||
} |
||||
|
Loading…
Reference in new issue