This reverts commit aacf0e252b
.
pull/29891/head
parent
32f1766cf5
commit
d53986657f
77 changed files with 3532 additions and 1704 deletions
@ -0,0 +1,165 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2015 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/lib/transport/byte_stream.h" |
||||||
|
|
||||||
|
#include <memory> |
||||||
|
#include <utility> |
||||||
|
|
||||||
|
#include <grpc/slice_buffer.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
|
||||||
|
#include "src/core/lib/slice/slice_internal.h" |
||||||
|
#include "src/core/lib/slice/slice_refcount.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
//
|
||||||
|
// SliceBufferByteStream
|
||||||
|
//
|
||||||
|
|
||||||
|
SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer, |
||||||
|
uint32_t flags) |
||||||
|
: ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) { |
||||||
|
GPR_ASSERT(slice_buffer->length <= UINT32_MAX); |
||||||
|
grpc_slice_buffer_init(&backing_buffer_); |
||||||
|
grpc_slice_buffer_swap(slice_buffer, &backing_buffer_); |
||||||
|
if (backing_buffer_.count == 0) { |
||||||
|
grpc_slice_buffer_add_indexed(&backing_buffer_, grpc_empty_slice()); |
||||||
|
GPR_ASSERT(backing_buffer_.count > 0); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
SliceBufferByteStream::~SliceBufferByteStream() {} |
||||||
|
|
||||||
|
void SliceBufferByteStream::Orphan() { |
||||||
|
grpc_slice_buffer_destroy_internal(&backing_buffer_); |
||||||
|
GRPC_ERROR_UNREF(shutdown_error_); |
||||||
|
shutdown_error_ = GRPC_ERROR_NONE; |
||||||
|
// Note: We do not actually delete the object here, since
|
||||||
|
// SliceBufferByteStream is usually allocated as part of a larger
|
||||||
|
// object and has an OrphanablePtr of itself passed down through the
|
||||||
|
// filter stack.
|
||||||
|
} |
||||||
|
|
||||||
|
bool SliceBufferByteStream::Next(size_t /*max_size_hint*/, |
||||||
|
grpc_closure* /*on_complete*/) { |
||||||
|
GPR_DEBUG_ASSERT(backing_buffer_.count > 0); |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
grpc_error_handle SliceBufferByteStream::Pull(grpc_slice* slice) { |
||||||
|
if (GPR_UNLIKELY(shutdown_error_ != GRPC_ERROR_NONE)) { |
||||||
|
return GRPC_ERROR_REF(shutdown_error_); |
||||||
|
} |
||||||
|
*slice = grpc_slice_buffer_take_first(&backing_buffer_); |
||||||
|
return GRPC_ERROR_NONE; |
||||||
|
} |
||||||
|
|
||||||
|
void SliceBufferByteStream::Shutdown(grpc_error_handle error) { |
||||||
|
GRPC_ERROR_UNREF(shutdown_error_); |
||||||
|
shutdown_error_ = error; |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// ByteStreamCache
|
||||||
|
//
|
||||||
|
|
||||||
|
ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream) |
||||||
|
: underlying_stream_(std::move(underlying_stream)), |
||||||
|
length_(underlying_stream_->length()), |
||||||
|
flags_(underlying_stream_->flags()) { |
||||||
|
grpc_slice_buffer_init(&cache_buffer_); |
||||||
|
} |
||||||
|
|
||||||
|
ByteStreamCache::~ByteStreamCache() { Destroy(); } |
||||||
|
|
||||||
|
void ByteStreamCache::Destroy() { |
||||||
|
underlying_stream_.reset(); |
||||||
|
if (cache_buffer_.length > 0) { |
||||||
|
grpc_slice_buffer_destroy_internal(&cache_buffer_); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// ByteStreamCache::CachingByteStream
|
||||||
|
//
|
||||||
|
|
||||||
|
ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache) |
||||||
|
: ByteStream(cache->length_, cache->flags_), cache_(cache) {} |
||||||
|
|
||||||
|
ByteStreamCache::CachingByteStream::~CachingByteStream() {} |
||||||
|
|
||||||
|
void ByteStreamCache::CachingByteStream::Orphan() { |
||||||
|
GRPC_ERROR_UNREF(shutdown_error_); |
||||||
|
shutdown_error_ = GRPC_ERROR_NONE; |
||||||
|
// Note: We do not actually delete the object here, since
|
||||||
|
// CachingByteStream is usually allocated as part of a larger
|
||||||
|
// object and has an OrphanablePtr of itself passed down through the
|
||||||
|
// filter stack.
|
||||||
|
} |
||||||
|
|
||||||
|
bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint, |
||||||
|
grpc_closure* on_complete) { |
||||||
|
if (shutdown_error_ != GRPC_ERROR_NONE) return true; |
||||||
|
if (cursor_ < cache_->cache_buffer_.count) return true; |
||||||
|
GPR_ASSERT(cache_->underlying_stream_ != nullptr); |
||||||
|
return cache_->underlying_stream_->Next(max_size_hint, on_complete); |
||||||
|
} |
||||||
|
|
||||||
|
grpc_error_handle ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) { |
||||||
|
if (shutdown_error_ != GRPC_ERROR_NONE) { |
||||||
|
return GRPC_ERROR_REF(shutdown_error_); |
||||||
|
} |
||||||
|
if (cursor_ < cache_->cache_buffer_.count) { |
||||||
|
*slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]); |
||||||
|
++cursor_; |
||||||
|
offset_ += GRPC_SLICE_LENGTH(*slice); |
||||||
|
return GRPC_ERROR_NONE; |
||||||
|
} |
||||||
|
GPR_ASSERT(cache_->underlying_stream_ != nullptr); |
||||||
|
grpc_error_handle error = cache_->underlying_stream_->Pull(slice); |
||||||
|
if (error == GRPC_ERROR_NONE) { |
||||||
|
grpc_slice_buffer_add(&cache_->cache_buffer_, |
||||||
|
grpc_slice_ref_internal(*slice)); |
||||||
|
++cursor_; |
||||||
|
offset_ += GRPC_SLICE_LENGTH(*slice); |
||||||
|
// Orphan the underlying stream if it's been drained.
|
||||||
|
if (offset_ == cache_->underlying_stream_->length()) { |
||||||
|
cache_->underlying_stream_.reset(); |
||||||
|
} |
||||||
|
} |
||||||
|
return error; |
||||||
|
} |
||||||
|
|
||||||
|
void ByteStreamCache::CachingByteStream::Shutdown(grpc_error_handle error) { |
||||||
|
GRPC_ERROR_UNREF(shutdown_error_); |
||||||
|
shutdown_error_ = GRPC_ERROR_REF(error); |
||||||
|
if (cache_->underlying_stream_ != nullptr) { |
||||||
|
cache_->underlying_stream_->Shutdown(error); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void ByteStreamCache::CachingByteStream::Reset() { |
||||||
|
cursor_ = 0; |
||||||
|
offset_ = 0; |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,170 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2015 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H |
||||||
|
#define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <stddef.h> |
||||||
|
#include <stdint.h> |
||||||
|
|
||||||
|
#include <grpc/slice.h> |
||||||
|
|
||||||
|
#include "src/core/lib/gprpp/orphanable.h" |
||||||
|
#include "src/core/lib/iomgr/closure.h" |
||||||
|
#include "src/core/lib/iomgr/error.h" |
||||||
|
|
||||||
|
/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
|
||||||
|
* compression for the message. (Does not apply for stream compression.) */ |
||||||
|
#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) |
||||||
|
/** Internal bit flag for determining whether the message was compressed and had
|
||||||
|
* to be decompressed by the message_decompress filter. (Does not apply for |
||||||
|
* stream compression.) */ |
||||||
|
#define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u) |
||||||
|
/** Mask of all valid internal flags. */ |
||||||
|
#define GRPC_WRITE_INTERNAL_USED_MASK \ |
||||||
|
(GRPC_WRITE_INTERNAL_COMPRESS | GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED) |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
class ByteStream : public Orphanable { |
||||||
|
public: |
||||||
|
~ByteStream() override {} |
||||||
|
|
||||||
|
// Returns true if the bytes are available immediately (in which case
|
||||||
|
// on_complete will not be called), or false if the bytes will be available
|
||||||
|
// asynchronously (in which case on_complete will be called when they
|
||||||
|
// are available). Should not be called if there is no data left on the
|
||||||
|
// stream.
|
||||||
|
//
|
||||||
|
// max_size_hint can be set as a hint as to the maximum number
|
||||||
|
// of bytes that would be acceptable to read.
|
||||||
|
virtual bool Next(size_t max_size_hint, grpc_closure* on_complete) = 0; |
||||||
|
|
||||||
|
// Returns the next slice in the byte stream when it is available, as
|
||||||
|
// indicated by Next().
|
||||||
|
//
|
||||||
|
// Once a slice is returned into *slice, it is owned by the caller.
|
||||||
|
virtual grpc_error_handle Pull(grpc_slice* slice) = 0; |
||||||
|
|
||||||
|
// Shuts down the byte stream.
|
||||||
|
//
|
||||||
|
// If there is a pending call to on_complete from Next(), it will be
|
||||||
|
// invoked with the error passed to Shutdown().
|
||||||
|
//
|
||||||
|
// The next call to Pull() (if any) will return the error passed to
|
||||||
|
// Shutdown().
|
||||||
|
virtual void Shutdown(grpc_error_handle error) = 0; |
||||||
|
|
||||||
|
uint32_t length() const { return length_; } |
||||||
|
uint32_t flags() const { return flags_; } |
||||||
|
|
||||||
|
void set_flags(uint32_t flags) { flags_ = flags; } |
||||||
|
|
||||||
|
protected: |
||||||
|
ByteStream(uint32_t length, uint32_t flags) |
||||||
|
: length_(length), flags_(flags) {} |
||||||
|
|
||||||
|
private: |
||||||
|
const uint32_t length_; |
||||||
|
uint32_t flags_; |
||||||
|
}; |
||||||
|
|
||||||
|
//
|
||||||
|
// SliceBufferByteStream
|
||||||
|
//
|
||||||
|
// A ByteStream that wraps a slice buffer.
|
||||||
|
//
|
||||||
|
|
||||||
|
class SliceBufferByteStream : public ByteStream { |
||||||
|
public: |
||||||
|
// Removes all slices in slice_buffer, leaving it empty.
|
||||||
|
SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags); |
||||||
|
|
||||||
|
~SliceBufferByteStream() override; |
||||||
|
|
||||||
|
void Orphan() override; |
||||||
|
|
||||||
|
bool Next(size_t max_size_hint, grpc_closure* on_complete) override; |
||||||
|
grpc_error_handle Pull(grpc_slice* slice) override; |
||||||
|
void Shutdown(grpc_error_handle error) override; |
||||||
|
|
||||||
|
private: |
||||||
|
grpc_error_handle shutdown_error_ = GRPC_ERROR_NONE; |
||||||
|
grpc_slice_buffer backing_buffer_; |
||||||
|
}; |
||||||
|
|
||||||
|
//
|
||||||
|
// CachingByteStream
|
||||||
|
//
|
||||||
|
// A ByteStream that that wraps an underlying byte stream but caches
|
||||||
|
// the resulting slices in a slice buffer. If an initial attempt fails
|
||||||
|
// without fully draining the underlying stream, a new caching stream
|
||||||
|
// can be created from the same underlying cache, in which case it will
|
||||||
|
// return whatever is in the backing buffer before continuing to read the
|
||||||
|
// underlying stream.
|
||||||
|
//
|
||||||
|
// NOTE: No synchronization is done, so it is not safe to have multiple
|
||||||
|
// CachingByteStreams simultaneously drawing from the same underlying
|
||||||
|
// ByteStreamCache at the same time.
|
||||||
|
//
|
||||||
|
|
||||||
|
class ByteStreamCache { |
||||||
|
public: |
||||||
|
class CachingByteStream : public ByteStream { |
||||||
|
public: |
||||||
|
explicit CachingByteStream(ByteStreamCache* cache); |
||||||
|
|
||||||
|
~CachingByteStream() override; |
||||||
|
|
||||||
|
void Orphan() override; |
||||||
|
|
||||||
|
bool Next(size_t max_size_hint, grpc_closure* on_complete) override; |
||||||
|
grpc_error_handle Pull(grpc_slice* slice) override; |
||||||
|
void Shutdown(grpc_error_handle error) override; |
||||||
|
|
||||||
|
// Resets the byte stream to the start of the underlying stream.
|
||||||
|
void Reset(); |
||||||
|
|
||||||
|
private: |
||||||
|
ByteStreamCache* cache_; |
||||||
|
size_t cursor_ = 0; |
||||||
|
size_t offset_ = 0; |
||||||
|
grpc_error_handle shutdown_error_ = GRPC_ERROR_NONE; |
||||||
|
}; |
||||||
|
|
||||||
|
explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream); |
||||||
|
|
||||||
|
~ByteStreamCache(); |
||||||
|
|
||||||
|
// Must not be destroyed while still in use by a CachingByteStream.
|
||||||
|
void Destroy(); |
||||||
|
|
||||||
|
grpc_slice_buffer* cache_buffer() { return &cache_buffer_; } |
||||||
|
|
||||||
|
private: |
||||||
|
OrphanablePtr<ByteStream> underlying_stream_; |
||||||
|
uint32_t length_; |
||||||
|
uint32_t flags_; |
||||||
|
grpc_slice_buffer cache_buffer_; |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ |
@ -1,118 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2015 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include <string.h> |
|
||||||
|
|
||||||
#include <grpc/grpc_security.h> |
|
||||||
#include <grpc/support/alloc.h> |
|
||||||
#include <grpc/support/log.h> |
|
||||||
#include <grpc/support/sync.h> |
|
||||||
|
|
||||||
#include "src/core/ext/filters/client_channel/client_channel.h" |
|
||||||
#include "src/core/ext/filters/http/server/http_server_filter.h" |
|
||||||
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" |
|
||||||
#include "src/core/lib/channel/connected_channel.h" |
|
||||||
#include "src/core/lib/gprpp/host_port.h" |
|
||||||
#include "src/core/lib/surface/channel.h" |
|
||||||
#include "src/core/lib/surface/server.h" |
|
||||||
#include "test/core/end2end/end2end_tests.h" |
|
||||||
#include "test/core/util/port.h" |
|
||||||
#include "test/core/util/test_config.h" |
|
||||||
|
|
||||||
struct fullstack_fixture_data { |
|
||||||
std::string localaddr; |
|
||||||
}; |
|
||||||
|
|
||||||
static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( |
|
||||||
const grpc_channel_args* /*client_args*/, |
|
||||||
const grpc_channel_args* /*server_args*/) { |
|
||||||
grpc_end2end_test_fixture f; |
|
||||||
int port = grpc_pick_unused_port_or_die(); |
|
||||||
fullstack_fixture_data* ffd = new fullstack_fixture_data(); |
|
||||||
memset(&f, 0, sizeof(f)); |
|
||||||
|
|
||||||
ffd->localaddr = grpc_core::JoinHostPort("localhost", port); |
|
||||||
|
|
||||||
f.fixture_data = ffd; |
|
||||||
f.cq = grpc_completion_queue_create_for_next(nullptr); |
|
||||||
|
|
||||||
return f; |
|
||||||
} |
|
||||||
|
|
||||||
void chttp2_init_client_fullstack(grpc_end2end_test_fixture* f, |
|
||||||
const grpc_channel_args* client_args) { |
|
||||||
fullstack_fixture_data* ffd = |
|
||||||
static_cast<fullstack_fixture_data*>(f->fixture_data); |
|
||||||
grpc_channel_credentials* creds = grpc_insecure_credentials_create(); |
|
||||||
grpc_arg no_retry = grpc_channel_arg_integer_create( |
|
||||||
const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0); |
|
||||||
client_args = grpc_channel_args_copy_and_add(client_args, &no_retry, 1); |
|
||||||
f->client = grpc_channel_create(ffd->localaddr.c_str(), creds, client_args); |
|
||||||
grpc_channel_args_destroy(client_args); |
|
||||||
grpc_channel_credentials_release(creds); |
|
||||||
GPR_ASSERT(f->client); |
|
||||||
} |
|
||||||
|
|
||||||
void chttp2_init_server_fullstack(grpc_end2end_test_fixture* f, |
|
||||||
const grpc_channel_args* server_args) { |
|
||||||
fullstack_fixture_data* ffd = |
|
||||||
static_cast<fullstack_fixture_data*>(f->fixture_data); |
|
||||||
if (f->server) { |
|
||||||
grpc_server_destroy(f->server); |
|
||||||
} |
|
||||||
f->server = grpc_server_create(server_args, nullptr); |
|
||||||
grpc_server_register_completion_queue(f->server, f->cq, nullptr); |
|
||||||
grpc_server_credentials* server_creds = |
|
||||||
grpc_insecure_server_credentials_create(); |
|
||||||
GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr.c_str(), |
|
||||||
server_creds)); |
|
||||||
grpc_server_credentials_release(server_creds); |
|
||||||
grpc_server_start(f->server); |
|
||||||
} |
|
||||||
|
|
||||||
void chttp2_tear_down_fullstack(grpc_end2end_test_fixture* f) { |
|
||||||
fullstack_fixture_data* ffd = |
|
||||||
static_cast<fullstack_fixture_data*>(f->fixture_data); |
|
||||||
delete ffd; |
|
||||||
} |
|
||||||
|
|
||||||
/* All test configurations */ |
|
||||||
static grpc_end2end_test_config configs[] = { |
|
||||||
{"chttp2/fullstack", |
|
||||||
FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | |
|
||||||
FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | |
|
||||||
FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, |
|
||||||
nullptr, chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, |
|
||||||
chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, |
|
||||||
}; |
|
||||||
|
|
||||||
int main(int argc, char** argv) { |
|
||||||
size_t i; |
|
||||||
|
|
||||||
grpc::testing::TestEnvironment env(&argc, argv); |
|
||||||
grpc_end2end_tests_pre_init(); |
|
||||||
grpc_init(); |
|
||||||
|
|
||||||
for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { |
|
||||||
grpc_end2end_tests(argc, argv, configs[i]); |
|
||||||
} |
|
||||||
|
|
||||||
grpc_shutdown(); |
|
||||||
|
|
||||||
return 0; |
|
||||||
} |
|
@ -0,0 +1,100 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2017 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
/* Test of gpr synchronization support. */ |
||||||
|
|
||||||
|
#include "src/core/lib/gprpp/manual_constructor.h" |
||||||
|
|
||||||
|
#include <stdio.h> |
||||||
|
#include <stdlib.h> |
||||||
|
|
||||||
|
#include <cstring> |
||||||
|
|
||||||
|
#include <grpc/support/alloc.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
#include <grpc/support/sync.h> |
||||||
|
|
||||||
|
#include "test/core/util/test_config.h" |
||||||
|
|
||||||
|
class A { |
||||||
|
public: |
||||||
|
A() {} |
||||||
|
virtual ~A() {} |
||||||
|
virtual const char* foo() { return "A_foo"; } |
||||||
|
virtual const char* bar() { return "A_bar"; } |
||||||
|
}; |
||||||
|
|
||||||
|
class B : public A { |
||||||
|
public: |
||||||
|
B() {} |
||||||
|
~B() override {} |
||||||
|
const char* foo() override { return "B_foo"; } |
||||||
|
char get_junk() { return junk[0]; } |
||||||
|
|
||||||
|
private: |
||||||
|
char junk[1000]; |
||||||
|
}; |
||||||
|
|
||||||
|
class C : public B { |
||||||
|
public: |
||||||
|
C() {} |
||||||
|
~C() override {} |
||||||
|
const char* bar() override { return "C_bar"; } |
||||||
|
char get_more_junk() { return more_junk[0]; } |
||||||
|
|
||||||
|
private: |
||||||
|
char more_junk[1000]; |
||||||
|
}; |
||||||
|
|
||||||
|
class D : public A { |
||||||
|
public: |
||||||
|
const char* bar() override { return "D_bar"; } |
||||||
|
}; |
||||||
|
|
||||||
|
static void basic_test() { |
||||||
|
grpc_core::PolymorphicManualConstructor<A, B> poly; |
||||||
|
poly.Init<B>(); |
||||||
|
GPR_ASSERT(!strcmp(poly->foo(), "B_foo")); |
||||||
|
GPR_ASSERT(!strcmp(poly->bar(), "A_bar")); |
||||||
|
} |
||||||
|
|
||||||
|
static void complex_test() { |
||||||
|
grpc_core::PolymorphicManualConstructor<A, B, C, D> polyB; |
||||||
|
polyB.Init<B>(); |
||||||
|
GPR_ASSERT(!strcmp(polyB->foo(), "B_foo")); |
||||||
|
GPR_ASSERT(!strcmp(polyB->bar(), "A_bar")); |
||||||
|
|
||||||
|
grpc_core::PolymorphicManualConstructor<A, B, C, D> polyC; |
||||||
|
polyC.Init<C>(); |
||||||
|
GPR_ASSERT(!strcmp(polyC->foo(), "B_foo")); |
||||||
|
GPR_ASSERT(!strcmp(polyC->bar(), "C_bar")); |
||||||
|
|
||||||
|
grpc_core::PolymorphicManualConstructor<A, B, C, D> polyD; |
||||||
|
polyD.Init<D>(); |
||||||
|
GPR_ASSERT(!strcmp(polyD->foo(), "A_foo")); |
||||||
|
GPR_ASSERT(!strcmp(polyD->bar(), "D_bar")); |
||||||
|
} |
||||||
|
|
||||||
|
/* ------------------------------------------------- */ |
||||||
|
|
||||||
|
int main(int argc, char* argv[]) { |
||||||
|
grpc::testing::TestEnvironment env(&argc, argv); |
||||||
|
basic_test(); |
||||||
|
complex_test(); |
||||||
|
return 0; |
||||||
|
} |
@ -0,0 +1,254 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2017 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include "src/core/lib/transport/byte_stream.h" |
||||||
|
|
||||||
|
#include <gtest/gtest.h> |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
#include <grpc/support/alloc.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
|
||||||
|
#include "src/core/lib/gpr/useful.h" |
||||||
|
#include "src/core/lib/iomgr/exec_ctx.h" |
||||||
|
#include "src/core/lib/slice/slice_internal.h" |
||||||
|
#include "test/core/util/test_config.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
namespace { |
||||||
|
|
||||||
|
//
|
||||||
|
// SliceBufferByteStream tests
|
||||||
|
//
|
||||||
|
|
||||||
|
void NotCalledClosure(void* /*arg*/, grpc_error_handle /*error*/) { |
||||||
|
GPR_ASSERT(false); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(SliceBufferByteStream, Basic) { |
||||||
|
ExecCtx exec_ctx; |
||||||
|
// Create and populate slice buffer.
|
||||||
|
grpc_slice_buffer buffer; |
||||||
|
grpc_slice_buffer_init(&buffer); |
||||||
|
grpc_slice input[] = { |
||||||
|
grpc_slice_from_static_string("foo"), |
||||||
|
grpc_slice_from_static_string("bar"), |
||||||
|
}; |
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||||
|
grpc_slice_buffer_add(&buffer, input[i]); |
||||||
|
} |
||||||
|
// Create byte stream.
|
||||||
|
SliceBufferByteStream stream(&buffer, 0); |
||||||
|
grpc_slice_buffer_destroy_internal(&buffer); |
||||||
|
EXPECT_EQ(6U, stream.length()); |
||||||
|
grpc_closure closure; |
||||||
|
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
// Read each slice. Note that Next() always returns synchronously.
|
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||||
|
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||||
|
grpc_slice output; |
||||||
|
grpc_error_handle error = stream.Pull(&output); |
||||||
|
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||||
|
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||||
|
grpc_slice_unref_internal(output); |
||||||
|
} |
||||||
|
// Clean up.
|
||||||
|
stream.Orphan(); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(SliceBufferByteStream, Shutdown) { |
||||||
|
ExecCtx exec_ctx; |
||||||
|
// Create and populate slice buffer.
|
||||||
|
grpc_slice_buffer buffer; |
||||||
|
grpc_slice_buffer_init(&buffer); |
||||||
|
grpc_slice input[] = { |
||||||
|
grpc_slice_from_static_string("foo"), |
||||||
|
grpc_slice_from_static_string("bar"), |
||||||
|
}; |
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||||
|
grpc_slice_buffer_add(&buffer, input[i]); |
||||||
|
} |
||||||
|
// Create byte stream.
|
||||||
|
SliceBufferByteStream stream(&buffer, 0); |
||||||
|
grpc_slice_buffer_destroy_internal(&buffer); |
||||||
|
EXPECT_EQ(6U, stream.length()); |
||||||
|
grpc_closure closure; |
||||||
|
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
// Read the first slice.
|
||||||
|
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||||
|
grpc_slice output; |
||||||
|
grpc_error_handle error = stream.Pull(&output); |
||||||
|
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||||
|
EXPECT_TRUE(grpc_slice_eq(input[0], output)); |
||||||
|
grpc_slice_unref_internal(output); |
||||||
|
// Now shutdown.
|
||||||
|
grpc_error_handle shutdown_error = |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error"); |
||||||
|
stream.Shutdown(GRPC_ERROR_REF(shutdown_error)); |
||||||
|
// After shutdown, the next pull() should return the error.
|
||||||
|
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||||
|
error = stream.Pull(&output); |
||||||
|
EXPECT_TRUE(error == shutdown_error); |
||||||
|
GRPC_ERROR_UNREF(error); |
||||||
|
GRPC_ERROR_UNREF(shutdown_error); |
||||||
|
// Clean up.
|
||||||
|
stream.Orphan(); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// CachingByteStream tests
|
||||||
|
//
|
||||||
|
|
||||||
|
TEST(CachingByteStream, Basic) { |
||||||
|
ExecCtx exec_ctx; |
||||||
|
// Create and populate slice buffer byte stream.
|
||||||
|
grpc_slice_buffer buffer; |
||||||
|
grpc_slice_buffer_init(&buffer); |
||||||
|
grpc_slice input[] = { |
||||||
|
grpc_slice_from_static_string("foo"), |
||||||
|
grpc_slice_from_static_string("bar"), |
||||||
|
}; |
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||||
|
grpc_slice_buffer_add(&buffer, input[i]); |
||||||
|
} |
||||||
|
SliceBufferByteStream underlying_stream(&buffer, 0); |
||||||
|
grpc_slice_buffer_destroy_internal(&buffer); |
||||||
|
// Create cache and caching stream.
|
||||||
|
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream))); |
||||||
|
ByteStreamCache::CachingByteStream stream(&cache); |
||||||
|
grpc_closure closure; |
||||||
|
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
// Read each slice. Note that next() always returns synchronously,
|
||||||
|
// because the underlying byte stream always does.
|
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||||
|
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||||
|
grpc_slice output; |
||||||
|
grpc_error_handle error = stream.Pull(&output); |
||||||
|
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||||
|
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||||
|
grpc_slice_unref_internal(output); |
||||||
|
} |
||||||
|
// Clean up.
|
||||||
|
stream.Orphan(); |
||||||
|
cache.Destroy(); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(CachingByteStream, Reset) { |
||||||
|
ExecCtx exec_ctx; |
||||||
|
// Create and populate slice buffer byte stream.
|
||||||
|
grpc_slice_buffer buffer; |
||||||
|
grpc_slice_buffer_init(&buffer); |
||||||
|
grpc_slice input[] = { |
||||||
|
grpc_slice_from_static_string("foo"), |
||||||
|
grpc_slice_from_static_string("bar"), |
||||||
|
}; |
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||||
|
grpc_slice_buffer_add(&buffer, input[i]); |
||||||
|
} |
||||||
|
SliceBufferByteStream underlying_stream(&buffer, 0); |
||||||
|
grpc_slice_buffer_destroy_internal(&buffer); |
||||||
|
// Create cache and caching stream.
|
||||||
|
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream))); |
||||||
|
ByteStreamCache::CachingByteStream stream(&cache); |
||||||
|
grpc_closure closure; |
||||||
|
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
// Read one slice.
|
||||||
|
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||||
|
grpc_slice output; |
||||||
|
grpc_error_handle error = stream.Pull(&output); |
||||||
|
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||||
|
EXPECT_TRUE(grpc_slice_eq(input[0], output)); |
||||||
|
grpc_slice_unref_internal(output); |
||||||
|
// Reset the caching stream. The reads should start over from the
|
||||||
|
// first slice.
|
||||||
|
stream.Reset(); |
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||||
|
ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); |
||||||
|
error = stream.Pull(&output); |
||||||
|
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||||
|
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||||
|
grpc_slice_unref_internal(output); |
||||||
|
} |
||||||
|
// Clean up.
|
||||||
|
stream.Orphan(); |
||||||
|
cache.Destroy(); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(CachingByteStream, SharedCache) { |
||||||
|
ExecCtx exec_ctx; |
||||||
|
// Create and populate slice buffer byte stream.
|
||||||
|
grpc_slice_buffer buffer; |
||||||
|
grpc_slice_buffer_init(&buffer); |
||||||
|
grpc_slice input[] = { |
||||||
|
grpc_slice_from_static_string("foo"), |
||||||
|
grpc_slice_from_static_string("bar"), |
||||||
|
}; |
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||||
|
grpc_slice_buffer_add(&buffer, input[i]); |
||||||
|
} |
||||||
|
SliceBufferByteStream underlying_stream(&buffer, 0); |
||||||
|
grpc_slice_buffer_destroy_internal(&buffer); |
||||||
|
// Create cache and two caching streams.
|
||||||
|
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream))); |
||||||
|
ByteStreamCache::CachingByteStream stream1(&cache); |
||||||
|
ByteStreamCache::CachingByteStream stream2(&cache); |
||||||
|
grpc_closure closure; |
||||||
|
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
// Read one slice from stream1.
|
||||||
|
EXPECT_TRUE(stream1.Next(~(size_t)0, &closure)); |
||||||
|
grpc_slice output; |
||||||
|
grpc_error_handle error = stream1.Pull(&output); |
||||||
|
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||||
|
EXPECT_TRUE(grpc_slice_eq(input[0], output)); |
||||||
|
grpc_slice_unref_internal(output); |
||||||
|
// Read all slices from stream2.
|
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { |
||||||
|
EXPECT_TRUE(stream2.Next(~(size_t)0, &closure)); |
||||||
|
error = stream2.Pull(&output); |
||||||
|
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||||
|
EXPECT_TRUE(grpc_slice_eq(input[i], output)); |
||||||
|
grpc_slice_unref_internal(output); |
||||||
|
} |
||||||
|
// Now read the second slice from stream1.
|
||||||
|
EXPECT_TRUE(stream1.Next(~(size_t)0, &closure)); |
||||||
|
error = stream1.Pull(&output); |
||||||
|
EXPECT_TRUE(error == GRPC_ERROR_NONE); |
||||||
|
EXPECT_TRUE(grpc_slice_eq(input[1], output)); |
||||||
|
grpc_slice_unref_internal(output); |
||||||
|
// Clean up.
|
||||||
|
stream1.Orphan(); |
||||||
|
stream2.Orphan(); |
||||||
|
cache.Destroy(); |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
int main(int argc, char** argv) { |
||||||
|
grpc::testing::TestEnvironment env(&argc, argv); |
||||||
|
::testing::InitGoogleTest(&argc, argv); |
||||||
|
grpc_init(); |
||||||
|
int retval = RUN_ALL_TESTS(); |
||||||
|
grpc_shutdown(); |
||||||
|
return retval; |
||||||
|
} |
@ -1,379 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2021 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <stdlib.h> |
|
||||||
#include <string.h> |
|
||||||
|
|
||||||
#include <functional> |
|
||||||
#include <set> |
|
||||||
#include <thread> |
|
||||||
|
|
||||||
#include <gmock/gmock.h> |
|
||||||
|
|
||||||
#include <grpc/grpc.h> |
|
||||||
#include <grpc/grpc_security.h> |
|
||||||
#include <grpc/impl/codegen/grpc_types.h> |
|
||||||
#include <grpc/slice.h> |
|
||||||
#include <grpc/support/alloc.h> |
|
||||||
#include <grpc/support/log.h> |
|
||||||
#include <grpc/support/string_util.h> |
|
||||||
#include <grpc/support/time.h> |
|
||||||
|
|
||||||
#include "src/core/ext/filters/client_channel/backup_poller.h" |
|
||||||
#include "src/core/ext/transport/chttp2/transport/flow_control.h" |
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/gprpp/host_port.h" |
|
||||||
#include "src/core/lib/surface/channel.h" |
|
||||||
#include "test/core/end2end/cq_verifier.h" |
|
||||||
#include "test/core/util/port.h" |
|
||||||
#include "test/core/util/test_config.h" |
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
class TransportTargetWindowSizeMocker |
|
||||||
: public grpc_core::chttp2::TestOnlyTransportTargetWindowEstimatesMocker { |
|
||||||
public: |
|
||||||
static constexpr uint32_t kLargeInitialWindowSize = 1u << 31; |
|
||||||
static constexpr uint32_t kSmallInitialWindowSize = 0; |
|
||||||
|
|
||||||
double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate( |
|
||||||
double /* current_target */) override { |
|
||||||
// Protecting access to variable window_size_ shared between client and
|
|
||||||
// server.
|
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
if (alternating_initial_window_sizes_) { |
|
||||||
window_size_ = (window_size_ == kLargeInitialWindowSize) |
|
||||||
? kSmallInitialWindowSize |
|
||||||
: kLargeInitialWindowSize; |
|
||||||
} |
|
||||||
return window_size_; |
|
||||||
} |
|
||||||
|
|
||||||
// Alternates the initial window size targets. Computes a low values if it was
|
|
||||||
// previously high, or a high value if it was previously low.
|
|
||||||
void AlternateTargetInitialWindowSizes() { |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
alternating_initial_window_sizes_ = true; |
|
||||||
} |
|
||||||
|
|
||||||
void Reset() { |
|
||||||
// Protecting access to variable window_size_ shared between client and
|
|
||||||
// server.
|
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
alternating_initial_window_sizes_ = false; |
|
||||||
window_size_ = kLargeInitialWindowSize; |
|
||||||
} |
|
||||||
|
|
||||||
private: |
|
||||||
grpc_core::Mutex mu_; |
|
||||||
bool alternating_initial_window_sizes_ ABSL_GUARDED_BY(mu_) = false; |
|
||||||
double window_size_ ABSL_GUARDED_BY(mu_) = kLargeInitialWindowSize; |
|
||||||
}; |
|
||||||
|
|
||||||
TransportTargetWindowSizeMocker* g_target_initial_window_size_mocker; |
|
||||||
|
|
||||||
void* tag(intptr_t t) { return reinterpret_cast<void*>(t); } |
|
||||||
|
|
||||||
void VerifyChannelReady(grpc_channel* channel, grpc_completion_queue* cq) { |
|
||||||
grpc_connectivity_state state = |
|
||||||
grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */); |
|
||||||
while (state != GRPC_CHANNEL_READY) { |
|
||||||
grpc_channel_watch_connectivity_state( |
|
||||||
channel, state, grpc_timeout_seconds_to_deadline(5), cq, nullptr); |
|
||||||
grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5), |
|
||||||
nullptr); |
|
||||||
state = grpc_channel_check_connectivity_state(channel, 0); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void VerifyChannelConnected(grpc_channel* channel, grpc_completion_queue* cq) { |
|
||||||
// Verify channel is connected. Use a ping to make sure that clients
|
|
||||||
// tries sending/receiving bytes if the channel is connected.
|
|
||||||
grpc_channel_ping(channel, cq, reinterpret_cast<void*>(2000), nullptr); |
|
||||||
grpc_event ev = grpc_completion_queue_next( |
|
||||||
cq, grpc_timeout_seconds_to_deadline(5), nullptr); |
|
||||||
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); |
|
||||||
GPR_ASSERT(ev.tag == reinterpret_cast<void*>(2000)); |
|
||||||
GPR_ASSERT(ev.success == 1); |
|
||||||
GPR_ASSERT(grpc_channel_check_connectivity_state(channel, 0) == |
|
||||||
GRPC_CHANNEL_READY); |
|
||||||
} |
|
||||||
|
|
||||||
// Shuts down and destroys the server.
|
|
||||||
void ServerShutdownAndDestroy(grpc_server* server, grpc_completion_queue* cq) { |
|
||||||
// Shutdown and destroy server
|
|
||||||
grpc_server_shutdown_and_notify(server, cq, reinterpret_cast<void*>(1000)); |
|
||||||
while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), |
|
||||||
nullptr) |
|
||||||
.tag != reinterpret_cast<void*>(1000)) { |
|
||||||
} |
|
||||||
grpc_server_destroy(server); |
|
||||||
} |
|
||||||
|
|
||||||
grpc_slice LargeSlice(void) { |
|
||||||
grpc_slice slice = grpc_slice_malloc(10000000); // ~10MB
|
|
||||||
memset(GRPC_SLICE_START_PTR(slice), 'x', GRPC_SLICE_LENGTH(slice)); |
|
||||||
return slice; |
|
||||||
} |
|
||||||
|
|
||||||
void PerformCallWithLargePayload(grpc_channel* channel, grpc_server* server, |
|
||||||
grpc_completion_queue* cq) { |
|
||||||
grpc_slice request_payload_slice = LargeSlice(); |
|
||||||
grpc_slice response_payload_slice = LargeSlice(); |
|
||||||
grpc_call* c; |
|
||||||
grpc_call* s; |
|
||||||
grpc_byte_buffer* request_payload = |
|
||||||
grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
|
||||||
grpc_byte_buffer* response_payload = |
|
||||||
grpc_raw_byte_buffer_create(&response_payload_slice, 1); |
|
||||||
cq_verifier* cqv = cq_verifier_create(cq); |
|
||||||
grpc_op ops[6]; |
|
||||||
grpc_op* op; |
|
||||||
grpc_metadata_array initial_metadata_recv; |
|
||||||
grpc_metadata_array trailing_metadata_recv; |
|
||||||
grpc_metadata_array request_metadata_recv; |
|
||||||
grpc_byte_buffer* request_payload_recv = nullptr; |
|
||||||
grpc_byte_buffer* response_payload_recv = nullptr; |
|
||||||
grpc_call_details call_details; |
|
||||||
grpc_status_code status; |
|
||||||
grpc_call_error error; |
|
||||||
grpc_slice details; |
|
||||||
int was_cancelled = 2; |
|
||||||
|
|
||||||
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30); |
|
||||||
c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, |
|
||||||
grpc_slice_from_static_string("/foo"), nullptr, |
|
||||||
deadline, nullptr); |
|
||||||
GPR_ASSERT(c); |
|
||||||
|
|
||||||
grpc_metadata_array_init(&initial_metadata_recv); |
|
||||||
grpc_metadata_array_init(&trailing_metadata_recv); |
|
||||||
grpc_metadata_array_init(&request_metadata_recv); |
|
||||||
grpc_call_details_init(&call_details); |
|
||||||
|
|
||||||
memset(ops, 0, sizeof(ops)); |
|
||||||
op = ops; |
|
||||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
|
||||||
op->data.send_initial_metadata.count = 0; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
op->op = GRPC_OP_SEND_MESSAGE; |
|
||||||
op->data.send_message.send_message = request_payload; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
op->op = GRPC_OP_RECV_INITIAL_METADATA; |
|
||||||
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
op->op = GRPC_OP_RECV_MESSAGE; |
|
||||||
op->data.recv_message.recv_message = &response_payload_recv; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
|
||||||
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; |
|
||||||
op->data.recv_status_on_client.status = &status; |
|
||||||
op->data.recv_status_on_client.status_details = &details; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1), |
|
||||||
nullptr); |
|
||||||
GPR_ASSERT(GRPC_CALL_OK == error); |
|
||||||
|
|
||||||
error = grpc_server_request_call(server, &s, &call_details, |
|
||||||
&request_metadata_recv, cq, cq, tag(101)); |
|
||||||
GPR_ASSERT(GRPC_CALL_OK == error); |
|
||||||
CQ_EXPECT_COMPLETION(cqv, tag(101), 1); |
|
||||||
cq_verify(cqv); |
|
||||||
|
|
||||||
memset(ops, 0, sizeof(ops)); |
|
||||||
op = ops; |
|
||||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
|
||||||
op->data.send_initial_metadata.count = 0; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
op->op = GRPC_OP_RECV_MESSAGE; |
|
||||||
op->data.recv_message.recv_message = &request_payload_recv; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102), |
|
||||||
nullptr); |
|
||||||
GPR_ASSERT(GRPC_CALL_OK == error); |
|
||||||
|
|
||||||
CQ_EXPECT_COMPLETION(cqv, tag(102), 1); |
|
||||||
cq_verify(cqv); |
|
||||||
|
|
||||||
memset(ops, 0, sizeof(ops)); |
|
||||||
op = ops; |
|
||||||
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; |
|
||||||
op->data.recv_close_on_server.cancelled = &was_cancelled; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
op->op = GRPC_OP_SEND_MESSAGE; |
|
||||||
op->data.send_message.send_message = response_payload; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
|
||||||
op->data.send_status_from_server.trailing_metadata_count = 0; |
|
||||||
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; |
|
||||||
grpc_slice status_details = grpc_slice_from_static_string("xyz"); |
|
||||||
op->data.send_status_from_server.status_details = &status_details; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103), |
|
||||||
nullptr); |
|
||||||
GPR_ASSERT(GRPC_CALL_OK == error); |
|
||||||
|
|
||||||
CQ_EXPECT_COMPLETION(cqv, tag(103), 1); |
|
||||||
CQ_EXPECT_COMPLETION(cqv, tag(1), 1); |
|
||||||
cq_verify(cqv); |
|
||||||
|
|
||||||
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); |
|
||||||
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); |
|
||||||
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); |
|
||||||
GPR_ASSERT(was_cancelled == 0); |
|
||||||
|
|
||||||
grpc_slice_unref(details); |
|
||||||
grpc_metadata_array_destroy(&initial_metadata_recv); |
|
||||||
grpc_metadata_array_destroy(&trailing_metadata_recv); |
|
||||||
grpc_metadata_array_destroy(&request_metadata_recv); |
|
||||||
grpc_call_details_destroy(&call_details); |
|
||||||
|
|
||||||
grpc_call_unref(c); |
|
||||||
grpc_call_unref(s); |
|
||||||
|
|
||||||
cq_verifier_destroy(cqv); |
|
||||||
|
|
||||||
grpc_byte_buffer_destroy(request_payload); |
|
||||||
grpc_byte_buffer_destroy(response_payload); |
|
||||||
grpc_byte_buffer_destroy(request_payload_recv); |
|
||||||
grpc_byte_buffer_destroy(response_payload_recv); |
|
||||||
grpc_slice_unref(request_payload_slice); |
|
||||||
grpc_slice_unref(response_payload_slice); |
|
||||||
} |
|
||||||
|
|
||||||
class FlowControlTest : public ::testing::Test { |
|
||||||
protected: |
|
||||||
void SetUp() override { |
|
||||||
cq_ = grpc_completion_queue_create_for_next(nullptr); |
|
||||||
// create the server
|
|
||||||
std::string server_address = |
|
||||||
grpc_core::JoinHostPort("localhost", grpc_pick_unused_port_or_die()); |
|
||||||
grpc_arg server_args[] = { |
|
||||||
grpc_channel_arg_integer_create( |
|
||||||
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 0), |
|
||||||
grpc_channel_arg_integer_create( |
|
||||||
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), -1), |
|
||||||
grpc_channel_arg_integer_create( |
|
||||||
const_cast<char*>(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH), -1)}; |
|
||||||
grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), |
|
||||||
server_args}; |
|
||||||
server_ = grpc_server_create(&server_channel_args, nullptr); |
|
||||||
grpc_server_register_completion_queue(server_, cq_, nullptr); |
|
||||||
grpc_server_credentials* server_creds = |
|
||||||
grpc_insecure_server_credentials_create(); |
|
||||||
GPR_ASSERT(grpc_server_add_http2_port(server_, server_address.c_str(), |
|
||||||
server_creds)); |
|
||||||
grpc_server_credentials_release(server_creds); |
|
||||||
grpc_server_start(server_); |
|
||||||
// create the channel (bdp pings are enabled by default)
|
|
||||||
grpc_arg client_args[] = { |
|
||||||
grpc_channel_arg_integer_create( |
|
||||||
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0), |
|
||||||
grpc_channel_arg_integer_create( |
|
||||||
const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 1), |
|
||||||
grpc_channel_arg_integer_create( |
|
||||||
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), -1), |
|
||||||
grpc_channel_arg_integer_create( |
|
||||||
const_cast<char*>(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH), -1)}; |
|
||||||
grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), |
|
||||||
client_args}; |
|
||||||
grpc_channel_credentials* creds = grpc_insecure_credentials_create(); |
|
||||||
channel_ = grpc_channel_create(server_address.c_str(), creds, |
|
||||||
&client_channel_args); |
|
||||||
grpc_channel_credentials_release(creds); |
|
||||||
VerifyChannelReady(channel_, cq_); |
|
||||||
g_target_initial_window_size_mocker->Reset(); |
|
||||||
} |
|
||||||
|
|
||||||
void TearDown() override { |
|
||||||
// shutdown and destroy the client and server
|
|
||||||
grpc_channel_destroy(channel_); |
|
||||||
ServerShutdownAndDestroy(server_, cq_); |
|
||||||
grpc_completion_queue_shutdown(cq_); |
|
||||||
while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME), |
|
||||||
nullptr) |
|
||||||
.type != GRPC_QUEUE_SHUTDOWN) { |
|
||||||
} |
|
||||||
grpc_completion_queue_destroy(cq_); |
|
||||||
} |
|
||||||
|
|
||||||
grpc_server* server_ = nullptr; |
|
||||||
grpc_channel* channel_ = nullptr; |
|
||||||
grpc_completion_queue* cq_ = nullptr; |
|
||||||
}; |
|
||||||
|
|
||||||
TEST_F(FlowControlTest, |
|
||||||
TestLargeWindowSizeUpdatesDoNotCauseIllegalFlowControlWindows) { |
|
||||||
for (int i = 0; i < 10; ++i) { |
|
||||||
PerformCallWithLargePayload(channel_, server_, cq_); |
|
||||||
VerifyChannelConnected(channel_, cq_); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
TEST_F(FlowControlTest, TestWindowSizeUpdatesDoNotCauseStalledStreams) { |
|
||||||
g_target_initial_window_size_mocker->AlternateTargetInitialWindowSizes(); |
|
||||||
for (int i = 0; i < 100; ++i) { |
|
||||||
PerformCallWithLargePayload(channel_, server_, cq_); |
|
||||||
VerifyChannelConnected(channel_, cq_); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
int main(int argc, char** argv) { |
|
||||||
::testing::InitGoogleTest(&argc, argv); |
|
||||||
// Make sure that we will have an active poller on all client-side fd's that
|
|
||||||
// are capable of sending and receiving even in the case that we don't have an
|
|
||||||
// active RPC operation on the fd.
|
|
||||||
GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1); |
|
||||||
grpc_core::chttp2::g_test_only_transport_flow_control_window_check = true; |
|
||||||
g_target_initial_window_size_mocker = new TransportTargetWindowSizeMocker(); |
|
||||||
grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker = |
|
||||||
g_target_initial_window_size_mocker; |
|
||||||
grpc::testing::TestEnvironment env(&argc, argv); |
|
||||||
grpc_init(); |
|
||||||
auto result = RUN_ALL_TESTS(); |
|
||||||
grpc_shutdown(); |
|
||||||
return result; |
|
||||||
} |
|
@ -1,115 +1,380 @@ |
|||||||
// Copyright 2022 gRPC authors.
|
/*
|
||||||
//
|
* |
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
* Copyright 2021 gRPC authors. |
||||||
// you may not use this file except in compliance with the License.
|
* |
||||||
// You may obtain a copy of the License at
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
//
|
* you may not use this file except in compliance with the License. |
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
* You may obtain a copy of the License at |
||||||
//
|
* |
||||||
// Unless required by applicable law or agreed to in writing, software
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
* |
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* Unless required by applicable law or agreed to in writing, software |
||||||
// See the License for the specific language governing permissions and
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
// limitations under the License.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
#include "src/core/ext/transport/chttp2/transport/flow_control.h" |
#include "src/core/ext/transport/chttp2/transport/flow_control.h" |
||||||
|
|
||||||
#include <gtest/gtest.h> |
#include <stdlib.h> |
||||||
|
#include <string.h> |
||||||
|
|
||||||
|
#include <functional> |
||||||
|
#include <set> |
||||||
|
#include <thread> |
||||||
|
|
||||||
|
#include <gmock/gmock.h> |
||||||
|
|
||||||
#include "src/core/lib/iomgr/exec_ctx.h" |
#include <grpc/grpc.h> |
||||||
#include "src/core/lib/resource_quota/resource_quota.h" |
#include <grpc/grpc_security.h> |
||||||
|
#include <grpc/impl/codegen/grpc_types.h> |
||||||
|
#include <grpc/slice.h> |
||||||
|
#include <grpc/support/alloc.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
#include <grpc/support/string_util.h> |
||||||
|
#include <grpc/support/time.h> |
||||||
|
|
||||||
namespace grpc_core { |
#include "src/core/ext/filters/client_channel/backup_poller.h" |
||||||
namespace chttp2 { |
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/gprpp/host_port.h" |
||||||
|
#include "src/core/lib/surface/channel.h" |
||||||
|
#include "test/core/end2end/cq_verifier.h" |
||||||
|
#include "test/core/util/port.h" |
||||||
|
#include "test/core/util/test_config.h" |
||||||
|
|
||||||
namespace { |
namespace { |
||||||
auto* g_memory_owner = new MemoryOwner( |
|
||||||
ResourceQuota::Default()->memory_quota()->CreateMemoryOwner("test")); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(FlowControl, NoOp) { |
class TransportTargetWindowSizeMocker |
||||||
ExecCtx exec_ctx; |
: public grpc_core::chttp2::TestOnlyTransportTargetWindowEstimatesMocker { |
||||||
TransportFlowControl tfc("test", true, g_memory_owner); |
public: |
||||||
StreamFlowControl sfc(&tfc); |
static constexpr uint32_t kLargeInitialWindowSize = 1u << 31; |
||||||
// Check initial values are per http2 spec
|
static constexpr uint32_t kSmallInitialWindowSize = 0; |
||||||
EXPECT_EQ(tfc.sent_init_window(), 65535); |
|
||||||
EXPECT_EQ(tfc.acked_init_window(), 65535); |
double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate( |
||||||
EXPECT_EQ(tfc.remote_window(), 65535); |
double /* current_target */) override { |
||||||
EXPECT_EQ(tfc.target_frame_size(), 16384); |
// Protecting access to variable window_size_ shared between client and
|
||||||
EXPECT_EQ(sfc.remote_window_delta(), 0); |
// server.
|
||||||
EXPECT_EQ(sfc.min_progress_size(), 0); |
grpc_core::MutexLock lock(&mu_); |
||||||
EXPECT_EQ(sfc.local_window_delta(), 0); |
if (alternating_initial_window_sizes_) { |
||||||
EXPECT_EQ(sfc.announced_window_delta(), 0); |
window_size_ = (window_size_ == kLargeInitialWindowSize) |
||||||
|
? kSmallInitialWindowSize |
||||||
|
: kLargeInitialWindowSize; |
||||||
|
} |
||||||
|
return window_size_; |
||||||
|
} |
||||||
|
|
||||||
|
// Alternates the initial window size targets. Computes a low values if it was
|
||||||
|
// previously high, or a high value if it was previously low.
|
||||||
|
void AlternateTargetInitialWindowSizes() { |
||||||
|
grpc_core::MutexLock lock(&mu_); |
||||||
|
alternating_initial_window_sizes_ = true; |
||||||
|
} |
||||||
|
|
||||||
|
void Reset() { |
||||||
|
// Protecting access to variable window_size_ shared between client and
|
||||||
|
// server.
|
||||||
|
grpc_core::MutexLock lock(&mu_); |
||||||
|
alternating_initial_window_sizes_ = false; |
||||||
|
window_size_ = kLargeInitialWindowSize; |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
grpc_core::Mutex mu_; |
||||||
|
bool alternating_initial_window_sizes_ ABSL_GUARDED_BY(mu_) = false; |
||||||
|
double window_size_ ABSL_GUARDED_BY(mu_) = kLargeInitialWindowSize; |
||||||
|
}; |
||||||
|
|
||||||
|
TransportTargetWindowSizeMocker* g_target_initial_window_size_mocker; |
||||||
|
|
||||||
|
void* tag(intptr_t t) { return reinterpret_cast<void*>(t); } |
||||||
|
|
||||||
|
void VerifyChannelReady(grpc_channel* channel, grpc_completion_queue* cq) { |
||||||
|
grpc_connectivity_state state = |
||||||
|
grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */); |
||||||
|
while (state != GRPC_CHANNEL_READY) { |
||||||
|
grpc_channel_watch_connectivity_state( |
||||||
|
channel, state, grpc_timeout_seconds_to_deadline(5), cq, nullptr); |
||||||
|
grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5), |
||||||
|
nullptr); |
||||||
|
state = grpc_channel_check_connectivity_state(channel, 0); |
||||||
|
} |
||||||
} |
} |
||||||
|
|
||||||
TEST(FlowControl, SendData) { |
void VerifyChannelConnected(grpc_channel* channel, grpc_completion_queue* cq) { |
||||||
ExecCtx exec_ctx; |
// Verify channel is connected. Use a ping to make sure that clients
|
||||||
TransportFlowControl tfc("test", true, g_memory_owner); |
// tries sending/receiving bytes if the channel is connected.
|
||||||
StreamFlowControl sfc(&tfc); |
grpc_channel_ping(channel, cq, reinterpret_cast<void*>(2000), nullptr); |
||||||
sfc.SentData(1024); |
grpc_event ev = grpc_completion_queue_next( |
||||||
EXPECT_EQ(sfc.remote_window_delta(), -1024); |
cq, grpc_timeout_seconds_to_deadline(5), nullptr); |
||||||
EXPECT_EQ(tfc.remote_window(), 65535 - 1024); |
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); |
||||||
|
GPR_ASSERT(ev.tag == reinterpret_cast<void*>(2000)); |
||||||
|
GPR_ASSERT(ev.success == 1); |
||||||
|
GPR_ASSERT(grpc_channel_check_connectivity_state(channel, 0) == |
||||||
|
GRPC_CHANNEL_READY); |
||||||
} |
} |
||||||
|
|
||||||
TEST(FlowControl, InitialTransportUpdate) { |
// Shuts down and destroys the server.
|
||||||
ExecCtx exec_ctx; |
void ServerShutdownAndDestroy(grpc_server* server, grpc_completion_queue* cq) { |
||||||
TransportFlowControl tfc("test", true, g_memory_owner); |
// Shutdown and destroy server
|
||||||
EXPECT_EQ(tfc.MakeAction(), FlowControlAction()); |
grpc_server_shutdown_and_notify(server, cq, reinterpret_cast<void*>(1000)); |
||||||
|
while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), |
||||||
|
nullptr) |
||||||
|
.tag != reinterpret_cast<void*>(1000)) { |
||||||
|
} |
||||||
|
grpc_server_destroy(server); |
||||||
} |
} |
||||||
|
|
||||||
TEST(FlowControl, InitialStreamUpdate) { |
grpc_slice LargeSlice(void) { |
||||||
ExecCtx exec_ctx; |
grpc_slice slice = grpc_slice_malloc(10000000); // ~10MB
|
||||||
TransportFlowControl tfc("test", true, g_memory_owner); |
memset(GRPC_SLICE_START_PTR(slice), 'x', GRPC_SLICE_LENGTH(slice)); |
||||||
StreamFlowControl sfc(&tfc); |
return slice; |
||||||
EXPECT_EQ(sfc.MakeAction(), FlowControlAction()); |
|
||||||
} |
} |
||||||
|
|
||||||
TEST(FlowControl, RecvData) { |
void PerformCallWithLargePayload(grpc_channel* channel, grpc_server* server, |
||||||
ExecCtx exec_ctx; |
grpc_completion_queue* cq) { |
||||||
TransportFlowControl tfc("test", true, g_memory_owner); |
grpc_slice request_payload_slice = LargeSlice(); |
||||||
StreamFlowControl sfc(&tfc); |
grpc_slice response_payload_slice = LargeSlice(); |
||||||
EXPECT_EQ(absl::OkStatus(), sfc.RecvData(1024)); |
grpc_call* c; |
||||||
EXPECT_EQ(tfc.announced_window(), 65535 - 1024); |
grpc_call* s; |
||||||
EXPECT_EQ(sfc.local_window_delta(), -1024); |
grpc_byte_buffer* request_payload = |
||||||
|
grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
||||||
|
grpc_byte_buffer* response_payload = |
||||||
|
grpc_raw_byte_buffer_create(&response_payload_slice, 1); |
||||||
|
cq_verifier* cqv = cq_verifier_create(cq); |
||||||
|
grpc_op ops[6]; |
||||||
|
grpc_op* op; |
||||||
|
grpc_metadata_array initial_metadata_recv; |
||||||
|
grpc_metadata_array trailing_metadata_recv; |
||||||
|
grpc_metadata_array request_metadata_recv; |
||||||
|
grpc_byte_buffer* request_payload_recv = nullptr; |
||||||
|
grpc_byte_buffer* response_payload_recv = nullptr; |
||||||
|
grpc_call_details call_details; |
||||||
|
grpc_status_code status; |
||||||
|
grpc_call_error error; |
||||||
|
grpc_slice details; |
||||||
|
int was_cancelled = 2; |
||||||
|
|
||||||
|
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30); |
||||||
|
c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, |
||||||
|
grpc_slice_from_static_string("/foo"), nullptr, |
||||||
|
deadline, nullptr); |
||||||
|
GPR_ASSERT(c); |
||||||
|
|
||||||
|
grpc_metadata_array_init(&initial_metadata_recv); |
||||||
|
grpc_metadata_array_init(&trailing_metadata_recv); |
||||||
|
grpc_metadata_array_init(&request_metadata_recv); |
||||||
|
grpc_call_details_init(&call_details); |
||||||
|
|
||||||
|
memset(ops, 0, sizeof(ops)); |
||||||
|
op = ops; |
||||||
|
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||||
|
op->data.send_initial_metadata.count = 0; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
op->op = GRPC_OP_SEND_MESSAGE; |
||||||
|
op->data.send_message.send_message = request_payload; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
op->op = GRPC_OP_RECV_INITIAL_METADATA; |
||||||
|
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
op->op = GRPC_OP_RECV_MESSAGE; |
||||||
|
op->data.recv_message.recv_message = &response_payload_recv; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
||||||
|
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; |
||||||
|
op->data.recv_status_on_client.status = &status; |
||||||
|
op->data.recv_status_on_client.status_details = &details; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1), |
||||||
|
nullptr); |
||||||
|
GPR_ASSERT(GRPC_CALL_OK == error); |
||||||
|
|
||||||
|
error = grpc_server_request_call(server, &s, &call_details, |
||||||
|
&request_metadata_recv, cq, cq, tag(101)); |
||||||
|
GPR_ASSERT(GRPC_CALL_OK == error); |
||||||
|
CQ_EXPECT_COMPLETION(cqv, tag(101), 1); |
||||||
|
cq_verify(cqv); |
||||||
|
|
||||||
|
memset(ops, 0, sizeof(ops)); |
||||||
|
op = ops; |
||||||
|
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||||
|
op->data.send_initial_metadata.count = 0; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
op->op = GRPC_OP_RECV_MESSAGE; |
||||||
|
op->data.recv_message.recv_message = &request_payload_recv; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102), |
||||||
|
nullptr); |
||||||
|
GPR_ASSERT(GRPC_CALL_OK == error); |
||||||
|
|
||||||
|
CQ_EXPECT_COMPLETION(cqv, tag(102), 1); |
||||||
|
cq_verify(cqv); |
||||||
|
|
||||||
|
memset(ops, 0, sizeof(ops)); |
||||||
|
op = ops; |
||||||
|
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; |
||||||
|
op->data.recv_close_on_server.cancelled = &was_cancelled; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
op->op = GRPC_OP_SEND_MESSAGE; |
||||||
|
op->data.send_message.send_message = response_payload; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
||||||
|
op->data.send_status_from_server.trailing_metadata_count = 0; |
||||||
|
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; |
||||||
|
grpc_slice status_details = grpc_slice_from_static_string("xyz"); |
||||||
|
op->data.send_status_from_server.status_details = &status_details; |
||||||
|
op->flags = 0; |
||||||
|
op->reserved = nullptr; |
||||||
|
op++; |
||||||
|
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103), |
||||||
|
nullptr); |
||||||
|
GPR_ASSERT(GRPC_CALL_OK == error); |
||||||
|
|
||||||
|
CQ_EXPECT_COMPLETION(cqv, tag(103), 1); |
||||||
|
CQ_EXPECT_COMPLETION(cqv, tag(1), 1); |
||||||
|
cq_verify(cqv); |
||||||
|
|
||||||
|
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); |
||||||
|
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); |
||||||
|
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); |
||||||
|
GPR_ASSERT(was_cancelled == 0); |
||||||
|
|
||||||
|
grpc_slice_unref(details); |
||||||
|
grpc_metadata_array_destroy(&initial_metadata_recv); |
||||||
|
grpc_metadata_array_destroy(&trailing_metadata_recv); |
||||||
|
grpc_metadata_array_destroy(&request_metadata_recv); |
||||||
|
grpc_call_details_destroy(&call_details); |
||||||
|
|
||||||
|
grpc_call_unref(c); |
||||||
|
grpc_call_unref(s); |
||||||
|
|
||||||
|
cq_verifier_destroy(cqv); |
||||||
|
|
||||||
|
grpc_byte_buffer_destroy(request_payload); |
||||||
|
grpc_byte_buffer_destroy(response_payload); |
||||||
|
grpc_byte_buffer_destroy(request_payload_recv); |
||||||
|
grpc_byte_buffer_destroy(response_payload_recv); |
||||||
|
grpc_slice_unref(request_payload_slice); |
||||||
|
grpc_slice_unref(response_payload_slice); |
||||||
} |
} |
||||||
|
|
||||||
TEST(FlowControl, TrackMinProgressSize) { |
class FlowControlTest : public ::testing::Test { |
||||||
ExecCtx exec_ctx; |
protected: |
||||||
TransportFlowControl tfc("test", true, g_memory_owner); |
void SetUp() override { |
||||||
StreamFlowControl sfc(&tfc); |
cq_ = grpc_completion_queue_create_for_next(nullptr); |
||||||
sfc.UpdateProgress(5); |
// create the server
|
||||||
EXPECT_EQ(sfc.min_progress_size(), 5); |
std::string server_address = |
||||||
sfc.UpdateProgress(10); |
grpc_core::JoinHostPort("localhost", grpc_pick_unused_port_or_die()); |
||||||
EXPECT_EQ(sfc.min_progress_size(), 10); |
grpc_arg server_args[] = { |
||||||
EXPECT_EQ(absl::OkStatus(), sfc.RecvData(5)); |
grpc_channel_arg_integer_create( |
||||||
EXPECT_EQ(sfc.min_progress_size(), 5); |
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 0), |
||||||
EXPECT_EQ(absl::OkStatus(), sfc.RecvData(5)); |
grpc_channel_arg_integer_create( |
||||||
EXPECT_EQ(sfc.min_progress_size(), 0); |
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), -1), |
||||||
EXPECT_EQ(absl::OkStatus(), sfc.RecvData(5)); |
grpc_channel_arg_integer_create( |
||||||
EXPECT_EQ(sfc.min_progress_size(), 0); |
const_cast<char*>(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH), -1)}; |
||||||
|
grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), |
||||||
|
server_args}; |
||||||
|
server_ = grpc_server_create(&server_channel_args, nullptr); |
||||||
|
grpc_server_register_completion_queue(server_, cq_, nullptr); |
||||||
|
grpc_server_credentials* server_creds = |
||||||
|
grpc_insecure_server_credentials_create(); |
||||||
|
GPR_ASSERT(grpc_server_add_http2_port(server_, server_address.c_str(), |
||||||
|
server_creds)); |
||||||
|
grpc_server_credentials_release(server_creds); |
||||||
|
grpc_server_start(server_); |
||||||
|
// create the channel (bdp pings are enabled by default)
|
||||||
|
grpc_arg client_args[] = { |
||||||
|
grpc_channel_arg_integer_create( |
||||||
|
const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0), |
||||||
|
grpc_channel_arg_integer_create( |
||||||
|
const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 1), |
||||||
|
grpc_channel_arg_integer_create( |
||||||
|
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), -1), |
||||||
|
grpc_channel_arg_integer_create( |
||||||
|
const_cast<char*>(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH), -1)}; |
||||||
|
grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args), |
||||||
|
client_args}; |
||||||
|
grpc_channel_credentials* creds = grpc_insecure_credentials_create(); |
||||||
|
channel_ = grpc_channel_create(server_address.c_str(), creds, |
||||||
|
&client_channel_args); |
||||||
|
grpc_channel_credentials_release(creds); |
||||||
|
VerifyChannelReady(channel_, cq_); |
||||||
|
g_target_initial_window_size_mocker->Reset(); |
||||||
|
} |
||||||
|
|
||||||
|
void TearDown() override { |
||||||
|
// shutdown and destroy the client and server
|
||||||
|
grpc_channel_destroy(channel_); |
||||||
|
ServerShutdownAndDestroy(server_, cq_); |
||||||
|
grpc_completion_queue_shutdown(cq_); |
||||||
|
while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME), |
||||||
|
nullptr) |
||||||
|
.type != GRPC_QUEUE_SHUTDOWN) { |
||||||
|
} |
||||||
|
grpc_completion_queue_destroy(cq_); |
||||||
|
} |
||||||
|
|
||||||
|
grpc_server* server_ = nullptr; |
||||||
|
grpc_channel* channel_ = nullptr; |
||||||
|
grpc_completion_queue* cq_ = nullptr; |
||||||
|
}; |
||||||
|
|
||||||
|
TEST_F(FlowControlTest, |
||||||
|
TestLargeWindowSizeUpdatesDoNotCauseIllegalFlowControlWindows) { |
||||||
|
for (int i = 0; i < 10; ++i) { |
||||||
|
PerformCallWithLargePayload(channel_, server_, cq_); |
||||||
|
VerifyChannelConnected(channel_, cq_); |
||||||
|
} |
||||||
} |
} |
||||||
|
|
||||||
TEST(FlowControl, NoUpdateWithoutReader) { |
TEST_F(FlowControlTest, TestWindowSizeUpdatesDoNotCauseStalledStreams) { |
||||||
ExecCtx exec_ctx; |
g_target_initial_window_size_mocker->AlternateTargetInitialWindowSizes(); |
||||||
TransportFlowControl tfc("test", true, g_memory_owner); |
for (int i = 0; i < 100; ++i) { |
||||||
StreamFlowControl sfc(&tfc); |
PerformCallWithLargePayload(channel_, server_, cq_); |
||||||
for (int i = 0; i < 65535; i++) { |
VerifyChannelConnected(channel_, cq_); |
||||||
EXPECT_EQ(sfc.RecvData(1), absl::OkStatus()); |
|
||||||
EXPECT_EQ(sfc.MakeAction().send_stream_update(), |
|
||||||
FlowControlAction::Urgency::NO_ACTION_NEEDED); |
|
||||||
} |
} |
||||||
// Empty window needing 1 byte to progress should trigger an immediate read.
|
|
||||||
sfc.UpdateProgress(1); |
|
||||||
EXPECT_EQ(sfc.min_progress_size(), 1); |
|
||||||
EXPECT_EQ(sfc.MakeAction().send_stream_update(), |
|
||||||
FlowControlAction::Urgency::UPDATE_IMMEDIATELY); |
|
||||||
EXPECT_GT(sfc.MaybeSendUpdate(), 0); |
|
||||||
} |
} |
||||||
|
|
||||||
} // namespace chttp2
|
} // namespace
|
||||||
} // namespace grpc_core
|
|
||||||
|
|
||||||
int main(int argc, char** argv) { |
int main(int argc, char** argv) { |
||||||
::testing::InitGoogleTest(&argc, argv); |
::testing::InitGoogleTest(&argc, argv); |
||||||
return RUN_ALL_TESTS(); |
// Make sure that we will have an active poller on all client-side fd's that
|
||||||
|
// are capable of sending and receiving even in the case that we don't have an
|
||||||
|
// active RPC operation on the fd.
|
||||||
|
GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1); |
||||||
|
grpc_core::chttp2::g_test_only_transport_flow_control_window_check = true; |
||||||
|
g_target_initial_window_size_mocker = new TransportTargetWindowSizeMocker(); |
||||||
|
grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker = |
||||||
|
g_target_initial_window_size_mocker; |
||||||
|
grpc::testing::TestEnvironment env(&argc, argv); |
||||||
|
grpc_init(); |
||||||
|
auto result = RUN_ALL_TESTS(); |
||||||
|
grpc_shutdown(); |
||||||
|
return result; |
||||||
} |
} |
||||||
|
Loading…
Reference in new issue