Move client-idle-filter to promises (#28838)

* Promise based sleep

* Embrace absl::Status

* Automated change: Fix sanity tests

* Add another test, fix bug

* fix

* fix

* Move client-idle-filter to promises

* better call counting

* x

* x

* x

* review feedback

* progress

* Automated change: Fix sanity tests

* progress

* Automated change: Fix sanity tests

* fix

* channel stack refcount

* remove op from lock, refcount

* compiles

* Automated change: Fix sanity tests

* nicer api

* fix

* Automated change: Fix sanity tests

* fix

* speculative fix for windows

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/28907/head
Craig Tiller 3 years ago committed by GitHub
parent 95c12d88c3
commit 0f424ae5cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      BUILD
  2. 4
      CMakeLists.txt
  3. 2
      Makefile
  4. 14
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 6
      gRPC-C++.podspec
  8. 7
      gRPC-Core.podspec
  9. 4
      grpc.gemspec
  10. 2
      grpc.gyp
  11. 4
      package.xml
  12. 237
      src/core/ext/filters/client_idle/client_idle_filter.cc
  13. 4
      src/core/ext/filters/client_idle/idle_filter_state.h
  14. 2
      src/core/ext/filters/http/client_authority_filter.cc
  15. 6
      src/core/ext/filters/http/client_authority_filter.h
  16. 21
      src/core/lib/channel/channel_stack.h
  17. 65
      src/core/lib/channel/promise_based_filter.h
  18. 42
      src/core/lib/promise/loop.h
  19. 2
      src/core/lib/security/authorization/grpc_server_authz_filter.cc
  20. 7
      src/core/lib/security/authorization/grpc_server_authz_filter.h
  21. 4
      src/core/lib/transport/transport.cc
  22. 1
      src/python/grpcio/grpc_core_dependencies.py
  23. 19
      test/core/filters/client_authority_filter_test.cc
  24. 4
      tools/doxygen/Doxyfile.c++.internal
  25. 4
      tools/doxygen/Doxyfile.core.internal

@ -1188,6 +1188,10 @@ grpc_cc_library(
grpc_cc_library(
name = "loop",
external_deps = [
"absl/types:variant",
"absl/status:statusor",
],
language = "c++",
public_hdrs = [
"src/core/lib/promise/loop.h",
@ -2477,10 +2481,15 @@ grpc_cc_library(
"src/core/ext/filters/client_idle/client_idle_filter.cc",
],
deps = [
"capture",
"config",
"exec_ctx_wakeup_scheduler",
"gpr_base",
"grpc_base",
"idle_filter_state",
"loop",
"sleep",
"try_seq",
],
)

4
CMakeLists.txt generated

@ -2091,6 +2091,7 @@ add_library(grpc
src/core/lib/json/json_writer.cc
src/core/lib/matchers/matchers.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/sleep.cc
src/core/lib/resolver/resolver.cc
src/core/lib/resolver/resolver_registry.cc
src/core/lib/resolver/server_address.cc
@ -2730,6 +2731,7 @@ add_library(grpc_unsecure
src/core/lib/json/json_util.cc
src/core/lib/json/json_writer.cc
src/core/lib/promise/activity.cc
src/core/lib/promise/sleep.cc
src/core/lib/resolver/resolver.cc
src/core/lib/resolver/resolver_registry.cc
src/core/lib/resolver/server_address.cc
@ -12798,6 +12800,7 @@ target_include_directories(loop_test
target_link_libraries(loop_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::statusor
absl::variant
)
@ -15195,7 +15198,6 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(sleep_test
src/core/lib/promise/sleep.cc
test/core/promise/sleep_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc

2
Makefile generated

@ -1538,6 +1538,7 @@ LIBGRPC_SRC = \
src/core/lib/json/json_writer.cc \
src/core/lib/matchers/matchers.cc \
src/core/lib/promise/activity.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver_registry.cc \
src/core/lib/resolver/server_address.cc \
@ -2024,6 +2025,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/json/json_util.cc \
src/core/lib/json/json_writer.cc \
src/core/lib/promise/activity.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver_registry.cc \
src/core/lib/resolver/server_address.cc \

@ -836,6 +836,7 @@ libs:
- src/core/lib/event_engine/sockaddr.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/capture.h
- src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/dual_ref_counted.h
@ -939,6 +940,8 @@ libs:
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/sleep.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
- src/core/lib/resolver/resolver_registry.h
@ -1585,6 +1588,7 @@ libs:
- src/core/lib/json/json_writer.cc
- src/core/lib/matchers/matchers.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/sleep.cc
- src/core/lib/resolver/resolver.cc
- src/core/lib/resolver/resolver_registry.cc
- src/core/lib/resolver/server_address.cc
@ -1999,6 +2003,7 @@ libs:
- src/core/lib/event_engine/sockaddr.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/capture.h
- src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/dual_ref_counted.h
@ -2100,6 +2105,8 @@ libs:
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
- src/core/lib/promise/sleep.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
- src/core/lib/resolver/resolver_registry.h
@ -2400,6 +2407,7 @@ libs:
- src/core/lib/json/json_util.cc
- src/core/lib/json/json_writer.cc
- src/core/lib/promise/activity.cc
- src/core/lib/promise/sleep.cc
- src/core/lib/resolver/resolver.cc
- src/core/lib/resolver/resolver_registry.cc
- src/core/lib/resolver/server_address.cc
@ -6712,6 +6720,7 @@ targets:
src:
- test/core/promise/loop_test.cc
deps:
- absl/status:statusor
- absl/types:variant
uses_polling: false
- name: match_test
@ -7718,10 +7727,8 @@ targets:
build: test
language: c++
headers:
- src/core/lib/promise/sleep.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/promise/sleep.cc
- test/core/promise/sleep_test.cc
deps:
- grpc
@ -8205,8 +8212,7 @@ targets:
gtest: true
build: test
language: c++
headers:
- src/core/lib/promise/try_seq.h
headers: []
src:
- test/core/promise/try_seq_metadata_test.cc
deps:

1
config.m4 generated

@ -600,6 +600,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/profiling/basic_timers.cc \
src/core/lib/profiling/stap_timers.cc \
src/core/lib/promise/activity.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver_registry.cc \
src/core/lib/resolver/server_address.cc \

1
config.w32 generated

@ -566,6 +566,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\profiling\\basic_timers.cc " +
"src\\core\\lib\\profiling\\stap_timers.cc " +
"src\\core\\lib\\promise\\activity.cc " +
"src\\core\\lib\\promise\\sleep.cc " +
"src\\core\\lib\\resolver\\resolver.cc " +
"src\\core\\lib\\resolver\\resolver_registry.cc " +
"src\\core\\lib\\resolver\\server_address.cc " +

6
gRPC-C++.podspec generated

@ -670,6 +670,7 @@ Pod::Spec.new do |s|
'src/core/lib/gpr/useful.h',
'src/core/lib/gprpp/atomic_utils.h',
'src/core/lib/gprpp/bitset.h',
'src/core/lib/gprpp/capture.h',
'src/core/lib/gprpp/chunked_vector.h',
'src/core/lib/gprpp/construct_destruct.h',
'src/core/lib/gprpp/cpp_impl_of.h',
@ -791,6 +792,8 @@ Pod::Spec.new do |s|
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',
'src/core/lib/resolver/resolver_registry.h',
@ -1462,6 +1465,7 @@ Pod::Spec.new do |s|
'src/core/lib/gpr/useful.h',
'src/core/lib/gprpp/atomic_utils.h',
'src/core/lib/gprpp/bitset.h',
'src/core/lib/gprpp/capture.h',
'src/core/lib/gprpp/chunked_vector.h',
'src/core/lib/gprpp/construct_destruct.h',
'src/core/lib/gprpp/cpp_impl_of.h',
@ -1583,6 +1587,8 @@ Pod::Spec.new do |s|
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',
'src/core/lib/resolver/resolver_registry.h',

7
gRPC-Core.podspec generated

@ -1061,6 +1061,7 @@ Pod::Spec.new do |s|
'src/core/lib/gpr/wrap_memcpy.cc',
'src/core/lib/gprpp/atomic_utils.h',
'src/core/lib/gprpp/bitset.h',
'src/core/lib/gprpp/capture.h',
'src/core/lib/gprpp/chunked_vector.h',
'src/core/lib/gprpp/construct_destruct.h',
'src/core/lib/gprpp/cpp_impl_of.h',
@ -1290,6 +1291,9 @@ Pod::Spec.new do |s|
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.cc',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',
@ -2051,6 +2055,7 @@ Pod::Spec.new do |s|
'src/core/lib/gpr/useful.h',
'src/core/lib/gprpp/atomic_utils.h',
'src/core/lib/gprpp/bitset.h',
'src/core/lib/gprpp/capture.h',
'src/core/lib/gprpp/chunked_vector.h',
'src/core/lib/gprpp/construct_destruct.h',
'src/core/lib/gprpp/cpp_impl_of.h',
@ -2172,6 +2177,8 @@ Pod::Spec.new do |s|
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',
'src/core/lib/resolver/resolver_registry.h',

4
grpc.gemspec generated

@ -980,6 +980,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gpr/wrap_memcpy.cc )
s.files += %w( src/core/lib/gprpp/atomic_utils.h )
s.files += %w( src/core/lib/gprpp/bitset.h )
s.files += %w( src/core/lib/gprpp/capture.h )
s.files += %w( src/core/lib/gprpp/chunked_vector.h )
s.files += %w( src/core/lib/gprpp/construct_destruct.h )
s.files += %w( src/core/lib/gprpp/cpp_impl_of.h )
@ -1209,6 +1210,9 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/promise.h )
s.files += %w( src/core/lib/promise/race.h )
s.files += %w( src/core/lib/promise/seq.h )
s.files += %w( src/core/lib/promise/sleep.cc )
s.files += %w( src/core/lib/promise/sleep.h )
s.files += %w( src/core/lib/promise/try_seq.h )
s.files += %w( src/core/lib/resolver/resolver.cc )
s.files += %w( src/core/lib/resolver/resolver.h )
s.files += %w( src/core/lib/resolver/resolver_factory.h )

2
grpc.gyp generated

@ -994,6 +994,7 @@
'src/core/lib/json/json_writer.cc',
'src/core/lib/matchers/matchers.cc',
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/sleep.cc',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver_registry.cc',
'src/core/lib/resolver/server_address.cc',
@ -1453,6 +1454,7 @@
'src/core/lib/json/json_util.cc',
'src/core/lib/json/json_writer.cc',
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/sleep.cc',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver_registry.cc',
'src/core/lib/resolver/server_address.cc',

4
package.xml generated

@ -960,6 +960,7 @@
<file baseinstalldir="/" name="src/core/lib/gpr/wrap_memcpy.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/atomic_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/bitset.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/capture.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/chunked_vector.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/construct_destruct.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/cpp_impl_of.h" role="src" />
@ -1189,6 +1190,9 @@
<file baseinstalldir="/" name="src/core/lib/promise/promise.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/race.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/seq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/sleep.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/sleep.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/try_seq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/resolver/resolver.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/resolver/resolver.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/resolver/resolver_factory.h" role="src" />

@ -25,8 +25,14 @@
#include "src/core/ext/filters/client_idle/idle_filter_state.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/capture.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/transport/http2_errors.h"
// TODO(juanlishen): The idle filter is disabled in client channel by default
@ -56,194 +62,125 @@ grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) {
MIN_IDLE_TIMEOUT_MS);
}
class ChannelData {
class ClientIdleFilter : public ChannelFilter {
public:
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* args);
static void Destroy(grpc_channel_element* elem);
static absl::StatusOr<ClientIdleFilter> Create(
const grpc_channel_args* args, ChannelFilter::Args filter_args);
~ClientIdleFilter() override = default;
static void StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op);
ClientIdleFilter(const ClientIdleFilter&) = delete;
ClientIdleFilter& operator=(const ClientIdleFilter&) = delete;
ClientIdleFilter(ClientIdleFilter&&) = default;
ClientIdleFilter& operator=(ClientIdleFilter&&) = default;
void IncreaseCallCount();
// Construct a promise for one call.
ArenaPromise<TrailingMetadata> MakeCallPromise(
ClientInitialMetadata initial_metadata,
NextPromiseFactory next_promise_factory) override;
void DecreaseCallCount();
bool StartTransportOp(grpc_transport_op* op) override;
private:
ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args,
grpc_error_handle* error);
~ChannelData() = default;
static void IdleTimerCallback(void* arg, grpc_error_handle error);
static void IdleTransportOpCompleteCallback(void* arg,
grpc_error_handle error);
ClientIdleFilter(grpc_channel_stack* channel_stack,
grpc_millis client_idle_timeout)
: channel_stack_(channel_stack),
client_idle_timeout_(client_idle_timeout) {}
void StartIdleTimer();
void EnterIdle();
void IncreaseCallCount();
void DecreaseCallCount();
struct CallCountDecreaser {
void operator()(ClientIdleFilter* filter) const {
filter->DecreaseCallCount();
}
};
grpc_channel_element* elem_;
// The channel stack to which we take refs for pending callbacks.
grpc_channel_stack* channel_stack_;
// Timeout after the last RPC finishes on the client channel at which the
// channel goes back into IDLE state.
const grpc_millis client_idle_timeout_;
// Member data used to track the state of channel.
IdleFilterState idle_filter_state_{false};
grpc_millis client_idle_timeout_;
std::shared_ptr<IdleFilterState> idle_filter_state_{
std::make_shared<IdleFilterState>(false)};
// Idle timer and its callback closure.
grpc_timer idle_timer_;
grpc_closure idle_timer_callback_;
// The transport op telling the client channel to enter IDLE.
grpc_transport_op idle_transport_op_;
grpc_closure idle_transport_op_complete_callback_;
ActivityPtr activity_;
};
grpc_error_handle ChannelData::Init(grpc_channel_element* elem,
grpc_channel_element_args* args) {
grpc_error_handle error = GRPC_ERROR_NONE;
new (elem->channel_data) ChannelData(elem, args, &error);
return error;
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create(
const grpc_channel_args* args, ChannelFilter::Args filter_args) {
ClientIdleFilter filter(filter_args.channel_stack(),
GetClientIdleTimeout(args));
return absl::StatusOr<ClientIdleFilter>(std::move(filter));
}
void ChannelData::Destroy(grpc_channel_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->~ChannelData();
// Construct a promise for one call.
ArenaPromise<TrailingMetadata> ClientIdleFilter::MakeCallPromise(
ClientInitialMetadata initial_metadata,
NextPromiseFactory next_promise_factory) {
using Decrementer = std::unique_ptr<ClientIdleFilter, CallCountDecreaser>;
IncreaseCallCount();
return ArenaPromise<TrailingMetadata>(Capture(
[](Decrementer*, ArenaPromise<TrailingMetadata>* next)
-> Poll<TrailingMetadata> { return (*next)(); },
Decrementer(this), next_promise_factory(std::move(initial_metadata))));
}
void ChannelData::StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
bool ClientIdleFilter::StartTransportOp(grpc_transport_op* op) {
// Catch the disconnect_with_error transport op.
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
// IncreaseCallCount() introduces a phony call and prevent the timer from
// being reset by other threads.
chand->IncreaseCallCount();
// If the timer has been set, cancel the timer.
// No synchronization issues here. grpc_timer_cancel() is valid as long as
// the timer has been init()ed before.
grpc_timer_cancel(&chand->idle_timer_);
IncreaseCallCount();
activity_.reset();
}
// Pass the op to the next filter.
grpc_channel_next_op(elem, op);
return false;
}
void ChannelData::IncreaseCallCount() {
idle_filter_state_.IncreaseCallCount();
void ClientIdleFilter::IncreaseCallCount() {
idle_filter_state_->IncreaseCallCount();
}
void ChannelData::DecreaseCallCount() {
if (idle_filter_state_.DecreaseCallCount()) {
void ClientIdleFilter::DecreaseCallCount() {
if (idle_filter_state_->DecreaseCallCount()) {
// If there are no more calls in progress, start the idle timer.
StartIdleTimer();
}
}
ChannelData::ChannelData(grpc_channel_element* elem,
grpc_channel_element_args* args,
grpc_error_handle* /*error*/)
: elem_(elem),
channel_stack_(args->channel_stack),
client_idle_timeout_(GetClientIdleTimeout(args->channel_args)) {
// If the idle filter is explicitly disabled in channel args, this ctor should
// not get called.
GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE);
GRPC_IDLE_FILTER_LOG("created with max_leisure_time = %" PRId64 " ms",
client_idle_timeout_);
// Initialize the idle timer without setting it.
grpc_timer_init_unset(&idle_timer_);
// Initialize the idle timer callback closure.
GRPC_CLOSURE_INIT(&idle_timer_callback_, IdleTimerCallback, this,
grpc_schedule_on_exec_ctx);
// Initialize the idle transport op complete callback.
GRPC_CLOSURE_INIT(&idle_transport_op_complete_callback_,
IdleTransportOpCompleteCallback, this,
grpc_schedule_on_exec_ctx);
}
void ChannelData::IdleTimerCallback(void* arg, grpc_error_handle error) {
GRPC_IDLE_FILTER_LOG("timer alarms");
ChannelData* chand = static_cast<ChannelData*>(arg);
if (error != GRPC_ERROR_NONE) {
GRPC_IDLE_FILTER_LOG("timer canceled");
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
return;
}
if (chand->idle_filter_state_.CheckTimer()) {
chand->StartIdleTimer();
} else {
chand->EnterIdle();
}
GRPC_IDLE_FILTER_LOG("timer finishes");
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
}
void ChannelData::IdleTransportOpCompleteCallback(void* arg,
grpc_error_handle /*error*/) {
ChannelData* chand = static_cast<ChannelData*>(arg);
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "idle transport op");
}
void ChannelData::StartIdleTimer() {
void ClientIdleFilter::StartIdleTimer() {
GRPC_IDLE_FILTER_LOG("timer has started");
auto idle_filter_state = idle_filter_state_;
// Hold a ref to the channel stack for the timer callback.
GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback");
grpc_timer_init(&idle_timer_, ExecCtx::Get()->Now() + client_idle_timeout_,
&idle_timer_callback_);
}
void ChannelData::EnterIdle() {
GRPC_IDLE_FILTER_LOG("the channel will enter IDLE");
// Hold a ref to the channel stack for the transport op.
GRPC_CHANNEL_STACK_REF(channel_stack_, "idle transport op");
// Initialize the transport op.
idle_transport_op_ = {};
idle_transport_op_.disconnect_with_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"),
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE);
idle_transport_op_.on_consumed = &idle_transport_op_complete_callback_;
// Pass the transport op down to the channel stack.
grpc_channel_next_op(elem_, &idle_transport_op_);
}
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args);
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure);
};
grpc_error_handle CallData::Init(grpc_call_element* elem,
const grpc_call_element_args* /*args*/) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->IncreaseCallCount();
return GRPC_ERROR_NONE;
}
void CallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->DecreaseCallCount();
auto channel_stack = channel_stack_->Ref();
auto timeout = client_idle_timeout_;
auto promise = Loop([timeout, idle_filter_state]() {
return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout),
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> {
if (idle_filter_state->CheckTimer()) {
return Continue{};
} else {
return absl::OkStatus();
}
});
});
activity_ = MakeActivity(
std::move(promise), ExecCtxWakeupScheduler{},
[channel_stack](absl::Status status) {
if (!status.ok()) return;
auto* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"),
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE);
// Pass the transport op down to the channel stack.
auto* elem = grpc_channel_stack_element(channel_stack.get(), 0);
elem->filter->start_transport_op(elem, op);
});
}
const grpc_channel_filter grpc_client_idle_filter = {
grpc_call_next_op,
nullptr,
ChannelData::StartTransportOp,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(ChannelData),
ChannelData::Init,
ChannelData::Destroy,
grpc_channel_next_get_info,
"client_idle"};
const grpc_channel_filter grpc_client_idle_filter =
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>(
"client_idle");
} // namespace

