mirror of https://github.com/grpc/grpc.git
[promises] Promise based grpc_call (#29598)
* Automated change: Fix sanity tests * fix * fixes * fixup allocator story - we should require a context * fixes * doodling * context scribbles * [arena] Add ManagedNew(), gtest-ify test Add a ManagedNew() method to Arena that calls the relevant destructor at Arena destruction time. There are some cases coming up in the promise based call work where this becomes super convenient, and I expect it's likely that there are other places that's true too. * Automated change: Fix sanity tests * progress * lalalal * progress * x * Automated change: Fix sanity tests * fixes * fix * fix * fix * fix * Automated change: Fix sanity tests * fix * Automated change: Fix sanity tests * fixes * fixes * fixes * Automated change: Fix sanity tests * progress * fix client streaming * handle invalid flags * Automated change: Fix sanity tests * no logging * progress * progress * channelz * tentative fix * fix * lalala * Automated change: Fix sanity tests * more readable trace * logging improvements, leading to bug fix in connected channel * fix * improve debuggability * fix * progress to better refcounting * progress * Automated change: Fix sanity tests * Automated change: Fix sanity tests * fix * fix * fix * threading * Automated change: Fix sanity tests * fix * fix * improve debuggability * fix * fix * Automated change: Fix sanity tests * fix * make promises runtime configurable * Automated change: Fix sanity tests * fix * fix build * fix broken test * clean up api * deal with stats better * peer string! * introduce fragments * Automated change: Fix sanity tests * use fragments * stuff * [promises] Add AtomicWaker type * Automated change: Fix sanity tests * fix * fix write path * fix * polling-entity-hell * review feedback * fix * fix * fix * fix * make an experiment * [experiments] Make output more diffable/readable * Automated change: Fix sanity tests * buildifier sized indentations * fix * fix * Automated change: Fix sanity tests * fix? * fix promise * prototype * progress * implement new api * Revert "fix promise" This reverts commitpull/31118/headded85e7d19
. * Revert "Revert "fix promise"" This reverts commitc2acef1958
. * progress * done * Automated change: Fix sanity tests * fix * fix * fix * Automated change: Fix sanity tests * updates * review feedback * first pass feedback * Automated change: Fix sanity tests * review feedback * naming * better-logs * fix test * Automated change: Fix sanity tests * comments * fix * progress * validation * iwyu * fix * ugh this needs to be any * fix flakiness in asan * call tracing * cleanup unused args * fix windows * fix build * ugh * fix tsan race * threading-fix * bloat1 * bloat2 * bloat3 * fix * unused-args * sanity * iwyu * fix * fix * this is ok * iwyu, exchange * fix * Automated change: Fix sanity tests * fix ee lifetime issue * fix * review feedback * Automated change: Fix sanity tests * comment * x * fix tsan race * iwyu * Automated change: Fix sanity tests Co-authored-by: ctiller <ctiller@users.noreply.github.com>
parent
e1e1f6181f
commit
beb5bdca62
102 changed files with 3374 additions and 418 deletions
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,114 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/surface/call_trace.h" |
||||
|
||||
#include <functional> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/container/flat_hash_map.h" |
||||
#include "absl/meta/type_traits.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/types/variant.h" |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/gprpp/no_destruct.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/arena_promise.h" |
||||
#include "src/core/lib/transport/call_fragments.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
const grpc_channel_filter* PromiseTracingFilterFor( |
||||
const grpc_channel_filter* filter) { |
||||
struct DerivedFilter : public grpc_channel_filter { |
||||
explicit DerivedFilter(const grpc_channel_filter* filter) |
||||
: grpc_channel_filter{ |
||||
/* start_transport_stream_op_batch: */ grpc_call_next_op, |
||||
/* make_call_promise: */ |
||||
[](grpc_channel_element* elem, CallArgs call_args, |
||||
NextPromiseFactory next_promise_factory) |
||||
-> ArenaPromise<ServerMetadataHandle> { |
||||
auto* source_filter = |
||||
static_cast<const DerivedFilter*>(elem->filter)->filter; |
||||
gpr_log( |
||||
GPR_DEBUG, |
||||
"%sCreateCallPromise[%s]: client_initial_metadata=%s", |
||||
Activity::current()->DebugTag().c_str(), |
||||
source_filter->name, |
||||
call_args.client_initial_metadata->DebugString().c_str()); |
||||
return [source_filter, child = next_promise_factory( |
||||
std::move(call_args))]() mutable { |
||||
gpr_log(GPR_DEBUG, "%sPollCallPromise[%s]: begin", |
||||
Activity::current()->DebugTag().c_str(), |
||||
source_filter->name); |
||||
auto r = child(); |
||||
if (auto* p = absl::get_if<ServerMetadataHandle>(&r)) { |
||||
gpr_log(GPR_DEBUG, "%sPollCallPromise[%s]: done: %s", |
||||
Activity::current()->DebugTag().c_str(), |
||||
source_filter->name, (*p)->DebugString().c_str()); |
||||
} else { |
||||
gpr_log(GPR_DEBUG, "%sPollCallPromise[%s]: <<pending>", |
||||
Activity::current()->DebugTag().c_str(), |
||||
source_filter->name); |
||||
} |
||||
return r; |
||||
}; |
||||
}, |
||||
grpc_channel_next_op, /* sizeof_call_data: */ 0, |
||||
/* init_call_elem: */ |
||||
[](grpc_call_element*, const grpc_call_element_args*) { |
||||
return absl::OkStatus(); |
||||
}, |
||||
grpc_call_stack_ignore_set_pollset_or_pollset_set, |
||||
/* destroy_call_elem: */ |
||||
[](grpc_call_element*, const grpc_call_final_info*, |
||||
grpc_closure*) {}, |
||||
/* sizeof_channel_data: */ 0, /* init_channel_elem: */ |
||||
[](grpc_channel_element*, grpc_channel_element_args*) { |
||||
return absl::OkStatus(); |
||||
}, |
||||
/* post_init_channel_elem: */ |
||||
[](grpc_channel_stack*, grpc_channel_element*) {}, |
||||
/* destroy_channel_elem: */ [](grpc_channel_element*) {}, |
||||
grpc_channel_next_get_info, filter->name}, |
||||
filter(filter) {} |
||||
const grpc_channel_filter* const filter; |
||||
}; |
||||
struct Globals { |
||||
Mutex mu; |
||||
absl::flat_hash_map<const grpc_channel_filter*, |
||||
std::unique_ptr<DerivedFilter>> |
||||
map ABSL_GUARDED_BY(mu); |
||||
}; |
||||
auto* globals = NoDestructSingleton<Globals>::Get(); |
||||
MutexLock lock(&globals->mu); |
||||
auto it = globals->map.find(filter); |
||||
if (it != globals->map.end()) return it->second.get(); |
||||
return globals->map.emplace(filter, std::make_unique<DerivedFilter>(filter)) |
||||
.first->second.get(); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,30 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_CORE_LIB_SURFACE_CALL_TRACE_H |
||||
#define GRPC_CORE_LIB_SURFACE_CALL_TRACE_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/channel/channel_fwd.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
|
||||
extern grpc_core::TraceFlag grpc_call_trace; |
||||
|
||||
namespace grpc_core { |
||||
const grpc_channel_filter* PromiseTracingFilterFor( |
||||
const grpc_channel_filter* filter); |
||||
} |
||||
|
||||
#endif // GRPC_CORE_LIB_SURFACE_CALL_TRACE_H
|
@ -0,0 +1,45 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/transport/call_fragments.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
FragmentAllocator::Node* FragmentAllocator::AllocateNode() { |
||||
if (free_list_ != nullptr) { |
||||
Node* node = free_list_; |
||||
free_list_ = free_list_->next_free; |
||||
return node; |
||||
} |
||||
return static_cast<Node*>(GetContext<Arena>()->Alloc(sizeof(Node))); |
||||
} |
||||
|
||||
void FragmentAllocator::FreeNode(Node* node) { |
||||
node->next_free = free_list_; |
||||
free_list_ = node; |
||||
} |
||||
|
||||
void FragmentAllocator::Delete(grpc_metadata_batch* p) { |
||||
p->~grpc_metadata_batch(); |
||||
FreeNode(reinterpret_cast<Node*>(p)); |
||||
} |
||||
|
||||
void FragmentAllocator::Delete(Message* m) { |
||||
m->~Message(); |
||||
FreeNode(reinterpret_cast<Node*>(m)); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,232 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_CORE_LIB_TRANSPORT_CALL_FRAGMENTS_H |
||||
#define GRPC_CORE_LIB_TRANSPORT_CALL_FRAGMENTS_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include <new> |
||||
#include <utility> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/status.h> |
||||
|
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// TODO(ctiller): eliminate once MetadataHandle is constructable directly.
|
||||
namespace promise_filter_detail { |
||||
class BaseCallData; |
||||
} // namespace promise_filter_detail
|
||||
|
||||
class FragmentAllocator; |
||||
|
||||
// Small owned "handle" type to ensure one accessor at a time to metadata.
|
||||
// The focus here is to get promises to use the syntax we'd like - we'll
|
||||
// probably substitute some other smart pointer later.
|
||||
template <typename T> |
||||
class FragmentHandle { |
||||
public: |
||||
FragmentHandle() = default; |
||||
|
||||
FragmentHandle(const FragmentHandle&) = delete; |
||||
FragmentHandle& operator=(const FragmentHandle&) = delete; |
||||
|
||||
FragmentHandle(FragmentHandle&& other) noexcept |
||||
: handle_(other.handle_), |
||||
allocated_by_allocator_(other.allocated_by_allocator_) { |
||||
other.handle_ = nullptr; |
||||
other.allocated_by_allocator_ = false; |
||||
} |
||||
FragmentHandle& operator=(FragmentHandle&& other) noexcept { |
||||
DestroyHandle(); |
||||
handle_ = other.handle_; |
||||
allocated_by_allocator_ = other.allocated_by_allocator_; |
||||
other.handle_ = nullptr; |
||||
other.allocated_by_allocator_ = false; |
||||
return *this; |
||||
} |
||||
|
||||
explicit FragmentHandle(const absl::Status& status); |
||||
|
||||
~FragmentHandle() { DestroyHandle(); } |
||||
|
||||
T* operator->() const { return handle_; } |
||||
bool has_value() const { return handle_ != nullptr; } |
||||
T* get() const { return handle_; } |
||||
void reset() { *this = FragmentHandle(); } |
||||
|
||||
static FragmentHandle TestOnlyWrap(T* p) { return FragmentHandle(p, false); } |
||||
|
||||
private: |
||||
// We restrict access to construction from a pointer to limit the number of
|
||||
// cases that need dealing with as this code evolves.
|
||||
friend class promise_filter_detail::BaseCallData; |
||||
friend class FragmentAllocator; |
||||
|
||||
explicit FragmentHandle(T* handle, bool allocated_by_allocator) |
||||
: handle_(handle), allocated_by_allocator_(allocated_by_allocator) {} |
||||
|
||||
void DestroyHandle(); |
||||
|
||||
T* Unwrap() { |
||||
T* result = handle_; |
||||
handle_ = nullptr; |
||||
return result; |
||||
} |
||||
|
||||
T* handle_ = nullptr; |
||||
// TODO(ctiller): remove this once promise_based_filter goes away.
|
||||
// This bit determines whether the pointer is allocated by a metadata
|
||||
// allocator or some other system. If it's held by a metadata allocator, we'll
|
||||
// release it back when we're done with it.
|
||||
bool allocated_by_allocator_ = false; |
||||
}; |
||||
|
||||
// Server metadata type
|
||||
// TODO(ctiller): This should be a bespoke instance of MetadataMap<>
|
||||
using ServerMetadata = grpc_metadata_batch; |
||||
using ServerMetadataHandle = FragmentHandle<ServerMetadata>; |
||||
|
||||
// Client initial metadata type
|
||||
// TODO(ctiller): This should be a bespoke instance of MetadataMap<>
|
||||
using ClientMetadata = grpc_metadata_batch; |
||||
using ClientMetadataHandle = FragmentHandle<ClientMetadata>; |
||||
|
||||
class Message { |
||||
public: |
||||
Message() = default; |
||||
~Message() = default; |
||||
Message(SliceBuffer payload, uint32_t flags) |
||||
: payload_(std::move(payload)), flags_(flags) {} |
||||
Message(const Message&) = delete; |
||||
Message& operator=(const Message&) = delete; |
||||
|
||||
uint32_t flags() const { return flags_; } |
||||
SliceBuffer* payload() { return &payload_; } |
||||
const SliceBuffer* payload() const { return &payload_; } |
||||
|
||||
private: |
||||
SliceBuffer payload_; |
||||
uint32_t flags_ = 0; |
||||
}; |
||||
|
||||
using MessageHandle = FragmentHandle<Message>; |
||||
|
||||
// Ok/not-ok check for trailing metadata, so that it can be used as result types
|
||||
// for TrySeq.
|
||||
inline bool IsStatusOk(const ServerMetadataHandle& m) { |
||||
return m->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN) == |
||||
GRPC_STATUS_OK; |
||||
} |
||||
|
||||
// Within a call arena we need metadata at least four times - (client,server) x
|
||||
// (initial,trailing), and possibly more for early returning promises.
|
||||
// Since we often don't need these *simultaneously*, we can save memory by
|
||||
// allocating/releasing them.
|
||||
// We'd still like the memory to be part of the arena though, so this type
|
||||
// creates a small free list of metadata objects and a central (call context)
|
||||
// based place to create/destroy them.
|
||||
class FragmentAllocator { |
||||
public: |
||||
FragmentAllocator() = default; |
||||
~FragmentAllocator() = default; |
||||
FragmentAllocator(const FragmentAllocator&) = delete; |
||||
FragmentAllocator& operator=(const FragmentAllocator&) = delete; |
||||
|
||||
ClientMetadataHandle MakeClientMetadata() { |
||||
auto* node = AllocateNode(); |
||||
// TODO(ctiller): once we finish the promise transition, have metadata map
|
||||
// know about arena contexts and allocate directly from there.
|
||||
// (we could do so before, but there's enough places where we don't have a
|
||||
// promise context up that it's too much whackamole)
|
||||
new (&node->batch) ClientMetadata(GetContext<Arena>()); |
||||
return ClientMetadataHandle(&node->batch, true); |
||||
} |
||||
|
||||
ServerMetadataHandle MakeServerMetadata() { return MakeClientMetadata(); } |
||||
|
||||
template <typename... Args> |
||||
MessageHandle MakeMessage(Args&&... args) { |
||||
auto* node = AllocateNode(); |
||||
new (&node->message) Message(std::forward<Args>(args)...); |
||||
return MessageHandle(&node->message, true); |
||||
} |
||||
|
||||
private: |
||||
union Node { |
||||
Node* next_free; |
||||
grpc_metadata_batch batch; |
||||
Message message; |
||||
}; |
||||
|
||||
template <typename T> |
||||
friend class FragmentHandle; |
||||
|
||||
void Delete(grpc_metadata_batch* p); |
||||
void Delete(Message* m); |
||||
|
||||
Node* AllocateNode(); |
||||
void FreeNode(Node* node); |
||||
|
||||
Node* free_list_ = nullptr; |
||||
}; |
||||
|
||||
template <> |
||||
struct ContextType<FragmentAllocator> {}; |
||||
|
||||
template <typename T> |
||||
FragmentHandle<T>::FragmentHandle(const absl::Status& status) { |
||||
// TODO(ctiller): currently we guarantee that MetadataAllocator is only
|
||||
// present for promise based calls, and if we're using promise_based_filter
|
||||
// it's not present. If we're in a promise based call, the correct thing is to
|
||||
// use the metadata allocator to track the memory we need. If we're not, we
|
||||
// need to do the hacky thing promise_based_filter does.
|
||||
// This all goes away when promise_based_filter goes away, and this code will
|
||||
// just assume there's an allocator present and move forward.
|
||||
if (auto* allocator = GetContext<FragmentAllocator>()) { |
||||
handle_ = nullptr; |
||||
allocated_by_allocator_ = false; |
||||
*this = allocator->MakeServerMetadata(); |
||||
} else { |
||||
handle_ = GetContext<Arena>()->New<T>(GetContext<Arena>()); |
||||
allocated_by_allocator_ = false; |
||||
} |
||||
handle_->Set(GrpcStatusMetadata(), |
||||
static_cast<grpc_status_code>(status.code())); |
||||
if (status.ok()) return; |
||||
handle_->Set(GrpcMessageMetadata(), |
||||
Slice::FromCopiedString(status.message())); |
||||
} |
||||
|
||||
template <typename T> |
||||
void FragmentHandle<T>::DestroyHandle() { |
||||
if (allocated_by_allocator_) { |
||||
GetContext<FragmentAllocator>()->Delete(handle_); |
||||
} |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_CORE_LIB_TRANSPORT_CALL_FRAGMENTS_H
|
@ -0,0 +1,176 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <memory> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/status.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_args_preconditioning.h" |
||||
#include "src/core/lib/channel/channelz.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/iomgr/endpoint.h" |
||||
#include "src/core/lib/iomgr/endpoint_pair.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "src/core/lib/surface/channel_stack_type.h" |
||||
#include "src/core/lib/surface/completion_queue.h" |
||||
#include "src/core/lib/surface/server.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
#include "src/core/lib/transport/transport_fwd.h" |
||||
#include "test/core/end2end/end2end_tests.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
/* chttp2 transport that is immediately available (used for testing
|
||||
connected_channel without a client_channel) */ |
||||
|
||||
struct custom_fixture_data { |
||||
grpc_endpoint_pair ep; |
||||
}; |
||||
|
||||
static void server_setup_transport(void* ts, grpc_transport* transport) { |
||||
grpc_end2end_test_fixture* f = static_cast<grpc_end2end_test_fixture*>(ts); |
||||
grpc_core::ExecCtx exec_ctx; |
||||
custom_fixture_data* fixture_data = |
||||
static_cast<custom_fixture_data*>(f->fixture_data); |
||||
grpc_endpoint_add_to_pollset(fixture_data->ep.server, grpc_cq_pollset(f->cq)); |
||||
grpc_core::Server* core_server = grpc_core::Server::FromC(f->server); |
||||
grpc_error_handle error = core_server->SetupTransport( |
||||
transport, nullptr, core_server->channel_args(), nullptr); |
||||
if (error == GRPC_ERROR_NONE) { |
||||
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); |
||||
} else { |
||||
GRPC_ERROR_UNREF(error); |
||||
grpc_transport_destroy(transport); |
||||
} |
||||
} |
||||
|
||||
typedef struct { |
||||
grpc_end2end_test_fixture* f; |
||||
const grpc_channel_args* client_args; |
||||
} sp_client_setup; |
||||
|
||||
static void client_setup_transport(void* ts, grpc_transport* transport) { |
||||
sp_client_setup* cs = static_cast<sp_client_setup*>(ts); |
||||
|
||||
auto args = grpc_core::ChannelArgs::FromC(cs->client_args) |
||||
.Set(GRPC_ARG_MINIMAL_STACK, true) |
||||
.Set(GRPC_ARG_DEFAULT_AUTHORITY, "test-authority"); |
||||
auto channel = grpc_core::Channel::Create( |
||||
"socketpair-target", args, GRPC_CLIENT_DIRECT_CHANNEL, transport); |
||||
if (channel.ok()) { |
||||
cs->f->client = channel->release()->c_ptr(); |
||||
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); |
||||
} else { |
||||
cs->f->client = grpc_lame_client_channel_create( |
||||
nullptr, static_cast<grpc_status_code>(channel.status().code()), |
||||
"lame channel"); |
||||
grpc_transport_destroy(transport); |
||||
} |
||||
} |
||||
|
||||
static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( |
||||
const grpc_channel_args* /*client_args*/, |
||||
const grpc_channel_args* /*server_args*/) { |
||||
custom_fixture_data* fixture_data = static_cast<custom_fixture_data*>( |
||||
gpr_malloc(sizeof(custom_fixture_data))); |
||||
grpc_end2end_test_fixture f; |
||||
memset(&f, 0, sizeof(f)); |
||||
f.fixture_data = fixture_data; |
||||
f.cq = grpc_completion_queue_create_for_next(nullptr); |
||||
fixture_data->ep = grpc_iomgr_create_endpoint_pair("fixture", nullptr); |
||||
return f; |
||||
} |
||||
|
||||
static void chttp2_init_client_socketpair( |
||||
grpc_end2end_test_fixture* f, const grpc_channel_args* client_args) { |
||||
grpc_core::ExecCtx exec_ctx; |
||||
auto* fixture_data = static_cast<custom_fixture_data*>(f->fixture_data); |
||||
grpc_transport* transport; |
||||
sp_client_setup cs; |
||||
auto final_client_args = grpc_core::CoreConfiguration::Get() |
||||
.channel_args_preconditioning() |
||||
.PreconditionChannelArgs(client_args) |
||||
.Set(GRPC_ARG_MINIMAL_STACK, true); |
||||
auto c_client_args = final_client_args.ToC(); |
||||
cs.client_args = c_client_args.get(); |
||||
cs.f = f; |
||||
transport = grpc_create_chttp2_transport(final_client_args, |
||||
fixture_data->ep.client, true); |
||||
client_setup_transport(&cs, transport); |
||||
GPR_ASSERT(f->client); |
||||
} |
||||
|
||||
static void chttp2_init_server_socketpair( |
||||
grpc_end2end_test_fixture* f, const grpc_channel_args* server_args) { |
||||
grpc_core::ExecCtx exec_ctx; |
||||
auto* fixture_data = static_cast<custom_fixture_data*>(f->fixture_data); |
||||
grpc_transport* transport; |
||||
GPR_ASSERT(!f->server); |
||||
f->server = grpc_server_create(server_args, nullptr); |
||||
grpc_server_register_completion_queue(f->server, f->cq, nullptr); |
||||
grpc_server_start(f->server); |
||||
auto final_server_args = grpc_core::CoreConfiguration::Get() |
||||
.channel_args_preconditioning() |
||||
.PreconditionChannelArgs(server_args) |
||||
.Set(GRPC_ARG_MINIMAL_STACK, true); |
||||
transport = grpc_create_chttp2_transport(final_server_args, |
||||
fixture_data->ep.server, false); |
||||
server_setup_transport(f, transport); |
||||
} |
||||
|
||||
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) { |
||||
grpc_core::ExecCtx exec_ctx; |
||||
gpr_free(f->fixture_data); |
||||
} |
||||
|
||||
/* All test configurations */ |
||||
static grpc_end2end_test_config configs[] = { |
||||
{"chttp2/socketpair+minstack", |
||||
FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER | |
||||
FEATURE_MASK_DOES_NOT_SUPPORT_DEADLINES, |
||||
nullptr, chttp2_create_fixture_socketpair, chttp2_init_client_socketpair, |
||||
chttp2_init_server_socketpair, chttp2_tear_down_socketpair}, |
||||
}; |
||||
|
||||
int main(int argc, char** argv) { |
||||
size_t i; |
||||
|
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
grpc_end2end_tests_pre_init(); |
||||
grpc_init(); |
||||
|
||||
for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { |
||||
grpc_end2end_tests(argc, argv, configs[i]); |
||||
} |
||||
|
||||
grpc_shutdown(); |
||||
|
||||
return 0; |
||||
} |
@ -0,0 +1,96 @@ |
||||
//
|
||||
// Copyright 2021 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include "src/core/lib/transport/call_fragments.h" |
||||
|
||||
#include <algorithm> |
||||
#include <memory> |
||||
#include <vector> |
||||
|
||||
#include "gmock/gmock.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include <grpc/event_engine/memory_allocator.h> |
||||
|
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/resource_quota/memory_quota.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "test/core/promise/test_context.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
using testing::Each; |
||||
|
||||
namespace grpc_core { |
||||
namespace testing { |
||||
|
||||
class CallFragmentsTest : public ::testing::Test { |
||||
protected: |
||||
CallFragmentsTest() {} |
||||
~CallFragmentsTest() override {} |
||||
|
||||
private: |
||||
MemoryAllocator memory_allocator_ = |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"); |
||||
ScopedArenaPtr arena_ = MakeScopedArena(4096, &memory_allocator_); |
||||
FragmentAllocator fragment_allocator_; |
||||
|
||||
TestContext<Arena> arena_context_{arena_.get()}; |
||||
TestContext<FragmentAllocator> fragment_allocator_context_{ |
||||
&fragment_allocator_}; |
||||
}; |
||||
|
||||
// Ensure test fixture can init/destroy successfully.
|
||||
TEST_F(CallFragmentsTest, Nothing) {} |
||||
|
||||
// Ensure we can create/destroy some client metadata.
|
||||
TEST_F(CallFragmentsTest, ClientMetadata) { |
||||
GetContext<FragmentAllocator>()->MakeClientMetadata(); |
||||
} |
||||
|
||||
// Ensure we can create/destroy some server metadata.
|
||||
TEST_F(CallFragmentsTest, ServerMetadata) { |
||||
GetContext<FragmentAllocator>()->MakeServerMetadata(); |
||||
} |
||||
|
||||
// Ensure repeated allocation/deallocations reuse memory.
|
||||
TEST_F(CallFragmentsTest, RepeatedAllocationsReuseMemory) { |
||||
void* p = GetContext<FragmentAllocator>()->MakeClientMetadata().get(); |
||||
void* q = GetContext<FragmentAllocator>()->MakeClientMetadata().get(); |
||||
EXPECT_EQ(p, q); |
||||
} |
||||
|
||||
// Ensure repeated allocation reinitializes.
|
||||
TEST_F(CallFragmentsTest, RepeatedAllocationsReinitialize) { |
||||
std::vector<void*> addresses; |
||||
for (int i = 0; i < 4; i++) { |
||||
ClientMetadataHandle metadata = |
||||
GetContext<FragmentAllocator>()->MakeClientMetadata(); |
||||
EXPECT_EQ(metadata->get_pointer(HttpPathMetadata()), nullptr); |
||||
metadata->Set(HttpPathMetadata(), Slice::FromCopiedString("/")); |
||||
EXPECT_EQ(metadata->get_pointer(HttpPathMetadata())->as_string_view(), "/"); |
||||
addresses.push_back(metadata.get()); |
||||
} |
||||
EXPECT_THAT(addresses, Each(addresses[0])); |
||||
} |
||||
|
||||
} // namespace testing
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
testing::InitGoogleTest(&argc, argv); |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
}; |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue