[promises] Convert lame client (#29587)

* [promises] Convert lame client

* fixes

* fixes

* Automated change: Fix sanity tests

* cleanup

* fix

* fix

* review feedback

* fix

* no ok lame channels

* fix

* fix

* Update promise_based_filter.h

* [iwyu] Better script

* eliminate race

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/28758/head^2
Craig Tiller 3 years ago committed by GitHub
parent 3f2b3460a4
commit 35320cbc76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/ext/filters/client_channel/channel_connectivity.cc
  2. 2
      src/core/ext/filters/client_channel/dynamic_filters.cc
  3. 2
      src/core/ext/xds/xds_client.cc
  4. 33
      src/core/lib/channel/promise_based_filter.cc
  5. 59
      src/core/lib/channel/promise_based_filter.h
  6. 2
      src/core/lib/surface/builtins.cc
  7. 130
      src/core/lib/surface/lame_client.cc
  8. 39
      src/core/lib/surface/lame_client.h
  9. 8
      test/core/filters/filter_fuzzer.cc
  10. 10
      tools/dockerfile/grpc_iwyu/iwyu.sh
  11. 1
      tools/run_tests/sanity/sanity_tests.yaml

@ -47,7 +47,7 @@ namespace {
bool IsLameChannel(Channel* channel) {
grpc_channel_element* elem =
grpc_channel_stack_last_element(channel->channel_stack());
return elem->filter == &grpc_lame_filter;
return elem->filter == &LameClientFilter::kFilter;
}
} // namespace

@ -177,7 +177,7 @@ RefCountedPtr<DynamicFilters> DynamicFilters::Create(
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(args, &error_arg, 1);
GRPC_ERROR_UNREF(error);
p = CreateChannelStack(new_args, {&grpc_lame_filter});
p = CreateChannelStack(new_args, {&LameClientFilter::kFilter});
GPR_ASSERT(p.second == GRPC_ERROR_NONE);
grpc_channel_args_destroy(new_args);
}

@ -555,7 +555,7 @@ namespace {
bool IsLameChannel(grpc_channel* channel) {
grpc_channel_element* elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
return elem->filter == &grpc_lame_filter;
return elem->filter == &LameClientFilter::kFilter;
}
} // namespace

@ -28,6 +28,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice.h"
namespace grpc_core {
@ -142,12 +143,26 @@ void BaseCallData::CapturedBatch::ResumeWith(Flusher* releaser) {
}
}
void BaseCallData::CapturedBatch::CompleteWith(Flusher* releaser) {
auto* batch = absl::exchange(batch_, nullptr);
GPR_ASSERT(batch != nullptr);
uintptr_t& refcnt = *RefCountField(batch);
if (refcnt == 0) return; // refcnt==0 ==> cancelled
if (--refcnt == 0) {
releaser->Complete(batch);
}
}
void BaseCallData::CapturedBatch::CancelWith(grpc_error_handle error,
Flusher* releaser) {
auto* batch = absl::exchange(batch_, nullptr);
GPR_ASSERT(batch != nullptr);
uintptr_t& refcnt = *RefCountField(batch);
if (refcnt == 0) return; // refcnt==0 ==> cancelled
if (refcnt == 0) {
// refcnt==0 ==> cancelled
GRPC_ERROR_UNREF(error);
return;
}
refcnt = 0;
releaser->Cancel(batch, error);
}
@ -495,7 +510,12 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) {
!batch->recv_initial_metadata && !batch->recv_message &&
!batch->recv_trailing_metadata);
Cancel(batch->payload->cancel_stream.cancel_error);
batch.ResumeWith(&flusher);
if (is_last()) {
GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
batch.CompleteWith(&flusher);
} else {
batch.ResumeWith(&flusher);
}
return;
}
@ -567,6 +587,8 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) {
recv_trailing_state_ = RecvTrailingState::kForwarded;
HookRecvTrailingMetadata(batch);
}
} else if (cancelled_error_ != GRPC_ERROR_NONE) {
batch.CancelWith(GRPC_ERROR_REF(cancelled_error_), &flusher);
}
if (batch.is_captured()) batch.ResumeWith(&flusher);
@ -950,7 +972,12 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
!batch->recv_trailing_metadata);
Cancel(GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error),
&flusher);
batch.ResumeWith(&flusher);
if (is_last()) {
GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
batch.CompleteWith(&flusher);
} else {
batch.ResumeWith(&flusher);
}
return;
}

