mirror of https://github.com/grpc/grpc.git
[promises] Convert call to a party (#32359)
<!-- If you know who should review your pull request, please assign it to that person, otherwise the pull request would get assigned randomly. If your pull request is for a specific language, please add the appropriate lang label. --> --------- Co-authored-by: ctiller <ctiller@users.noreply.github.com>create-pull-request/patch-93f0266
parent
5029af9578
commit
a9873e8357
85 changed files with 3926 additions and 3495 deletions
File diff suppressed because it is too large
Load Diff
@ -1,55 +0,0 @@ |
||||
// Copyright 2021 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_PROMISE_INTRA_ACTIVITY_WAITER_H |
||||
#define GRPC_SRC_CORE_LIB_PROMISE_INTRA_ACTIVITY_WAITER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <string> |
||||
|
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Helper type to track wakeups between objects in the same activity.
|
||||
// Can be fairly fast as no ref counting or locking needs to occur.
|
||||
class IntraActivityWaiter { |
||||
public: |
||||
// Register for wakeup, return Pending(). If state is not ready to proceed,
|
||||
// Promises should bottom out here.
|
||||
Pending pending() { |
||||
waiting_ = true; |
||||
return Pending(); |
||||
} |
||||
// Wake the activity
|
||||
void Wake() { |
||||
if (waiting_) { |
||||
waiting_ = false; |
||||
Activity::current()->ForceImmediateRepoll(); |
||||
} |
||||
} |
||||
|
||||
std::string DebugString() const { |
||||
return waiting_ ? "WAITING" : "NOT_WAITING"; |
||||
} |
||||
|
||||
private: |
||||
bool waiting_ = false; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_PROMISE_INTRA_ACTIVITY_WAITER_H
|
@ -1,295 +0,0 @@ |
||||
// Copyright 2021 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_PROMISE_OBSERVABLE_H |
||||
#define GRPC_SRC_CORE_LIB_PROMISE_OBSERVABLE_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include <limits> |
||||
#include <memory> |
||||
#include <type_traits> |
||||
#include <utility> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/detail/promise_like.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/promise/wait_set.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace promise_detail { |
||||
|
||||
using ObservableVersion = uint64_t; |
||||
static constexpr ObservableVersion kTombstoneVersion = |
||||
std::numeric_limits<ObservableVersion>::max(); |
||||
|
||||
} // namespace promise_detail
|
||||
|
||||
class WatchCommitter { |
||||
public: |
||||
void Commit() { version_seen_ = promise_detail::kTombstoneVersion; } |
||||
|
||||
protected: |
||||
promise_detail::ObservableVersion version_seen_ = 0; |
||||
}; |
||||
|
||||
namespace promise_detail { |
||||
|
||||
// Shared state between Observable and Observer.
|
||||
template <typename T> |
||||
class ObservableState { |
||||
public: |
||||
explicit ObservableState(absl::optional<T> value) |
||||
: value_(std::move(value)) {} |
||||
|
||||
// Publish that we're closed.
|
||||
void Close() { |
||||
mu_.Lock(); |
||||
version_ = kTombstoneVersion; |
||||
value_.reset(); |
||||
auto wakeup = waiters_.TakeWakeupSet(); |
||||
mu_.Unlock(); |
||||
wakeup.Wakeup(); |
||||
} |
||||
|
||||
// Synchronously publish a new value, and wake any waiters.
|
||||
void Push(T value) { |
||||
mu_.Lock(); |
||||
version_++; |
||||
value_ = std::move(value); |
||||
auto wakeup = waiters_.TakeWakeupSet(); |
||||
mu_.Unlock(); |
||||
wakeup.Wakeup(); |
||||
} |
||||
|
||||
Poll<absl::optional<T>> PollGet(ObservableVersion* version_seen) { |
||||
MutexLock lock(&mu_); |
||||
if (!Started()) return Pending(); |
||||
*version_seen = version_; |
||||
return value_; |
||||
} |
||||
|
||||
Poll<absl::optional<T>> PollNext(ObservableVersion* version_seen) { |
||||
MutexLock lock(&mu_); |
||||
if (!NextValueReady(version_seen)) return Pending(); |
||||
return value_; |
||||
} |
||||
|
||||
Poll<absl::optional<T>> PollWatch(ObservableVersion* version_seen) { |
||||
if (*version_seen == kTombstoneVersion) return Pending(); |
||||
|
||||
MutexLock lock(&mu_); |
||||
if (!NextValueReady(version_seen)) return Pending(); |
||||
// Watch needs to be woken up if the value changes even if it's ready now.
|
||||
waiters_.AddPending(Activity::current()->MakeNonOwningWaker()); |
||||
return value_; |
||||
} |
||||
|
||||
private: |
||||
// Returns true if an initial value is set.
|
||||
// If one is not set, add ourselves as pending to waiters_, and return false.
|
||||
bool Started() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
||||
if (!value_.has_value()) { |
||||
if (version_ != kTombstoneVersion) { |
||||
// We allow initial no-value, which does not indicate closure.
|
||||
waiters_.AddPending(Activity::current()->MakeNonOwningWaker()); |
||||
return false; |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
// If no value is ready, add ourselves as pending to waiters_ and return
|
||||
// false.
|
||||
// If the next value is ready, update the last version seen and return true.
|
||||
bool NextValueReady(ObservableVersion* version_seen) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { |
||||
if (!Started()) return false; |
||||
if (version_ == *version_seen) { |
||||
waiters_.AddPending(Activity::current()->MakeNonOwningWaker()); |
||||
return false; |
||||
} |
||||
*version_seen = version_; |
||||
return true; |
||||
} |
||||
|
||||
Mutex mu_; |
||||
WaitSet waiters_ ABSL_GUARDED_BY(mu_); |
||||
ObservableVersion version_ ABSL_GUARDED_BY(mu_) = 1; |
||||
absl::optional<T> value_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
// Promise implementation for Observer::Get.
|
||||
template <typename T> |
||||
class ObservableGet { |
||||
public: |
||||
ObservableGet(ObservableVersion* version_seen, ObservableState<T>* state) |
||||
: version_seen_(version_seen), state_(state) {} |
||||
|
||||
Poll<absl::optional<T>> operator()() { |
||||
return state_->PollGet(version_seen_); |
||||
} |
||||
|
||||
private: |
||||
ObservableVersion* version_seen_; |
||||
ObservableState<T>* state_; |
||||
}; |
||||
|
||||
// Promise implementation for Observer::Next.
|
||||
template <typename T> |
||||
class ObservableNext { |
||||
public: |
||||
ObservableNext(ObservableVersion* version_seen, ObservableState<T>* state) |
||||
: version_seen_(version_seen), state_(state) {} |
||||
|
||||
Poll<absl::optional<T>> operator()() { |
||||
return state_->PollNext(version_seen_); |
||||
} |
||||
|
||||
private: |
||||
ObservableVersion* version_seen_; |
||||
ObservableState<T>* state_; |
||||
}; |
||||
|
||||
template <typename T, typename F> |
||||
class ObservableWatch final : private WatchCommitter { |
||||
private: |
||||
using Promise = PromiseLike<decltype(std::declval<F>()( |
||||
std::declval<T>(), std::declval<WatchCommitter*>()))>; |
||||
using Result = typename Promise::Result; |
||||
|
||||
public: |
||||
explicit ObservableWatch(F factory, std::shared_ptr<ObservableState<T>> state) |
||||
: state_(std::move(state)), factory_(std::move(factory)) {} |
||||
ObservableWatch(const ObservableWatch&) = delete; |
||||
ObservableWatch& operator=(const ObservableWatch&) = delete; |
||||
ObservableWatch(ObservableWatch&& other) noexcept |
||||
: state_(std::move(other.state_)), |
||||
promise_(std::move(other.promise_)), |
||||
factory_(std::move(other.factory_)) {} |
||||
ObservableWatch& operator=(ObservableWatch&&) noexcept = default; |
||||
|
||||
Poll<Result> operator()() { |
||||
auto r = state_->PollWatch(&version_seen_); |
||||
if (auto* p = r.value_if_ready()) { |
||||
if (p->has_value()) { |
||||
promise_ = Promise(factory_(std::move(**p), this)); |
||||
} else { |
||||
promise_ = {}; |
||||
} |
||||
} |
||||
if (promise_.has_value()) { |
||||
return (*promise_)(); |
||||
} else { |
||||
return Pending(); |
||||
} |
||||
} |
||||
|
||||
private: |
||||
std::shared_ptr<ObservableState<T>> state_; |
||||
absl::optional<Promise> promise_; |
||||
F factory_; |
||||
}; |
||||
|
||||
} // namespace promise_detail
|
||||
|
||||
template <typename T> |
||||
class Observable; |
||||
|
||||
// Observer watches an Observable for updates.
|
||||
// It can see either the latest value or wait for a new value, but is not
|
||||
// guaranteed to see every value pushed to the Observable.
|
||||
template <typename T> |
||||
class Observer { |
||||
public: |
||||
Observer(const Observer&) = delete; |
||||
Observer& operator=(const Observer&) = delete; |
||||
Observer(Observer&& other) noexcept |
||||
: version_seen_(other.version_seen_), state_(std::move(other.state_)) {} |
||||
Observer& operator=(Observer&& other) noexcept { |
||||
version_seen_ = other.version_seen_; |
||||
state_ = std::move(other.state_); |
||||
return *this; |
||||
} |
||||
|
||||
// Return a promise that will produce an optional<T>.
|
||||
// If the Observable is still present, this will be a value T, but if the
|
||||
// Observable has been closed, this will be nullopt. Borrows data from the
|
||||
// Observer, so this value must stay valid until the promise is resolved. Only
|
||||
// one Next, Get call is allowed to be outstanding at a time.
|
||||
promise_detail::ObservableGet<T> Get() { |
||||
return promise_detail::ObservableGet<T>{&version_seen_, &*state_}; |
||||
} |
||||
|
||||
// Return a promise that will produce the next unseen value as an optional<T>.
|
||||
// If the Observable is still present, this will be a value T, but if the
|
||||
// Observable has been closed, this will be nullopt. Borrows data from the
|
||||
// Observer, so this value must stay valid until the promise is resolved. Only
|
||||
// one Next, Get call is allowed to be outstanding at a time.
|
||||
promise_detail::ObservableNext<T> Next() { |
||||
return promise_detail::ObservableNext<T>{&version_seen_, &*state_}; |
||||
} |
||||
|
||||
private: |
||||
using State = promise_detail::ObservableState<T>; |
||||
friend class Observable<T>; |
||||
explicit Observer(std::shared_ptr<State> state) : state_(state) {} |
||||
promise_detail::ObservableVersion version_seen_ = 0; |
||||
std::shared_ptr<State> state_; |
||||
}; |
||||
|
||||
// Observable models a single writer multiple reader broadcast channel.
|
||||
// Readers can observe the latest value, or await a new latest value, but they
|
||||
// are not guaranteed to observe every value.
|
||||
template <typename T> |
||||
class Observable { |
||||
public: |
||||
Observable() : state_(std::make_shared<State>(absl::nullopt)) {} |
||||
explicit Observable(T value) |
||||
: state_(std::make_shared<State>(std::move(value))) {} |
||||
~Observable() { state_->Close(); } |
||||
Observable(const Observable&) = delete; |
||||
Observable& operator=(const Observable&) = delete; |
||||
|
||||
// Push a new value into the observable.
|
||||
void Push(T value) { state_->Push(std::move(value)); } |
||||
|
||||
// Create a new Observer - which can pull the current state from this
|
||||
// Observable.
|
||||
Observer<T> MakeObserver() { return Observer<T>(state_); } |
||||
|
||||
// Create a new Watch - a promise that pushes state into the passed in promise
|
||||
// factory. The promise factory takes two parameters - the current value and a
|
||||
// commit token. If the commit token is used (the Commit function on it is
|
||||
// called), then no further Watch updates are provided.
|
||||
template <typename F> |
||||
promise_detail::ObservableWatch<T, F> Watch(F f) { |
||||
return promise_detail::ObservableWatch<T, F>(std::move(f), state_); |
||||
} |
||||
|
||||
private: |
||||
using State = promise_detail::ObservableState<T>; |
||||
std::shared_ptr<State> state_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_PROMISE_OBSERVABLE_H
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,179 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/transport/batch_builder.h" |
||||
|
||||
#include <type_traits> |
||||
|
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/surface/call_trace.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
#include "src/core/lib/transport/transport_impl.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
BatchBuilder::BatchBuilder(grpc_transport_stream_op_batch_payload* payload) |
||||
: payload_(payload) {} |
||||
|
||||
void BatchBuilder::PendingCompletion::CompletionCallback( |
||||
void* self, grpc_error_handle error) { |
||||
auto* pc = static_cast<PendingCompletion*>(self); |
||||
auto* party = pc->batch->party.get(); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log( |
||||
GPR_DEBUG, "%s[connected] Finish batch-component %s for %s: status=%s", |
||||
party->DebugTag().c_str(), std::string(pc->name()).c_str(), |
||||
grpc_transport_stream_op_batch_string(&pc->batch->batch, false).c_str(), |
||||
error.ToString().c_str()); |
||||
} |
||||
party->Spawn( |
||||
"batch-completion", |
||||
[pc, error = std::move(error)]() mutable { |
||||
RefCountedPtr<Batch> batch = std::exchange(pc->batch, nullptr); |
||||
pc->done_latch.Set(std::move(error)); |
||||
return Empty{}; |
||||
}, |
||||
[](Empty) {}); |
||||
} |
||||
|
||||
BatchBuilder::PendingCompletion::PendingCompletion(RefCountedPtr<Batch> batch) |
||||
: batch(std::move(batch)) { |
||||
GRPC_CLOSURE_INIT(&on_done_closure, CompletionCallback, this, nullptr); |
||||
} |
||||
|
||||
BatchBuilder::Batch::Batch(grpc_transport_stream_op_batch_payload* payload, |
||||
grpc_stream_refcount* stream_refcount) |
||||
: party(static_cast<Party*>(Activity::current())->Ref()), |
||||
stream_refcount(stream_refcount) { |
||||
batch.payload = payload; |
||||
batch.is_traced = GetContext<CallContext>()->traced(); |
||||
#ifndef NDEBUG |
||||
grpc_stream_ref(stream_refcount, "pending-batch"); |
||||
#else |
||||
grpc_stream_ref(stream_refcount); |
||||
#endif |
||||
} |
||||
|
||||
BatchBuilder::Batch::~Batch() { |
||||
auto* arena = party->arena(); |
||||
if (pending_receive_message != nullptr) { |
||||
arena->DeletePooled(pending_receive_message); |
||||
} |
||||
if (pending_receive_initial_metadata != nullptr) { |
||||
arena->DeletePooled(pending_receive_initial_metadata); |
||||
} |
||||
if (pending_receive_trailing_metadata != nullptr) { |
||||
arena->DeletePooled(pending_receive_trailing_metadata); |
||||
} |
||||
if (pending_sends != nullptr) { |
||||
arena->DeletePooled(pending_sends); |
||||
} |
||||
if (batch.cancel_stream) { |
||||
arena->DeletePooled(batch.payload); |
||||
} |
||||
#ifndef NDEBUG |
||||
grpc_stream_unref(stream_refcount, "pending-batch"); |
||||
#else |
||||
grpc_stream_unref(stream_refcount); |
||||
#endif |
||||
} |
||||
|
||||
BatchBuilder::Batch* BatchBuilder::GetBatch(Target target) { |
||||
if (target_.has_value() && |
||||
(target_->stream != target.stream || |
||||
target.transport->vtable |
||||
->hacky_disable_stream_op_batch_coalescing_in_connected_channel)) { |
||||
FlushBatch(); |
||||
} |
||||
if (!target_.has_value()) { |
||||
target_ = target; |
||||
batch_ = GetContext<Arena>()->NewPooled<Batch>(payload_, |
||||
target_->stream_refcount); |
||||
} |
||||
GPR_ASSERT(batch_ != nullptr); |
||||
return batch_; |
||||
} |
||||
|
||||
void BatchBuilder::FlushBatch() { |
||||
GPR_ASSERT(batch_ != nullptr); |
||||
GPR_ASSERT(target_.has_value()); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log( |
||||
GPR_DEBUG, "%s[connected] Perform transport stream op batch: %p %s", |
||||
batch_->party->DebugTag().c_str(), &batch_->batch, |
||||
grpc_transport_stream_op_batch_string(&batch_->batch, false).c_str()); |
||||
} |
||||
std::exchange(batch_, nullptr)->PerformWith(*target_); |
||||
target_.reset(); |
||||
} |
||||
|
||||
void BatchBuilder::Batch::PerformWith(Target target) { |
||||
grpc_transport_perform_stream_op(target.transport, target.stream, &batch); |
||||
} |
||||
|
||||
ServerMetadataHandle BatchBuilder::CompleteSendServerTrailingMetadata( |
||||
ServerMetadataHandle sent_metadata, absl::Status send_result, |
||||
bool actually_sent) { |
||||
if (!send_result.ok()) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, |
||||
"%s[connected] Send metadata failed with error: %s, " |
||||
"fabricating trailing metadata", |
||||
Activity::current()->DebugTag().c_str(), |
||||
send_result.ToString().c_str()); |
||||
} |
||||
sent_metadata->Clear(); |
||||
sent_metadata->Set(GrpcStatusMetadata(), |
||||
static_cast<grpc_status_code>(send_result.code())); |
||||
sent_metadata->Set(GrpcMessageMetadata(), |
||||
Slice::FromCopiedString(send_result.message())); |
||||
sent_metadata->Set(GrpcCallWasCancelled(), true); |
||||
} |
||||
if (!sent_metadata->get(GrpcCallWasCancelled()).has_value()) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log( |
||||
GPR_DEBUG, |
||||
"%s[connected] Tagging trailing metadata with " |
||||
"cancellation status from transport: %s", |
||||
Activity::current()->DebugTag().c_str(), |
||||
actually_sent ? "sent => not-cancelled" : "not-sent => cancelled"); |
||||
} |
||||
sent_metadata->Set(GrpcCallWasCancelled(), !actually_sent); |
||||
} |
||||
return sent_metadata; |
||||
} |
||||
|
||||
BatchBuilder::Batch* BatchBuilder::MakeCancel( |
||||
grpc_stream_refcount* stream_refcount, absl::Status status) { |
||||
auto* arena = GetContext<Arena>(); |
||||
auto* payload = |
||||
arena->NewPooled<grpc_transport_stream_op_batch_payload>(nullptr); |
||||
auto* batch = arena->NewPooled<Batch>(payload, stream_refcount); |
||||
batch->batch.cancel_stream = true; |
||||
payload->cancel_stream.cancel_error = std::move(status); |
||||
return batch; |
||||
} |
||||
|
||||
void BatchBuilder::Cancel(Target target, absl::Status status) { |
||||
auto* batch = MakeCancel(target.stream_refcount, std::move(status)); |
||||
batch->batch.on_complete = NewClosure( |
||||
[batch](absl::Status) { batch->party->arena()->DeletePooled(batch); }); |
||||
batch->PerformWith(target); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,468 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_BATCH_BUILDER_H |
||||
#define GRPC_SRC_CORE_LIB_TRANSPORT_BATCH_BUILDER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/string_view.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/status.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/status_helper.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/promise/latch.h" |
||||
#include "src/core/lib/promise/map.h" |
||||
#include "src/core/lib/promise/party.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/surface/call.h" |
||||
#include "src/core/lib/surface/call_trace.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
#include "src/core/lib/transport/transport_fwd.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Build up a transport stream op batch for a stream for a promise based
|
||||
// connected channel.
|
||||
// Offered as a context from Call, so that it can collect ALL the updates during
|
||||
// a single party round, and then push them down to the transport as a single
|
||||
// transaction.
|
||||
class BatchBuilder { |
||||
public: |
||||
explicit BatchBuilder(grpc_transport_stream_op_batch_payload* payload); |
||||
~BatchBuilder() { |
||||
if (batch_ != nullptr) FlushBatch(); |
||||
} |
||||
|
||||
struct Target { |
||||
grpc_transport* transport; |
||||
grpc_stream* stream; |
||||
grpc_stream_refcount* stream_refcount; |
||||
}; |
||||
|
||||
BatchBuilder(const BatchBuilder&) = delete; |
||||
BatchBuilder& operator=(const BatchBuilder&) = delete; |
||||
|
||||
// Returns a promise that will resolve to a Status when the send is completed.
|
||||
auto SendMessage(Target target, MessageHandle message); |
||||
|
||||
// Returns a promise that will resolve to a Status when the send is completed.
|
||||
auto SendClientInitialMetadata(Target target, ClientMetadataHandle metadata); |
||||
|
||||
// Returns a promise that will resolve to a Status when the send is completed.
|
||||
auto SendClientTrailingMetadata(Target target); |
||||
|
||||
// Returns a promise that will resolve to a Status when the send is completed.
|
||||
auto SendServerInitialMetadata(Target target, ServerMetadataHandle metadata); |
||||
|
||||
// Returns a promise that will resolve to a ServerMetadataHandle when the send
|
||||
// is completed.
|
||||
//
|
||||
// If convert_to_cancellation is true, then the status will be converted to a
|
||||
// cancellation batch instead of a trailing metadata op in a coalesced batch.
|
||||
//
|
||||
// This quirk exists as in the filter based stack upon which our transports
|
||||
// were written if a trailing metadata op were sent it always needed to be
|
||||
// paired with an initial op batch, and the transports would wait for the
|
||||
// initial metadata batch to arrive (in case of reordering up the stack).
|
||||
auto SendServerTrailingMetadata(Target target, ServerMetadataHandle metadata, |
||||
bool convert_to_cancellation); |
||||
|
||||
// Returns a promise that will resolve to a StatusOr<optional<MessageHandle>>
|
||||
// when a message is received.
|
||||
// Error => non-ok status
|
||||
// End of stream => Ok, nullopt (no message)
|
||||
// Message => Ok, message
|
||||
auto ReceiveMessage(Target target); |
||||
|
||||
// Returns a promise that will resolve to a StatusOr<ClientMetadataHandle>
|
||||
// when the receive is complete.
|
||||
auto ReceiveClientInitialMetadata(Target target); |
||||
|
||||
// Returns a promise that will resolve to a StatusOr<ClientMetadataHandle>
|
||||
// when the receive is complete.
|
||||
auto ReceiveClientTrailingMetadata(Target target); |
||||
|
||||
// Returns a promise that will resolve to a StatusOr<ServerMetadataHandle>
|
||||
// when the receive is complete.
|
||||
auto ReceiveServerInitialMetadata(Target target); |
||||
|
||||
// Returns a promise that will resolve to a StatusOr<ServerMetadataHandle>
|
||||
// when the receive is complete.
|
||||
auto ReceiveServerTrailingMetadata(Target target); |
||||
|
||||
// Send a cancellation: does not occupy the same payload, nor does it
|
||||
// coalesce with other ops.
|
||||
void Cancel(Target target, absl::Status status); |
||||
|
||||
private: |
||||
struct Batch; |
||||
|
||||
// Base pending operation
|
||||
struct PendingCompletion { |
||||
explicit PendingCompletion(RefCountedPtr<Batch> batch); |
||||
virtual absl::string_view name() const = 0; |
||||
static void CompletionCallback(void* self, grpc_error_handle error); |
||||
grpc_closure on_done_closure; |
||||
Latch<absl::Status> done_latch; |
||||
RefCountedPtr<Batch> batch; |
||||
|
||||
protected: |
||||
~PendingCompletion() = default; |
||||
}; |
||||
|
||||
// A pending receive message.
|
||||
struct PendingReceiveMessage final : public PendingCompletion { |
||||
using PendingCompletion::PendingCompletion; |
||||
|
||||
absl::string_view name() const override { return "receive_message"; } |
||||
|
||||
MessageHandle IntoMessageHandle() { |
||||
return GetContext<Arena>()->MakePooled<Message>(std::move(*payload), |
||||
flags); |
||||
} |
||||
|
||||
absl::optional<SliceBuffer> payload; |
||||
uint32_t flags; |
||||
}; |
||||
|
||||
// A pending receive metadata.
|
||||
struct PendingReceiveMetadata : public PendingCompletion { |
||||
using PendingCompletion::PendingCompletion; |
||||
|
||||
Arena::PoolPtr<grpc_metadata_batch> metadata = |
||||
GetContext<Arena>()->MakePooled<grpc_metadata_batch>( |
||||
GetContext<Arena>()); |
||||
|
||||
protected: |
||||
~PendingReceiveMetadata() = default; |
||||
}; |
||||
|
||||
struct PendingReceiveInitialMetadata final : public PendingReceiveMetadata { |
||||
using PendingReceiveMetadata::PendingReceiveMetadata; |
||||
absl::string_view name() const override { |
||||
return "receive_initial_metadata"; |
||||
} |
||||
}; |
||||
|
||||
struct PendingReceiveTrailingMetadata final : public PendingReceiveMetadata { |
||||
using PendingReceiveMetadata::PendingReceiveMetadata; |
||||
absl::string_view name() const override { |
||||
return "receive_trailing_metadata"; |
||||
} |
||||
}; |
||||
|
||||
// Pending sends in a batch
|
||||
struct PendingSends final : public PendingCompletion { |
||||
using PendingCompletion::PendingCompletion; |
||||
|
||||
absl::string_view name() const override { return "sends"; } |
||||
|
||||
MessageHandle send_message; |
||||
Arena::PoolPtr<grpc_metadata_batch> send_initial_metadata; |
||||
Arena::PoolPtr<grpc_metadata_batch> send_trailing_metadata; |
||||
bool trailing_metadata_sent = false; |
||||
}; |
||||
|
||||
// One outstanding batch.
|
||||
struct Batch final { |
||||
Batch(grpc_transport_stream_op_batch_payload* payload, |
||||
grpc_stream_refcount* stream_refcount); |
||||
~Batch(); |
||||
Batch(const Batch&) = delete; |
||||
Batch& operator=(const Batch&) = delete; |
||||
void IncrementRefCount() { ++refs; } |
||||
void Unref() { |
||||
if (--refs == 0) party->arena()->DeletePooled(this); |
||||
} |
||||
RefCountedPtr<Batch> Ref() { |
||||
IncrementRefCount(); |
||||
return RefCountedPtr<Batch>(this); |
||||
} |
||||
// Get an initialized pending completion.
|
||||
// There are four pending completions potentially contained within a batch.
|
||||
// They can be rather large so we don't create all of them always. Instead,
|
||||
// we dynamically create them on the arena as needed.
|
||||
// This method either returns the existing completion in a batch if that
|
||||
// completion has already been initialized, or it creates a new completion
|
||||
// and returns that.
|
||||
template <typename T> |
||||
T* GetInitializedCompletion(T*(Batch::*field)) { |
||||
if (this->*field != nullptr) return this->*field; |
||||
this->*field = party->arena()->NewPooled<T>(Ref()); |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%s[connected] Add batch closure for %s @ %s", |
||||
Activity::current()->DebugTag().c_str(), |
||||
std::string((this->*field)->name()).c_str(), |
||||
(this->*field)->on_done_closure.DebugString().c_str()); |
||||
} |
||||
return this->*field; |
||||
} |
||||
// grpc_transport_perform_stream_op on target.stream
|
||||
void PerformWith(Target target); |
||||
// Take a promise, and return a promise that holds a ref on this batch until
|
||||
// the promise completes or is cancelled.
|
||||
template <typename P> |
||||
auto RefUntil(P promise) { |
||||
return [self = Ref(), promise = std::move(promise)]() mutable { |
||||
return promise(); |
||||
}; |
||||
} |
||||
|
||||
grpc_transport_stream_op_batch batch; |
||||
PendingReceiveMessage* pending_receive_message = nullptr; |
||||
PendingReceiveInitialMetadata* pending_receive_initial_metadata = nullptr; |
||||
PendingReceiveTrailingMetadata* pending_receive_trailing_metadata = nullptr; |
||||
PendingSends* pending_sends = nullptr; |
||||
const RefCountedPtr<Party> party; |
||||
grpc_stream_refcount* const stream_refcount; |
||||
uint8_t refs = 0; |
||||
}; |
||||
|
||||
// Get a batch for the given target.
|
||||
// Currently: if the current batch is for this target, return it - otherwise
|
||||
// flush the batch and start a new one (and return that).
|
||||
// This function may change in the future to allow multiple batches to be
|
||||
// building at once (if that turns out to be useful for hedging).
|
||||
Batch* GetBatch(Target target); |
||||
// Flush the current batch down to the transport.
|
||||
void FlushBatch(); |
||||
// Create a cancel batch with its own payload.
|
||||
Batch* MakeCancel(grpc_stream_refcount* stream_refcount, absl::Status status); |
||||
|
||||
// Note: we don't distinguish between client and server metadata here.
|
||||
// At the time of writing they're both the same thing - and it's unclear
|
||||
// whether we'll get to separate them prior to batches going away or not.
|
||||
// So for now we claim YAGNI and just do the simplest possible implementation.
|
||||
auto SendInitialMetadata(Target target, |
||||
Arena::PoolPtr<grpc_metadata_batch> md); |
||||
auto ReceiveInitialMetadata(Target target); |
||||
auto ReceiveTrailingMetadata(Target target); |
||||
|
||||
// Combine send status and server metadata into a final status to report back
|
||||
// to the containing call.
|
||||
static ServerMetadataHandle CompleteSendServerTrailingMetadata( |
||||
ServerMetadataHandle sent_metadata, absl::Status send_result, |
||||
bool actually_sent); |
||||
|
||||
grpc_transport_stream_op_batch_payload* const payload_; |
||||
absl::optional<Target> target_; |
||||
Batch* batch_ = nullptr; |
||||
}; |
||||
|
||||
inline auto BatchBuilder::SendMessage(Target target, MessageHandle message) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%s[connected] Queue send message: %s", |
||||
Activity::current()->DebugTag().c_str(), |
||||
message->DebugString().c_str()); |
||||
} |
||||
auto* batch = GetBatch(target); |
||||
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); |
||||
batch->batch.on_complete = &pc->on_done_closure; |
||||
batch->batch.send_message = true; |
||||
payload_->send_message.send_message = message->payload(); |
||||
payload_->send_message.flags = message->flags(); |
||||
pc->send_message = std::move(message); |
||||
return batch->RefUntil(pc->done_latch.WaitAndCopy()); |
||||
} |
||||
|
||||
inline auto BatchBuilder::SendInitialMetadata( |
||||
Target target, Arena::PoolPtr<grpc_metadata_batch> md) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%s[connected] Queue send initial metadata: %s", |
||||
Activity::current()->DebugTag().c_str(), md->DebugString().c_str()); |
||||
} |
||||
auto* batch = GetBatch(target); |
||||
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); |
||||
batch->batch.on_complete = &pc->on_done_closure; |
||||
batch->batch.send_initial_metadata = true; |
||||
payload_->send_initial_metadata.send_initial_metadata = md.get(); |
||||
pc->send_initial_metadata = std::move(md); |
||||
return batch->RefUntil(pc->done_latch.WaitAndCopy()); |
||||
} |
||||
|
||||
inline auto BatchBuilder::SendClientInitialMetadata( |
||||
Target target, ClientMetadataHandle metadata) { |
||||
return SendInitialMetadata(target, std::move(metadata)); |
||||
} |
||||
|
||||
inline auto BatchBuilder::SendClientTrailingMetadata(Target target) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%s[connected] Queue send trailing metadata", |
||||
Activity::current()->DebugTag().c_str()); |
||||
} |
||||
auto* batch = GetBatch(target); |
||||
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends); |
||||
batch->batch.on_complete = &pc->on_done_closure; |
||||
batch->batch.send_trailing_metadata = true; |
||||
auto metadata = |
||||
GetContext<Arena>()->MakePooled<grpc_metadata_batch>(GetContext<Arena>()); |
||||
payload_->send_trailing_metadata.send_trailing_metadata = metadata.get(); |
||||
payload_->send_trailing_metadata.sent = nullptr; |
||||
pc->send_trailing_metadata = std::move(metadata); |
||||
return batch->RefUntil(pc->done_latch.WaitAndCopy()); |
||||
} |
||||
|
||||
inline auto BatchBuilder::SendServerInitialMetadata( |
||||
Target target, ServerMetadataHandle metadata) { |
||||
return SendInitialMetadata(target, std::move(metadata)); |
||||
} |
||||
|
||||
inline auto BatchBuilder::SendServerTrailingMetadata( |
||||
Target target, ServerMetadataHandle metadata, |
||||
bool convert_to_cancellation) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%s[connected] %s: %s", |
||||
Activity::current()->DebugTag().c_str(), |
||||
convert_to_cancellation ? "Send trailing metadata as cancellation" |
||||
: "Queue send trailing metadata", |
||||
metadata->DebugString().c_str()); |
||||
} |
||||
Batch* batch; |
||||
PendingSends* pc; |
||||
if (convert_to_cancellation) { |
||||
const auto status_code = |
||||
metadata->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN); |
||||
auto status = grpc_error_set_int( |
||||
absl::Status(static_cast<absl::StatusCode>(status_code), |
||||
metadata->GetOrCreatePointer(GrpcMessageMetadata()) |
||||
->as_string_view()), |
||||
StatusIntProperty::kRpcStatus, status_code); |
||||
batch = MakeCancel(target.stream_refcount, std::move(status)); |
||||
pc = batch->GetInitializedCompletion(&Batch::pending_sends); |
||||
} else { |
||||
batch = GetBatch(target); |
||||
pc = batch->GetInitializedCompletion(&Batch::pending_sends); |
||||
batch->batch.send_trailing_metadata = true; |
||||
payload_->send_trailing_metadata.send_trailing_metadata = metadata.get(); |
||||
payload_->send_trailing_metadata.sent = &pc->trailing_metadata_sent; |
||||
} |
||||
batch->batch.on_complete = &pc->on_done_closure; |
||||
pc->send_trailing_metadata = std::move(metadata); |
||||
auto promise = batch->RefUntil( |
||||
Map(pc->done_latch.WaitAndCopy(), [pc](absl::Status status) { |
||||
return CompleteSendServerTrailingMetadata( |
||||
std::move(pc->send_trailing_metadata), std::move(status), |
||||
pc->trailing_metadata_sent); |
||||
})); |
||||
if (convert_to_cancellation) { |
||||
batch->PerformWith(target); |
||||
} |
||||
return promise; |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveMessage(Target target) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%s[connected] Queue receive message", |
||||
Activity::current()->DebugTag().c_str()); |
||||
} |
||||
auto* batch = GetBatch(target); |
||||
auto* pc = batch->GetInitializedCompletion(&Batch::pending_receive_message); |
||||
batch->batch.recv_message = true; |
||||
payload_->recv_message.recv_message_ready = &pc->on_done_closure; |
||||
payload_->recv_message.recv_message = &pc->payload; |
||||
payload_->recv_message.flags = &pc->flags; |
||||
return batch->RefUntil( |
||||
Map(pc->done_latch.Wait(), |
||||
[pc](absl::Status status) |
||||
-> absl::StatusOr<absl::optional<MessageHandle>> { |
||||
if (!status.ok()) return status; |
||||
if (!pc->payload.has_value()) return absl::nullopt; |
||||
return pc->IntoMessageHandle(); |
||||
})); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveInitialMetadata(Target target) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%s[connected] Queue receive initial metadata", |
||||
Activity::current()->DebugTag().c_str()); |
||||
} |
||||
auto* batch = GetBatch(target); |
||||
auto* pc = |
||||
batch->GetInitializedCompletion(&Batch::pending_receive_initial_metadata); |
||||
batch->batch.recv_initial_metadata = true; |
||||
payload_->recv_initial_metadata.recv_initial_metadata_ready = |
||||
&pc->on_done_closure; |
||||
payload_->recv_initial_metadata.recv_initial_metadata = pc->metadata.get(); |
||||
return batch->RefUntil( |
||||
Map(pc->done_latch.Wait(), |
||||
[pc](absl::Status status) -> absl::StatusOr<ClientMetadataHandle> { |
||||
if (!status.ok()) return status; |
||||
return std::move(pc->metadata); |
||||
})); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveClientInitialMetadata(Target target) { |
||||
return ReceiveInitialMetadata(target); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveServerInitialMetadata(Target target) { |
||||
return ReceiveInitialMetadata(target); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveTrailingMetadata(Target target) { |
||||
if (grpc_call_trace.enabled()) { |
||||
gpr_log(GPR_DEBUG, "%s[connected] Queue receive trailing metadata", |
||||
Activity::current()->DebugTag().c_str()); |
||||
} |
||||
auto* batch = GetBatch(target); |
||||
auto* pc = batch->GetInitializedCompletion( |
||||
&Batch::pending_receive_trailing_metadata); |
||||
batch->batch.recv_trailing_metadata = true; |
||||
payload_->recv_trailing_metadata.recv_trailing_metadata_ready = |
||||
&pc->on_done_closure; |
||||
payload_->recv_trailing_metadata.recv_trailing_metadata = pc->metadata.get(); |
||||
payload_->recv_trailing_metadata.collect_stats = |
||||
&GetContext<CallContext>()->call_stats()->transport_stream_stats; |
||||
return batch->RefUntil( |
||||
Map(pc->done_latch.Wait(), |
||||
[pc](absl::Status status) -> absl::StatusOr<ServerMetadataHandle> { |
||||
if (!status.ok()) return status; |
||||
return std::move(pc->metadata); |
||||
})); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveClientTrailingMetadata(Target target) { |
||||
return ReceiveTrailingMetadata(target); |
||||
} |
||||
|
||||
inline auto BatchBuilder::ReceiveServerTrailingMetadata(Target target) { |
||||
return ReceiveTrailingMetadata(target); |
||||
} |
||||
|
||||
template <> |
||||
struct ContextType<BatchBuilder> {}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_BATCH_BUILDER_H
|
@ -1,134 +0,0 @@ |
||||
// Copyright 2021 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/lib/promise/observable.h" |
||||
|
||||
#include <functional> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "gmock/gmock.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include "src/core/lib/promise/promise.h" |
||||
#include "src/core/lib/promise/seq.h" |
||||
#include "test/core/promise/test_wakeup_schedulers.h" |
||||
|
||||
using testing::MockFunction; |
||||
using testing::StrictMock; |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// A simple Barrier type: stalls progress until it is 'cleared'.
|
||||
class Barrier { |
||||
public: |
||||
struct Result {}; |
||||
|
||||
Promise<Result> Wait() { |
||||
return [this]() -> Poll<Result> { |
||||
MutexLock lock(&mu_); |
||||
if (cleared_) { |
||||
return Result{}; |
||||
} else { |
||||
return wait_set_.AddPending(Activity::current()->MakeOwningWaker()); |
||||
} |
||||
}; |
||||
} |
||||
|
||||
void Clear() { |
||||
mu_.Lock(); |
||||
cleared_ = true; |
||||
auto wakeup = wait_set_.TakeWakeupSet(); |
||||
mu_.Unlock(); |
||||
wakeup.Wakeup(); |
||||
} |
||||
|
||||
private: |
||||
Mutex mu_; |
||||
WaitSet wait_set_ ABSL_GUARDED_BY(mu_); |
||||
bool cleared_ ABSL_GUARDED_BY(mu_) = false; |
||||
}; |
||||
|
||||
TEST(ObservableTest, CanPushAndGet) { |
||||
StrictMock<MockFunction<void(absl::Status)>> on_done; |
||||
Observable<int> observable; |
||||
auto observer = observable.MakeObserver(); |
||||
auto activity = MakeActivity( |
||||
[&observer]() { |
||||
return Seq(observer.Get(), [](absl::optional<int> i) { |
||||
return i == 42 ? absl::OkStatus() : absl::UnknownError("expected 42"); |
||||
}); |
||||
}, |
||||
InlineWakeupScheduler(), |
||||
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); |
||||
EXPECT_CALL(on_done, Call(absl::OkStatus())); |
||||
observable.Push(42); |
||||
} |
||||
|
||||
TEST(ObservableTest, CanNext) { |
||||
StrictMock<MockFunction<void(absl::Status)>> on_done; |
||||
Observable<int> observable; |
||||
auto observer = observable.MakeObserver(); |
||||
auto activity = MakeActivity( |
||||
[&observer]() { |
||||
return Seq( |
||||
observer.Get(), |
||||
[&observer](absl::optional<int> i) { |
||||
EXPECT_EQ(i, 42); |
||||
return observer.Next(); |
||||
}, |
||||
[](absl::optional<int> i) { |
||||
return i == 1 ? absl::OkStatus() |
||||
: absl::UnknownError("expected 1"); |
||||
}); |
||||
}, |
||||
InlineWakeupScheduler(), |
||||
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); |
||||
observable.Push(42); |
||||
EXPECT_CALL(on_done, Call(absl::OkStatus())); |
||||
observable.Push(1); |
||||
} |
||||
|
||||
TEST(ObservableTest, CanWatch) { |
||||
StrictMock<MockFunction<void(absl::Status)>> on_done; |
||||
Observable<int> observable; |
||||
Barrier barrier; |
||||
auto activity = MakeActivity( |
||||
[&observable, &barrier]() { |
||||
return observable.Watch( |
||||
[&barrier](int x, |
||||
WatchCommitter* committer) -> Promise<absl::Status> { |
||||
if (x == 3) { |
||||
committer->Commit(); |
||||
return Seq(barrier.Wait(), Immediate(absl::OkStatus())); |
||||
} else { |
||||
return Never<absl::Status>(); |
||||
} |
||||
}); |
||||
}, |
||||
InlineWakeupScheduler(), |
||||
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); |
||||
observable.Push(1); |
||||
observable.Push(2); |
||||
observable.Push(3); |
||||
observable.Push(4); |
||||
EXPECT_CALL(on_done, Call(absl::OkStatus())); |
||||
barrier.Clear(); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
@ -0,0 +1,132 @@ |
||||
#!/usr/bin/env python3 |
||||
|
||||
# Copyright 2023 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
# USAGE: |
||||
# Run some tests with the GRPC_ARENA_TRACE_POOLED_ALLOCATIONS #define turned on. |
||||
# Capture the output to a text file. |
||||
# Invoke this program with that as an argument, and let it work its magic. |
||||
|
||||
import collections |
||||
import heapq |
||||
import random |
||||
import re |
||||
import sys |
||||
|
||||
# A single allocation, negative size => free |
||||
Allocation = collections.namedtuple('Allocation', 'size ptr') |
||||
Active = collections.namedtuple('Active', 'id size') |
||||
|
||||
# Read through all the captures, and build up scrubbed traces |
||||
arenas = [] |
||||
building = collections.defaultdict(list) |
||||
active = {} |
||||
biggest = 0 |
||||
smallest = 1024 |
||||
sizes = set() |
||||
for filename in sys.argv[1:]: |
||||
for line in open(filename): |
||||
m = re.search(r'ARENA 0x([0-9a-f]+) ALLOC ([0-9]+) @ 0x([0-9a-f]+)', |
||||
line) |
||||
if m: |
||||
size = int(m.group(2)) |
||||
if size > biggest: |
||||
biggest = size |
||||
if size < smallest: |
||||
smallest = size |
||||
active[m.group(3)] = Active(m.group(1), size) |
||||
building[m.group(1)].append(size) |
||||
sizes.add(size) |
||||
m = re.search(r'FREE 0x([0-9a-f]+)', line) |
||||
if m: |
||||
# We may have spurious frees, so make sure there's an outstanding allocation |
||||
last = active.pop(m.group(1), None) |
||||
if last is not None: |
||||
building[last.id].append(-last.size) |
||||
m = re.search(r'DESTRUCT_ARENA 0x([0-9a-f]+)', line) |
||||
if m: |
||||
trace = building.pop(m.group(1), None) |
||||
if trace: |
||||
arenas.append(trace) |
||||
|
||||
|
||||
# Given a list of pool sizes, return which bucket an allocation should go into |
||||
def bucket(pool_sizes, size): |
||||
for bucket in sorted(pool_sizes): |
||||
if abs(size) <= bucket: |
||||
return bucket |
||||
|
||||
|
||||
# Given a list of pool sizes, determine the total outstanding bytes in the arena for once trace |
||||
def outstanding_bytes(pool_sizes, trace): |
||||
free_list = collections.defaultdict(int) |
||||
allocated = 0 |
||||
for size in trace: |
||||
b = bucket(pool_sizes, size) |
||||
if size < 0: |
||||
free_list[b] += 1 |
||||
else: |
||||
if free_list[b] > 0: |
||||
free_list[b] -= 1 |
||||
else: |
||||
allocated += b |
||||
return allocated + len(pool_sizes) * 8 |
||||
|
||||
|
||||
# Given a list of pool sizes, determine the maximum outstanding bytes for any seen trace |
||||
def measure(pool_sizes): |
||||
max_outstanding = 0 |
||||
for trace in arenas: |
||||
max_outstanding = max(max_outstanding, |
||||
outstanding_bytes(pool_sizes, trace)) |
||||
return max_outstanding |
||||
|
||||
|
||||
ALWAYS_INCLUDE = 1024 |
||||
best = [ALWAYS_INCLUDE, biggest] |
||||
best_measure = measure(best) |
||||
|
||||
testq = [] |
||||
step = 0 |
||||
|
||||
|
||||
def add(l): |
||||
global testq, best_measure, best |
||||
m = measure(l) |
||||
if m < best_measure: |
||||
best_measure = m |
||||
best = l |
||||
if l[-1] == smallest: |
||||
return |
||||
heapq.heappush(testq, (m, l)) |
||||
|
||||
|
||||
add(best) |
||||
|
||||
while testq: |
||||
top = heapq.heappop(testq)[1] |
||||
m = measure(top) |
||||
step += 1 |
||||
if step % 1000 == 0: |
||||
print("iter %d; pending=%d; top=%r/%d" % |
||||
(step, len(testq), top, measure(top))) |
||||
for i in sizes: |
||||
if i >= top[-1]: |
||||
continue |
||||
add(top + [i]) |
||||
|
||||
print("SAW SIZES: %r" % sorted(list(sizes))) |
||||
print("BEST: %r" % list(reversed(best))) |
||||
print("BEST MEASURE: %d" % best_measure) |
Loading…
Reference in new issue