@ -29,8 +29,8 @@ class IdleFilterState {
explicit IdleFilterState(bool start_timer);
~IdleFilterState() = default;
IdleFilterState(const IdleFilterState&);
IdleFilterState& operator=(const IdleFilterState&);
IdleFilterState(const IdleFilterState&) = delete;
IdleFilterState& operator=(const IdleFilterState&) = delete;
// Increment the number of calls in progress.
void IncreaseCallCount();

@ -40,7 +40,7 @@
namespace grpc_core {
absl::StatusOr<ClientAuthorityFilter> ClientAuthorityFilter::Create(
const grpc_channel_args* args) {
const grpc_channel_args* args, ChannelFilter::Args) {
const grpc_arg* default_authority_arg =
grpc_channel_args_find(args, GRPC_ARG_DEFAULT_AUTHORITY);
if (default_authority_arg == nullptr) {

@ -31,15 +31,15 @@
namespace grpc_core {
class ClientAuthorityFilter {
class ClientAuthorityFilter final : public ChannelFilter {
public:
static absl::StatusOr<ClientAuthorityFilter> Create(
const grpc_channel_args* args);
const grpc_channel_args* args, ChannelFilter::Args);
// Construct a promise for one call.
ArenaPromise<TrailingMetadata> MakeCallPromise(
ClientInitialMetadata initial_metadata,
NextPromiseFactory next_promise_factory);
NextPromiseFactory next_promise_factory) override;
private:
explicit ClientAuthorityFilter(Slice default_authority)

@ -56,6 +56,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/resource_quota/arena.h"
@ -201,6 +202,18 @@ struct grpc_channel_stack {
/* Memory required for a call stack (computed at channel stack
initialization) */
size_t call_stack_size;
// Minimal infrastructure to act like a RefCounted thing without converting
// everything.
// It's likely that we'll want to replace grpc_channel_stack with something
// less regimented once the promise conversion completes, so avoiding doing a
// full C++-ification for now.
void IncrementRefCount();
void Unref();
grpc_core::RefCountedPtr<grpc_channel_stack> Ref() {
IncrementRefCount();
return grpc_core::RefCountedPtr<grpc_channel_stack>(this);
}
};
/* A call stack tracks a set of related filters for one call, and guarantees
@ -286,6 +299,14 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack,
} while (0);
#endif
inline void grpc_channel_stack::IncrementRefCount() {
GRPC_CHANNEL_STACK_REF(this, "smart_pointer");
}
inline void grpc_channel_stack::Unref() {
GRPC_CHANNEL_STACK_UNREF(this, "smart_pointer");
}
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack* stack,
const grpc_call_final_info* final_info,

@ -35,6 +35,37 @@
namespace grpc_core {
class ChannelFilter {
public:
class Args {
public:
Args() : Args(nullptr) {}
explicit Args(grpc_channel_stack* channel_stack)
: channel_stack_(channel_stack) {}
grpc_channel_stack* channel_stack() const { return channel_stack_; }
private:
friend class ChannelFilter;
grpc_channel_stack* channel_stack_;
};
// Construct a promise for one call.
virtual ArenaPromise<TrailingMetadata> MakeCallPromise(
ClientInitialMetadata initial_metadata,
NextPromiseFactory next_promise_factory) = 0;
// Start a legacy transport op
// Return true if the op was handled, false if it should be passed to the
// next filter.
// TODO(ctiller): design a new API for this - we probably don't want big op
// structures going forward.
virtual bool StartTransportOp(grpc_transport_op*) { return false; }
protected:
virtual ~ChannelFilter() = default;
};
// Designator for whether a filter is client side or server side.
// Please don't use this outside calls to MakePromiseBasedFilter - it's intended
// to be deleted once the promise conversion is complete.
@ -666,19 +697,18 @@ class CallData<ChannelFilter, FilterEndpoint::kServer> : public BaseCallData {
} // namespace promise_filter_detail
// ChannelFilter contains the following:
// class SomeChannelFilter {
// F implements ChannelFilter and :
// class SomeChannelFilter : public ChannelFilter {
// public:
// static absl::StatusOr<SomeChannelFilter> Create(
// const grpc_channel_args* args);
// ArenaPromise<TrailingMetadata> MakeCallPromise(
// InitialMetadata* initial_metadata, NextPromiseFactory next_promise);
// ChannelFilter::Args filter_args);
// };
// TODO(ctiller): allow implementing get_channel_info, start_transport_op in
// some way on ChannelFilter.
template <typename ChannelFilter, FilterEndpoint kEndpoint>
grpc_channel_filter MakePromiseBasedFilter(const char* name) {
using CallData = promise_filter_detail::CallData<ChannelFilter, kEndpoint>;
template <typename F, FilterEndpoint kEndpoint>
absl::enable_if_t<std::is_base_of<ChannelFilter, F>::value, grpc_channel_filter>
MakePromiseBasedFilter(const char* name) {
using CallData = promise_filter_detail::CallData<F, kEndpoint>;
return grpc_channel_filter{
// start_transport_stream_op_batch
@ -688,12 +718,16 @@ grpc_channel_filter MakePromiseBasedFilter(const char* name) {
// make_call_promise
[](grpc_channel_element* elem, ClientInitialMetadata initial_metadata,
NextPromiseFactory next_promise_factory) {
return static_cast<ChannelFilter*>(elem->channel_data)
return static_cast<F*>(elem->channel_data)
->MakeCallPromise(std::move(initial_metadata),
std::move(next_promise_factory));
},
// start_transport_op - for now unsupported
grpc_channel_next_op,
// start_transport_op
[](grpc_channel_element* elem, grpc_transport_op* op) {
if (!static_cast<F*>(elem->channel_data)->StartTransportOp(op)) {
grpc_channel_next_op(elem, op);
}
},
// sizeof_call_data
sizeof(CallData),
// init_call_elem
@ -708,18 +742,19 @@ grpc_channel_filter MakePromiseBasedFilter(const char* name) {
static_cast<CallData*>(elem->call_data)->~CallData();
},
// sizeof_channel_data
sizeof(ChannelFilter),
sizeof(F),
// init_channel_elem
[](grpc_channel_element* elem, grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
auto status = ChannelFilter::Create(args->channel_args);
auto status = F::Create(args->channel_args,
ChannelFilter::Args(args->channel_stack));
if (!status.ok()) return absl_status_to_grpc_error(status.status());
new (elem->channel_data) ChannelFilter(std::move(*status));
new (elem->channel_data) F(std::move(*status));
return GRPC_ERROR_NONE;
},
// destroy_channel_elem
[](grpc_channel_element* elem) {
static_cast<ChannelFilter*>(elem->channel_data)->~ChannelFilter();
static_cast<F*>(elem->channel_data)->~F();
},
// get_channel_info
grpc_channel_next_get_info,

@ -20,6 +20,7 @@
#include <new>
#include <type_traits>
#include "absl/status/statusor.h"
#include "absl/types/variant.h"
#include "src/core/lib/promise/detail/promise_factory.h"
@ -44,20 +45,44 @@ struct LoopTraits;
template <typename T>
struct LoopTraits<LoopCtl<T>> {
using Result = T;
static LoopCtl<T> ToLoopCtl(LoopCtl<T> value) { return value; }
};
template <typename T>
struct LoopTraits<absl::StatusOr<LoopCtl<T>>> {
using Result = absl::StatusOr<T>;
static LoopCtl<Result> ToLoopCtl(absl::StatusOr<LoopCtl<T>> value) {
if (!value.ok()) return value.status();
const auto& inner = *value;
if (absl::holds_alternative<Continue>(inner)) return Continue{};
return absl::get<T>(inner);
}
};
template <>
struct LoopTraits<absl::StatusOr<LoopCtl<absl::Status>>> {
using Result = absl::Status;
static LoopCtl<Result> ToLoopCtl(
absl::StatusOr<LoopCtl<absl::Status>> value) {
if (!value.ok()) return value.status();
const auto& inner = *value;
if (absl::holds_alternative<Continue>(inner)) return Continue{};
return absl::get<absl::Status>(inner);
}
};
template <typename F>
class Loop {
private:
using Factory = promise_detail::PromiseFactory<void, F>;
using Promise = decltype(std::declval<Factory>().Repeated());
using PromiseResult = typename Promise::Result;
using PromiseType = decltype(std::declval<Factory>().Repeated());
using PromiseResult = typename PromiseType::Result;
public:
using Result = typename LoopTraits<PromiseResult>::Result;
explicit Loop(F f) : factory_(std::move(f)), promise_(factory_.Repeated()) {}
~Loop() { promise_.~Promise(); }
~Loop() { promise_.~PromiseType(); }
Loop(Loop&& loop) noexcept
: factory_(std::move(loop.factory_)),
@ -74,13 +99,14 @@ class Loop {
if (auto* p = absl::get_if<kPollReadyIdx>(&promise_result)) {
// - then if it's Continue, destroy the promise and recreate a new one
// from our factory.
if (absl::holds_alternative<Continue>(*p)) {
promise_.~Promise();
new (&promise_) Promise(factory_.Repeated());
auto lc = LoopTraits<PromiseResult>::ToLoopCtl(*p);
if (absl::holds_alternative<Continue>(lc)) {
promise_.~PromiseType();
new (&promise_) PromiseType(factory_.Repeated());
continue;
}
// - otherwise there's our result... return it out.
return absl::get<Result>(*p);
return absl::get<Result>(lc);
} else {
// Otherwise the inner promise was pending, so we are pending.
return Pending();
@ -90,7 +116,7 @@ class Loop {
private:
GPR_NO_UNIQUE_ADDRESS Factory factory_;
GPR_NO_UNIQUE_ADDRESS union { GPR_NO_UNIQUE_ADDRESS Promise promise_; };
GPR_NO_UNIQUE_ADDRESS union { GPR_NO_UNIQUE_ADDRESS PromiseType promise_; };
};
} // namespace promise_detail

@ -32,7 +32,7 @@ GrpcServerAuthzFilter::GrpcServerAuthzFilter(
provider_(std::move(provider)) {}
absl::StatusOr<GrpcServerAuthzFilter> GrpcServerAuthzFilter::Create(
const grpc_channel_args* args) {
const grpc_channel_args* args, ChannelFilter::Args) {
grpc_auth_context* auth_context = grpc_find_auth_context_in_args(args);
grpc_authorization_policy_provider* provider =
grpc_channel_args_find_pointer<grpc_authorization_policy_provider>(

@ -18,20 +18,21 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/security/authorization/authorization_policy_provider.h"
namespace grpc_core {
class GrpcServerAuthzFilter {
class GrpcServerAuthzFilter final : public ChannelFilter {
public:
static const grpc_channel_filter kFilterVtable;
static absl::StatusOr<GrpcServerAuthzFilter> Create(
const grpc_channel_args* args);
const grpc_channel_args* args, ChannelFilter::Args);
ArenaPromise<TrailingMetadata> MakeCallPromise(
ClientInitialMetadata initial_metadata,
NextPromiseFactory next_promise_factory);
NextPromiseFactory next_promise_factory) override;
private:
GrpcServerAuthzFilter(

@ -224,7 +224,9 @@ static void destroy_made_transport_stream_op(void* arg,
made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg);
grpc_closure* c = op->inner_on_complete;
delete op;
grpc_core::Closure::Run(DEBUG_LOCATION, c, GRPC_ERROR_REF(error));
if (c != nullptr) {
grpc_core::Closure::Run(DEBUG_LOCATION, c, GRPC_ERROR_REF(error));
}
}
grpc_transport_stream_op_batch* grpc_make_transport_stream_op(

@ -575,6 +575,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/profiling/basic_timers.cc',
'src/core/lib/profiling/stap_timers.cc',
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/sleep.cc',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver_registry.cc',
'src/core/lib/resolver/server_address.cc',

@ -40,26 +40,29 @@ class TestChannelArgs {
};
TEST(ClientAuthorityFilterTest, DefaultFails) {
EXPECT_FALSE(ClientAuthorityFilter::Create(nullptr).ok());
EXPECT_FALSE(
ClientAuthorityFilter::Create(nullptr, ChannelFilter::Args()).ok());
}
TEST(ClientAuthorityFilterTest, WithArgSucceeds) {
EXPECT_EQ(ClientAuthorityFilter::Create(
TestChannelArgs("foo.test.google.au").args())
.status(),
absl::OkStatus());
EXPECT_EQ(
ClientAuthorityFilter::Create(
TestChannelArgs("foo.test.google.au").args(), ChannelFilter::Args())
.status(),
absl::OkStatus());
}
TEST(ClientAuthorityFilterTest, NonStringArgFails) {
grpc_arg arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY), 123);
grpc_channel_args args = {1, &arg};
EXPECT_FALSE(ClientAuthorityFilter::Create(&args).ok());
EXPECT_FALSE(
ClientAuthorityFilter::Create(&args, ChannelFilter::Args()).ok());
}
TEST(ClientAuthorityFilterTest, PromiseCompletesImmediatelyAndSetsAuthority) {
auto filter = *ClientAuthorityFilter::Create(
TestChannelArgs("foo.test.google.au").args());
TestChannelArgs("foo.test.google.au").args(), ChannelFilter::Args());
auto arena = MakeScopedArena(1024, g_memory_allocator);
grpc_metadata_batch initial_metadata_batch(arena.get());
grpc_metadata_batch trailing_metadata_batch(arena.get());
@ -85,7 +88,7 @@ TEST(ClientAuthorityFilterTest, PromiseCompletesImmediatelyAndSetsAuthority) {
TEST(ClientAuthorityFilterTest,
PromiseCompletesImmediatelyAndDoesNotClobberAlreadySetsAuthority) {
auto filter = *ClientAuthorityFilter::Create(
TestChannelArgs("foo.test.google.au").args());
TestChannelArgs("foo.test.google.au").args(), ChannelFilter::Args());
auto arena = MakeScopedArena(1024, g_memory_allocator);
grpc_metadata_batch initial_metadata_batch(arena.get());
grpc_metadata_batch trailing_metadata_batch(arena.get());

@ -1959,6 +1959,7 @@ src/core/lib/gpr/useful.h \
src/core/lib/gpr/wrap_memcpy.cc \
src/core/lib/gprpp/atomic_utils.h \
src/core/lib/gprpp/bitset.h \
src/core/lib/gprpp/capture.h \
src/core/lib/gprpp/chunked_vector.h \
src/core/lib/gprpp/construct_destruct.h \
src/core/lib/gprpp/cpp_impl_of.h \
@ -2188,6 +2189,9 @@ src/core/lib/promise/poll.h \
src/core/lib/promise/promise.h \
src/core/lib/promise/race.h \
src/core/lib/promise/seq.h \
src/core/lib/promise/sleep.cc \
src/core/lib/promise/sleep.h \
src/core/lib/promise/try_seq.h \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver.h \
src/core/lib/resolver/resolver_factory.h \

@ -1753,6 +1753,7 @@ src/core/lib/gpr/wrap_memcpy.cc \
src/core/lib/gprpp/README.md \
src/core/lib/gprpp/atomic_utils.h \
src/core/lib/gprpp/bitset.h \
src/core/lib/gprpp/capture.h \
src/core/lib/gprpp/chunked_vector.h \
src/core/lib/gprpp/construct_destruct.h \
src/core/lib/gprpp/cpp_impl_of.h \
@ -1983,6 +1984,9 @@ src/core/lib/promise/poll.h \
src/core/lib/promise/promise.h \
src/core/lib/promise/race.h \
src/core/lib/promise/seq.h \
src/core/lib/promise/sleep.cc \
src/core/lib/promise/sleep.h \
src/core/lib/promise/try_seq.h \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver.h \
src/core/lib/resolver/resolver_factory.h \

Loading…
Cancel
Save