@ -32,16 +32,19 @@
#include "absl/container/inlined_vector.h"
#include "absl/meta/type_traits.h"
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
@ -89,12 +92,18 @@ class ChannelFilter {
// structures going forward.
virtual bool StartTransportOp(grpc_transport_op*) { return false; }
// Perform a legacy get info call
// Return true if the op was handled, false if it should be passed to the
// next filter.
// TODO(ctiller): design a new API for this
virtual bool GetChannelInfo(const grpc_channel_info*) { return false; }
virtual ~ChannelFilter() = default;
};
// Designator for whether a filter is client side or server side.
// Please don't use this outside calls to MakePromiseBasedFilter - it's intended
// to be deleted once the promise conversion is complete.
// Please don't use this outside calls to MakePromiseBasedFilter - it's
// intended to be deleted once the promise conversion is complete.
enum class FilterEndpoint {
kClient,
kServer,
@ -102,11 +111,12 @@ enum class FilterEndpoint {
// Flags for MakePromiseBasedFilter.
static constexpr uint8_t kFilterExaminesServerInitialMetadata = 1;
static constexpr uint8_t kFilterIsLast = 2;
namespace promise_filter_detail {
// Proxy channel filter for initialization failure, since we must leave a valid
// filter in place.
// Proxy channel filter for initialization failure, since we must leave a
// valid filter in place.
class InvalidChannelFilter : public ChannelFilter {
public:
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
@ -169,6 +179,11 @@ class BaseCallData : public Activity, private Wakeable {
&call_closures_);
}
void Complete(grpc_transport_stream_op_batch* batch) {
call_closures_.Add(batch->on_complete, GRPC_ERROR_NONE,
"Flusher::Complete");
}
void AddClosure(grpc_closure* closure, grpc_error_handle error,
const char* reason) {
call_closures_.Add(closure, error, reason);
@ -197,8 +212,13 @@ class BaseCallData : public Activity, private Wakeable {
grpc_transport_stream_op_batch* operator->() { return batch_; }
bool is_captured() const { return batch_ != nullptr; }
// Resume processing this batch (releases one ref, passes it down the
// stack)
void ResumeWith(Flusher* releaser);
// Cancel this batch immediately (releases all refs)
void CancelWith(grpc_error_handle error, Flusher* releaser);
// Complete this batch (pass it up) assuming refs drop to zero
void CompleteWith(Flusher* releaser);
void Swap(CapturedBatch* other) { std::swap(batch_, other->batch_); }
@ -225,6 +245,11 @@ class BaseCallData : public Activity, private Wakeable {
return server_initial_metadata_latch_;
}
bool is_last() const {
return grpc_call_stack_element(call_stack_, call_stack_->count - 1) ==
elem_;
}
private:
// Wakeable implementation.
void Wakeup() final;
@ -272,7 +297,8 @@ class ClientCallData : public BaseCallData {
// Start state: no op seen
kInitial,
// We saw the op, and since it was bundled with send initial metadata, we
// queued it until the send initial metadata can be sent to the next filter.
// queued it until the send initial metadata can be sent to the next
// filter.
kQueued,
// We've forwarded the op to the next filter.
kForwarded,
@ -295,9 +321,9 @@ class ClientCallData : public BaseCallData {
// Begin running the promise - which will ultimately take some initial
// metadata and return some trailing metadata.
void StartPromise(Flusher* flusher);
// Interject our callback into the op batch for recv trailing metadata ready.
// Stash a pointer to the trailing metadata that will be filled in, so we can
// manipulate it later.
// Interject our callback into the op batch for recv trailing metadata
// ready. Stash a pointer to the trailing metadata that will be filled in,
// so we can manipulate it later.
void HookRecvTrailingMetadata(CapturedBatch batch);
// Construct a promise that will "call" the next filter.
// Effectively:
@ -454,8 +480,6 @@ class CallData<ChannelFilter, FilterEndpoint::kServer> : public ServerCallData {
// static absl::StatusOr<SomeChannelFilter> Create(
// ChannelArgs channel_args, ChannelFilter::Args filter_args);
// };
// TODO(ctiller): allow implementing get_channel_info, start_transport_op in
// some way on ChannelFilter.
template <typename F, FilterEndpoint kEndpoint, uint8_t kFlags = 0>
absl::enable_if_t<std::is_base_of<ChannelFilter, F>::value, grpc_channel_filter>
MakePromiseBasedFilter(const char* name) {
@ -493,15 +517,21 @@ MakePromiseBasedFilter(const char* name) {
},
// destroy_call_elem
[](grpc_call_element* elem, const grpc_call_final_info* final_info,
grpc_closure*) {
grpc_closure* then_schedule_closure) {
auto* cd = static_cast<CallData*>(elem->call_data);
cd->Finalize(final_info);
cd->~CallData();
if ((kFlags & kFilterIsLast) != 0) {
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
} else {
GPR_ASSERT(then_schedule_closure == nullptr);
}
},
// sizeof_channel_data
sizeof(F),
// init_channel_elem
[](grpc_channel_element* elem, grpc_channel_element_args* args) {
GPR_ASSERT(args->is_last == ((kFlags & kFilterIsLast) != 0));
auto status = F::Create(ChannelArgs::FromC(args->channel_args),
ChannelFilter::Args(args->channel_stack, elem));
if (!status.ok()) {
@ -524,7 +554,12 @@ MakePromiseBasedFilter(const char* name) {
static_cast<ChannelFilter*>(elem->channel_data)->~ChannelFilter();
},
// get_channel_info
grpc_channel_next_get_info,
[](grpc_channel_element* elem, const grpc_channel_info* info) {
if (!static_cast<ChannelFilter*>(elem->channel_data)
->GetChannelInfo(info)) {
grpc_channel_next_get_info(elem, info);
}
},
// name
name,
};

@ -41,7 +41,7 @@ void RegisterBuiltins(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_LAME_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
builder->AppendFilter(&grpc_lame_filter);
builder->AppendFilter(&LameClientFilter::kFilter);
return true;
});
builder->channel_init()->RegisterStage(

@ -21,9 +21,9 @@
#include "src/core/lib/surface/lame_client.h"
#include <memory>
#include <new>
#include <utility>
#include "absl/memory/memory.h"
#include "absl/status/statusor.h"
#include <grpc/grpc.h>
@ -33,17 +33,17 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
@ -57,49 +57,38 @@
namespace grpc_core {
namespace {
struct ChannelData {
explicit ChannelData(grpc_channel_element_args* args)
: state_tracker("lame_channel", GRPC_CHANNEL_SHUTDOWN) {
grpc_error_handle* err = grpc_channel_args_find_pointer<grpc_error_handle>(
args->channel_args, GRPC_ARG_LAME_FILTER_ERROR);
if (err != nullptr) error = GRPC_ERROR_REF(*err);
}
const grpc_channel_filter LameClientFilter::kFilter =
MakePromiseBasedFilter<LameClientFilter, FilterEndpoint::kClient,
kFilterIsLast>("lame-client");
~ChannelData() { GRPC_ERROR_UNREF(error); }
absl::StatusOr<LameClientFilter> LameClientFilter::Create(ChannelArgs args,
ChannelFilter::Args) {
return LameClientFilter(
*args.GetPointer<absl::Status>(GRPC_ARG_LAME_FILTER_ERROR));
}
grpc_error_handle error = GRPC_ERROR_NONE;
Mutex mu;
ConnectivityStateTracker state_tracker;
};
LameClientFilter::LameClientFilter(absl::Status error)
: error_(std::move(error)), state_(absl::make_unique<State>()) {}
struct CallData {
CallCombiner* call_combiner;
};
LameClientFilter::State::State()
: state_tracker("lame_client", GRPC_CHANNEL_SHUTDOWN) {}
void lame_start_transport_stream_op_batch(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
CallData* calld = static_cast<CallData*>(elem->call_data);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
grpc_transport_stream_op_batch_finish_with_failure(
op, GRPC_ERROR_REF(chand->error), calld->call_combiner);
ArenaPromise<ServerMetadataHandle> LameClientFilter::MakeCallPromise(
CallArgs, NextPromiseFactory) {
return Immediate(ServerMetadataHandle(error_));
}
void lame_get_channel_info(grpc_channel_element* /*elem*/,
const grpc_channel_info* /*channel_info*/) {}
bool LameClientFilter::GetChannelInfo(const grpc_channel_info*) { return true; }
void lame_start_transport_op(grpc_channel_element* elem,
grpc_transport_op* op) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
bool LameClientFilter::StartTransportOp(grpc_transport_op* op) {
{
MutexLock lock(&chand->mu);
MutexLock lock(&state_->mu);
if (op->start_connectivity_watch != nullptr) {
chand->state_tracker.AddWatcher(op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
state_->state_tracker.AddWatcher(op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
}
if (op->stop_connectivity_watch != nullptr) {
chand->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
state_->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
}
}
if (op->send_ping.on_initiate != nullptr) {
@ -114,50 +103,18 @@ void lame_start_transport_op(grpc_channel_element* elem,
if (op->on_consumed != nullptr) {
ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}
return true;
}
grpc_error_handle lame_init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
void lame_destroy_call_elem(grpc_call_element* /*elem*/,
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure) {
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
}
grpc_error_handle lame_init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
new (elem->channel_data) ChannelData(args);
return GRPC_ERROR_NONE;
}
void lame_destroy_channel_elem(grpc_channel_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->~ChannelData();
}
namespace {
// Channel arg vtable for a grpc_error_handle.
void* ErrorCopy(void* p) {
grpc_error_handle* new_error = nullptr;
if (p != nullptr) {
grpc_error_handle* error = static_cast<grpc_error_handle*>(p);
new_error = new grpc_error_handle();
*new_error = GRPC_ERROR_REF(*error);
}
return new_error;
}
void ErrorDestroy(void* p) {
if (p != nullptr) {
grpc_error_handle* error = static_cast<grpc_error_handle*>(p);
GRPC_ERROR_UNREF(*error);
delete error;
}
return new absl::Status(*static_cast<absl::Status*>(p));
}
void ErrorDestroy(void* p) { delete static_cast<absl::Status*>(p); }
int ErrorCompare(void* p, void* q) { return QsortCompare(p, q); }
const grpc_arg_pointer_vtable kLameFilterErrorArgVtable = {
ErrorCopy, ErrorDestroy, ErrorCompare};
@ -171,24 +128,6 @@ grpc_arg MakeLameClientErrorArg(grpc_error_handle* error) {
} // namespace grpc_core
const grpc_channel_filter grpc_lame_filter = {
grpc_core::lame_start_transport_stream_op_batch,
nullptr,
grpc_core::lame_start_transport_op,
sizeof(grpc_core::CallData),
grpc_core::lame_init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
grpc_core::lame_destroy_call_elem,
sizeof(grpc_core::ChannelData),
grpc_core::lame_init_channel_elem,
grpc_channel_stack_no_post_init,
grpc_core::lame_destroy_channel_elem,
grpc_core::lame_get_channel_info,
"lame-client",
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack*)((c) + 1))
grpc_channel* grpc_lame_client_channel_create(const char* target,
grpc_status_code error_code,
const char* error_message) {
@ -197,18 +136,15 @@ grpc_channel* grpc_lame_client_channel_create(const char* target,
"grpc_lame_client_channel_create(target=%s, error_code=%d, "
"error_message=%s)",
3, (target, (int)error_code, error_message));
grpc_error_handle error = grpc_error_set_str(
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
GRPC_ERROR_INT_GRPC_STATUS, error_code),
GRPC_ERROR_STR_GRPC_MESSAGE, error_message);
if (error_code == GRPC_STATUS_OK) error_code = GRPC_STATUS_UNKNOWN;
grpc_core::ChannelArgs args =
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.Set(GRPC_ARG_LAME_FILTER_ERROR,
grpc_core::ChannelArgs::Pointer(
new grpc_error_handle(error),
new absl::Status(static_cast<absl::StatusCode>(error_code),
error_message),
&grpc_core::kLameFilterErrorArgVtable));
auto channel = grpc_core::Channel::Create(target, std::move(args),
GRPC_CLIENT_LAME_CHANNEL, nullptr);

@ -21,16 +21,51 @@
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
// Does NOT take ownership of error.
grpc_arg MakeLameClientErrorArg(grpc_error_handle* error);
} // namespace grpc_core
extern const grpc_channel_filter grpc_lame_filter;
// This filter becomes the entire channel stack for a channel that fails to be
// created. Every call returns failure.
class LameClientFilter : public ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<LameClientFilter> Create(
ChannelArgs args, ChannelFilter::Args filter_args);
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
bool StartTransportOp(grpc_transport_op*) override;
bool GetChannelInfo(const grpc_channel_info*) override;
private:
explicit LameClientFilter(absl::Status error);
absl::Status error_;
struct State {
State();
Mutex mu;
ConnectivityStateTracker state_tracker ABSL_GUARDED_BY(mu);
};
std::unique_ptr<State> state_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_SURFACE_LAME_CLIENT_H */

@ -324,11 +324,11 @@ class MainLoop {
static const grpc_channel_filter* BottomFilter(bool is_client) {
static const grpc_channel_filter client_filter =
MakePromiseBasedFilter<Call::BottomFilter, FilterEndpoint::kClient>(
"client-end");
MakePromiseBasedFilter<Call::BottomFilter, FilterEndpoint::kClient,
kFilterIsLast>("client-end");
static const grpc_channel_filter server_filter =
MakePromiseBasedFilter<Call::BottomFilter, FilterEndpoint::kServer>(
"server-end");
MakePromiseBasedFilter<Call::BottomFilter, FilterEndpoint::kServer,
kFilterIsLast>("server-end");
return is_client ? &client_filter : &server_filter;
}

@ -78,12 +78,4 @@ xargs -n 1 -P $CPU_COUNT -a iwyu_files.txt ${IWYU_ROOT}/iwyu/run_iwyu_on.sh
cat iwyu/iwyu.*.out > iwyu.out
# apply the suggested changes
${IWYU_ROOT}/iwyu/fix_includes.py --nocomments --nosafe_headers < iwyu.out || true
# reformat sources, since iwyu gets this wrong
xargs -a iwyu_files.txt ${CLANG_FORMAT:-clang-format} -i
# TODO(ctiller): expand this to match the clang-tidy directories:
# | grep -E "(^include/|^src/core/|^src/cpp/|^test/core/|^test/cpp/)"
git diff --exit-code > /dev/null
${IWYU_ROOT}/iwyu/fix_includes.py --nocomments --nosafe_headers < iwyu.out

@ -31,6 +31,7 @@
- script: tools/distrib/check_pytype.sh
- script: tools/codegen/core/gen_grpc_tls_credentials_options.py --test
- script: tools/distrib/clang_format_code.sh
cpu_cost: 1000
- script: tools/distrib/clang_tidy_code.sh
# ClangTidy needs to run exclusively because it uses files under the bazel output
# directory and this will be removed by another bazel invocation.

Loading…
Cancel
Save