Precondition ChannelArgs with EventEngines (#31166)

* Precondition ChannelArgs with EventEngines

If an EventEngine is not explicitly provided to ChannelArgs, the default
EventEngine will be set when ChannelArgs are preconditioned.

* channel_idle_filter: EE from channel_args

* grpclb: EE from channel_args

* weighted_target: ee from channel_args

* sanitize

* xds cluster manager

* posix native resolver: own an EE ref from iomgr initialization

* reviewer feedback

* reviewer feedback

* iwyu

* iwyu

* change ownership and remove unneeded methods

* clang_format and use consistent engine naming

* store EE ref in channel_stack and use it in channel idle filter

* don't store a separate shared_ptr in NativeDNSResolver

* add GetEventEngine() method to LB policy helper interface

* stop holding refs to the EE instance in LB policies

* clang-format

* change channel stack to get EE instance from channel args

* update XdsWrrLocalityLb

* fix lb_policy_test

* precondition channel_args in ServerBuilder and microbenchmark fixtures

* add required engine to channel_stack test

* sanitize

* dep fix

* add EE to filter fuzzer

* precondition BM_IsolatedFilter channelargs

* fix

* remove unused using statement

* iwyu again??

* remove preconditioning from C++ surface API

* fix bm_call_create

* Automated change: Fix sanity tests

* iwyu

* rm this->

* rm unused deps

* add internal EE arg macro

* precondition filter_fuzzer

* Automated change: Fix sanity tests

* iwyu

* ChannelStackBuilder requires preconditioned ChannelArgs

* iwyu

* iwyu again?

* rm build.SetChannelArgs; rm unused declaration

* fix nullptr string creation

Co-authored-by: Mark D. Roth <roth@google.com>
Co-authored-by: drfloob <drfloob@users.noreply.github.com>
pull/31462/head
AJ Heller 2 years ago committed by GitHub
parent c65348879a
commit 23c7e48779
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 7
      src/core/BUILD
  3. 29
      src/core/ext/filters/channel_idle/channel_idle_filter.cc
  4. 15
      src/core/ext/filters/channel_idle/channel_idle_filter.h
  5. 5
      src/core/ext/filters/client_channel/client_channel.cc
  6. 3
      src/core/ext/filters/client_channel/dynamic_filters.cc
  7. 5
      src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
  8. 30
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  9. 7
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  10. 8
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  11. 7
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  12. 36
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  13. 6
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  14. 7
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  15. 45
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  16. 8
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  17. 9
      src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc
  18. 16
      src/core/ext/filters/client_channel/subchannel.cc
  19. 2
      src/core/ext/filters/client_channel/subchannel.h
  20. 17
      src/core/lib/channel/channel_args.h
  21. 4
      src/core/lib/channel/channel_stack.cc
  22. 10
      src/core/lib/channel/channel_stack.h
  23. 11
      src/core/lib/channel/channel_stack_builder.cc
  24. 10
      src/core/lib/channel/channel_stack_builder.h
  25. 17
      src/core/lib/event_engine/default_event_engine.cc
  26. 6
      src/core/lib/event_engine/default_event_engine.h
  27. 4
      src/core/lib/load_balancing/lb_policy.h
  28. 71
      src/core/lib/surface/channel.cc
  29. 6
      src/core/lib/surface/channel.h
  30. 7
      src/core/plugin_registry/grpc_plugin_registry.cc
  31. 31
      test/core/channel/channel_args_test.cc
  32. 3
      test/core/channel/channel_stack_builder_test.cc
  33. 24
      test/core/channel/channel_stack_test.cc
  34. 10
      test/core/channel/minimal_stack_is_minimal_test.cc
  35. 6
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  36. 10
      test/core/filters/filter_fuzzer.cc
  37. 21
      test/core/util/test_lb_policies.cc
  38. 8
      test/core/xds/xds_channel_stack_modifier_test.cc
  39. 14
      test/cpp/microbenchmarks/bm_call_create.cc
  40. 19
      test/cpp/microbenchmarks/fullstack_fixtures.h

@ -1794,6 +1794,7 @@ grpc_cc_library(
"absl/status:statusor",
"absl/strings",
"absl/synchronization",
"absl/types:optional",
"absl/memory",
"upb_lib",
"protobuf_headers",
@ -2670,7 +2671,6 @@ grpc_cc_library(
"//src/core:channel_init",
"//src/core:channel_stack_type",
"//src/core:construct_destruct",
"//src/core:default_event_engine",
"//src/core:dual_ref_counted",
"//src/core:env",
"//src/core:gpr_atm",

@ -1809,10 +1809,13 @@ grpc_cc_library(
],
external_deps = ["absl/functional:any_invocable"],
deps = [
"channel_args",
"channel_args_preconditioning",
"context",
"default_event_engine_factory",
"event_engine_trace",
"no_destruct",
"//:config",
"//:event_engine_base_hdrs",
"//:gpr",
"//:grpc_trace",
@ -2958,7 +2961,6 @@ grpc_cc_library(
"channel_init",
"channel_stack_type",
"closure",
"default_event_engine",
"exec_ctx_wakeup_scheduler",
"http2_errors",
"idle_filter_state",
@ -3189,7 +3191,6 @@ grpc_cc_library(
"channel_fwd",
"channel_init",
"channel_stack_type",
"default_event_engine",
"gpr_atm",
"grpc_sockaddr",
"json",
@ -3751,7 +3752,6 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"default_event_engine",
"grpc_resolver_xds_header",
"json",
"json_args",
@ -4076,7 +4076,6 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"default_event_engine",
"grpc_lb_address_filtering",
"json",
"json_args",

@ -26,7 +26,6 @@
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
@ -35,7 +34,6 @@
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/status_helper.h"
@ -56,8 +54,6 @@ namespace grpc_core {
namespace {
using ::grpc_event_engine::experimental::EventEngine;
// TODO(ctiller): The idle filter was disabled in client channel by default
// due to b/143502997. Now the bug is fixed enable the filter by default.
const auto kDefaultIdleTimeout = Duration::Infinity();
@ -125,19 +121,15 @@ struct MaxAgeFilter::Config {
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) {
// TODO(hork): pull EventEngine from args
ClientIdleFilter filter(
filter_args.channel_stack(), GetClientIdleTimeout(args),
grpc_event_engine::experimental::GetDefaultEventEngine());
ClientIdleFilter filter(filter_args.channel_stack(),
GetClientIdleTimeout(args));
return absl::StatusOr<ClientIdleFilter>(std::move(filter));
}
absl::StatusOr<MaxAgeFilter> MaxAgeFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) {
// TODO(hork): pull EventEngine from args
MaxAgeFilter filter(filter_args.channel_stack(),
Config::FromChannelArgs(args),
grpc_event_engine::experimental::GetDefaultEventEngine());
Config::FromChannelArgs(args));
return absl::StatusOr<MaxAgeFilter>(std::move(filter));
}
@ -211,7 +203,7 @@ void MaxAgeFilter::PostInit() {
// (if it did not, it was cancelled)
if (status.ok()) CloseChannel();
},
engine_.get()));
channel_stack->EventEngine()));
}
}
@ -272,7 +264,7 @@ void ChannelIdleFilter::StartIdleTimer() {
[channel_stack, this](absl::Status status) {
if (status.ok()) CloseChannel();
},
engine_.get()));
channel_stack->EventEngine()));
}
void ChannelIdleFilter::CloseChannel() {
@ -314,13 +306,10 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
});
}
MaxAgeFilter::MaxAgeFilter(
grpc_channel_stack* channel_stack, const Config& max_age_config,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine)
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle,
engine),
MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack,
const Config& max_age_config)
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle),
max_connection_age_(max_age_config.max_connection_age),
max_connection_age_grace_(max_age_config.max_connection_age_grace),
engine_(engine) {}
max_connection_age_grace_(max_age_config.max_connection_age_grace) {}
} // namespace grpc_core

@ -22,7 +22,6 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include "src/core/ext/filters/channel_idle/idle_filter_state.h"
@ -59,12 +58,10 @@ class ChannelIdleFilter : public ChannelFilter {
using SingleSetActivityPtr =
SingleSetPtr<Activity, typename ActivityPtr::deleter_type>;
ChannelIdleFilter(
grpc_channel_stack* channel_stack, Duration client_idle_timeout,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine)
ChannelIdleFilter(grpc_channel_stack* channel_stack,
Duration client_idle_timeout)
: channel_stack_(channel_stack),
client_idle_timeout_(client_idle_timeout),
engine_(engine) {}
client_idle_timeout_(client_idle_timeout) {}
grpc_channel_stack* channel_stack() { return channel_stack_; };
@ -90,7 +87,6 @@ class ChannelIdleFilter : public ChannelFilter {
std::make_shared<IdleFilterState>(false)};
SingleSetActivityPtr activity_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};
class ClientIdleFilter final : public ChannelIdleFilter {
@ -131,16 +127,13 @@ class MaxAgeFilter final : public ChannelIdleFilter {
MaxAgeFilter* filter_;
};
MaxAgeFilter(
grpc_channel_stack* channel_stack, const Config& max_age_config,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine);
MaxAgeFilter(grpc_channel_stack* channel_stack, const Config& max_age_config);
void Shutdown() override;
SingleSetActivityPtr max_age_activity_;
Duration max_connection_age_;
Duration max_connection_age_grace_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};
} // namespace grpc_core

@ -37,6 +37,7 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/slice.h>
#include <grpc/status.h>
@ -915,6 +916,10 @@ class ClientChannel::ClientChannelControlHelper
return chand_->default_authority_;
}
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return chand_->owning_stack_->EventEngine();
}
void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
if (chand_->resolver_ == nullptr) return; // Shutting down.

@ -140,8 +140,7 @@ namespace {
absl::StatusOr<RefCountedPtr<grpc_channel_stack>> CreateChannelStack(
const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters) {
ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC);
builder.SetChannelArgs(args);
ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC, args);
for (auto filter : filters) {
builder.AppendFilter(filter);
}

@ -25,6 +25,7 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/support/log.h>
@ -107,6 +108,10 @@ class ChildPolicyHandler::Helper
return parent_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return parent_->channel_control_helper()->GetEventEngine();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
if (parent_->shutting_down_) return;

@ -105,7 +105,6 @@
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
@ -303,7 +302,6 @@ class GrpcLb : public LoadBalancingPolicy {
bool client_load_report_is_due_ = false;
// The closure used for the completion of sending the load report.
grpc_closure client_load_report_done_closure_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};
class SubchannelWrapper : public DelegatingSubchannel {
@ -469,6 +467,7 @@ class GrpcLb : public LoadBalancingPolicy {
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -872,6 +871,10 @@ absl::string_view GrpcLb::Helper::GetAuthority() {
return parent_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine* GrpcLb::Helper::GetEventEngine() {
return parent_->channel_control_helper()->GetEventEngine();
}
void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (parent_->shutting_down_) return;
@ -887,8 +890,7 @@ GrpcLb::BalancerCallState::BalancerCallState(
: InternallyRefCounted<BalancerCallState>(
GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState"
: nullptr),
grpclb_policy_(std::move(parent_grpclb_policy)),
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
grpclb_policy_(std::move(parent_grpclb_policy)) {
GPR_ASSERT(grpclb_policy_ != nullptr);
GPR_ASSERT(!grpclb_policy()->shutting_down_);
// Init the LB call. Note that the LB call will progress every time there's
@ -946,7 +948,8 @@ void GrpcLb::BalancerCallState::Orphan() {
// call, then the following cancellation will be a no-op.
grpc_call_cancel_internal(lb_call_);
if (client_load_report_handle_.has_value() &&
engine_->Cancel(client_load_report_handle_.value())) {
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
client_load_report_handle_.value())) {
Unref(DEBUG_LOCATION, "client_load_report cancelled");
}
// Note that the initial ref is hold by lb_on_balancer_status_received_
@ -1032,12 +1035,13 @@ void GrpcLb::BalancerCallState::StartQuery() {
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
client_load_report_handle_ =
engine_->RunAfter(client_stats_report_interval_, [this] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
grpclb_policy()->work_serializer()->Run(
[this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION);
});
grpclb_policy()->channel_control_helper()->GetEventEngine()->RunAfter(
client_stats_report_interval_, [this] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
grpclb_policy()->work_serializer()->Run(
[this] { MaybeSendClientLoadReportLocked(); }, DEBUG_LOCATION);
});
}
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked() {
@ -1903,13 +1907,10 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
} // namespace
} // namespace grpc_core
//
// Plugin registration
//
namespace grpc_core {
void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
std::make_unique<GrpcLbFactory>());
@ -1928,4 +1929,5 @@ void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
return true;
});
}
} // namespace grpc_core

@ -38,6 +38,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/support/log.h>
@ -345,6 +346,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -769,6 +771,11 @@ absl::string_view OutlierDetectionLb::Helper::GetAuthority() {
return outlier_detection_policy_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine*
OutlierDetectionLb::Helper::GetEventEngine() {
return outlier_detection_policy_->channel_control_helper()->GetEventEngine();
}
void OutlierDetectionLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (outlier_detection_policy_->shutting_down_) return;

@ -33,6 +33,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
@ -197,6 +198,7 @@ class PriorityLb : public LoadBalancingPolicy {
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -849,6 +851,12 @@ absl::string_view PriorityLb::ChildPriority::Helper::GetAuthority() {
return priority_->priority_policy_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine*
PriorityLb::ChildPriority::Helper::GetEventEngine() {
return priority_->priority_policy_->channel_control_helper()
->GetEventEngine();
}
void PriorityLb::ChildPriority::Helper::AddTraceEvent(
TraceSeverity severity, absl::string_view message) {
if (priority_->priority_policy_->shutting_down_) return;

@ -53,6 +53,7 @@
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
@ -335,6 +336,7 @@ class RlsLb : public LoadBalancingPolicy {
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -915,6 +917,11 @@ absl::string_view RlsLb::ChildPolicyWrapper::ChildPolicyHelper::GetAuthority() {
return wrapper_->lb_policy_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine*
RlsLb::ChildPolicyWrapper::ChildPolicyHelper::GetEventEngine() {
return wrapper_->lb_policy_->channel_control_helper()->GetEventEngine();
}
void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::AddTraceEvent(
TraceSeverity severity, absl::string_view message) {
if (wrapper_->is_shutdown_) return;

@ -43,7 +43,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -193,6 +192,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -212,7 +212,6 @@ class WeightedTargetLb : public LoadBalancingPolicy {
RefCountedPtr<WeightedChild> weighted_child_;
absl::optional<EventEngine::TaskHandle> timer_handle_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};
// Methods for dealing with the child policy.
@ -481,16 +480,17 @@ void WeightedTargetLb::UpdateStateLocked() {
WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer(
RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)
: weighted_child_(std::move(weighted_child)),
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
: weighted_child_(std::move(weighted_child)) {
timer_handle_ =
engine_->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
self->weighted_child_->weighted_target_policy_->work_serializer()->Run(
[self = std::move(self)] { self->OnTimerLocked(); },
DEBUG_LOCATION);
});
weighted_child_->weighted_target_policy_->channel_control_helper()
->GetEventEngine()
->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
self->weighted_child_->weighted_target_policy_->work_serializer()
->Run([self = std::move(self)] { self->OnTimerLocked(); },
DEBUG_LOCATION);
});
}
void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
@ -502,7 +502,9 @@ void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
weighted_child_->weighted_target_policy_.get(),
weighted_child_.get(), weighted_child_->name_.c_str());
}
engine_->Cancel(*timer_handle_);
weighted_child_->weighted_target_policy_->channel_control_helper()
->GetEventEngine()
->Cancel(*timer_handle_);
}
Unref();
}
@ -701,6 +703,12 @@ absl::string_view WeightedTargetLb::WeightedChild::Helper::GetAuthority() {
->GetAuthority();
}
grpc_event_engine::experimental::EventEngine*
WeightedTargetLb::WeightedChild::Helper::GetEventEngine() {
return weighted_child_->weighted_target_policy_->channel_control_helper()
->GetEventEngine();
}
void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent(
TraceSeverity severity, absl::string_view message) {
if (weighted_child_->weighted_target_policy_->shutting_down_) return;
@ -754,7 +762,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory {
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<WeightedTargetLb>(std::move(args));
} // namespace
}
absl::string_view name() const override { return kWeightedTarget; }
@ -771,7 +779,7 @@ class WeightedTargetLbFactory : public LoadBalancingPolicyFactory {
return LoadRefCountedFromJson<WeightedTargetLbConfig>(
json, JsonArgs(), "errors validating weighted_target LB policy config");
}
}; // namespace grpc_core
};
} // namespace

@ -30,6 +30,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/connectivity_state.h>
@ -176,6 +177,7 @@ class CdsLb : public LoadBalancingPolicy {
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -263,6 +265,10 @@ absl::string_view CdsLb::Helper::GetAuthority() {
return parent_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine* CdsLb::Helper::GetEventEngine() {
return parent_->channel_control_helper()->GetEventEngine();
}
void CdsLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (parent_->shutting_down_) return;

@ -34,6 +34,7 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/support/log.h>
@ -253,6 +254,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -679,6 +681,11 @@ absl::string_view XdsClusterImplLb::Helper::GetAuthority() {
return xds_cluster_impl_policy_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine*
XdsClusterImplLb::Helper::GetEventEngine() {
return xds_cluster_impl_policy_->channel_control_helper()->GetEventEngine();
}
void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (xds_cluster_impl_policy_->shutting_down_) return;

@ -42,7 +42,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -69,7 +68,6 @@ TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb");
namespace {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
constexpr absl::string_view kXdsClusterManager =
@ -197,6 +195,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -224,7 +223,6 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
// States for delayed removal.
absl::optional<EventEngine::TaskHandle> delayed_removal_timer_handle_;
bool shutdown_ = false;
std::shared_ptr<EventEngine> engine_;
};
~XdsClusterManagerLb() override;
@ -430,8 +428,7 @@ XdsClusterManagerLb::ClusterChild::ClusterChild(
RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
const std::string& name)
: xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
name_(name),
engine_(GetDefaultEventEngine()) {
name_(name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
@ -466,7 +463,9 @@ void XdsClusterManagerLb::ClusterChild::Orphan() {
// the child.
picker_wrapper_.reset();
if (delayed_removal_timer_handle_.has_value()) {
engine_->Cancel(*delayed_removal_timer_handle_);
xds_cluster_manager_policy_->channel_control_helper()
->GetEventEngine()
->Cancel(*delayed_removal_timer_handle_);
}
shutdown_ = true;
Unref();
@ -509,7 +508,9 @@ absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
// Update child weight.
// Reactivate if needed.
if (delayed_removal_timer_handle_.has_value() &&
engine_->Cancel(*delayed_removal_timer_handle_)) {
xds_cluster_manager_policy_->channel_control_helper()
->GetEventEngine()
->Cancel(*delayed_removal_timer_handle_)) {
delayed_removal_timer_handle_.reset();
}
// Create child policy if needed.
@ -544,17 +545,21 @@ void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
// If already deactivated, don't do that again.
if (delayed_removal_timer_handle_.has_value()) return;
// Set the child weight to 0 so that future picker won't contain this child.
// Start a timer to delete the child.
delayed_removal_timer_handle_ = engine_->RunAfter(
kChildRetentionInterval,
[self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable {
ApplicationCallbackExecCtx application_exec_ctx;
ExecCtx exec_ctx;
self->xds_cluster_manager_policy_->work_serializer()->Run(
[self = std::move(self)]() { self->OnDelayedRemovalTimerLocked(); },
DEBUG_LOCATION);
});
delayed_removal_timer_handle_ =
xds_cluster_manager_policy_->channel_control_helper()
->GetEventEngine()
->RunAfter(
kChildRetentionInterval,
[self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable {
ApplicationCallbackExecCtx application_exec_ctx;
ExecCtx exec_ctx;
self->xds_cluster_manager_policy_->work_serializer()->Run(
[self = std::move(self)]() {
self->OnDelayedRemovalTimerLocked();
},
DEBUG_LOCATION);
});
}
void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked() {
@ -625,6 +630,12 @@ absl::string_view XdsClusterManagerLb::ClusterChild::Helper::GetAuthority() {
->GetAuthority();
}
EventEngine* XdsClusterManagerLb::ClusterChild::Helper::GetEventEngine() {
return xds_cluster_manager_child_->xds_cluster_manager_policy_
->channel_control_helper()
->GetEventEngine();
}
void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent(
TraceSeverity severity, absl::string_view message) {
if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {

@ -34,6 +34,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
@ -378,6 +379,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
// client, which is a watch-based API.
void RequestReresolution() override {}
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -454,6 +456,12 @@ absl::string_view XdsClusterResolverLb::Helper::GetAuthority() {
return xds_cluster_resolver_policy_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine*
XdsClusterResolverLb::Helper::GetEventEngine() {
return xds_cluster_resolver_policy_->channel_control_helper()
->GetEventEngine();
}
void XdsClusterResolverLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (xds_cluster_resolver_policy_->shutting_down_) return;

@ -28,6 +28,7 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/support/log.h>
@ -129,6 +130,7 @@ class XdsWrrLocalityLb : public LoadBalancingPolicy {
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
grpc_event_engine::experimental::EventEngine* GetEventEngine() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
@ -265,7 +267,7 @@ OrphanablePtr<LoadBalancingPolicy> XdsWrrLocalityLb::CreateChildPolicyLocked(
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
auto lb_policy =
CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
"weighted_target_experimental", std::move(lb_policy_args));
@ -313,6 +315,11 @@ absl::string_view XdsWrrLocalityLb::Helper::GetAuthority() {
return xds_wrr_locality_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine*
XdsWrrLocalityLb::Helper::GetEventEngine() {
return xds_wrr_locality_->channel_control_helper()->GetEventEngine();
}
void XdsWrrLocalityLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
xds_wrr_locality_->channel_control_helper()->AddTraceEvent(severity, message);

@ -43,7 +43,6 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
@ -51,7 +50,6 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
@ -84,6 +82,8 @@
namespace grpc_core {
using ::grpc_event_engine::experimental::EventEngine;
TraceFlag grpc_trace_subchannel(false, "subchannel");
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
@ -626,7 +626,7 @@ Subchannel::Subchannel(SubchannelKey key,
pollset_set_(grpc_pollset_set_create()),
connector_(std::move(connector)),
backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)),
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
event_engine_(args_.GetObjectRef<EventEngine>()) {
// A grpc_init is added here to ensure that grpc_shutdown does not happen
// until the subchannel is destroyed. Subchannels can persist longer than
// channels because they maybe reused/shared among multiple channels. As a
@ -763,7 +763,7 @@ void Subchannel::ResetBackoff() {
MutexLock lock(&mu_);
backoff_.Reset();
if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
engine_->Cancel(retry_timer_handle_)) {
event_engine_->Cancel(retry_timer_handle_)) {
OnRetryTimerLocked();
} else if (state_ == GRPC_CHANNEL_CONNECTING) {
next_attempt_time_ = Timestamp::Now();
@ -912,7 +912,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
time_until_next_attempt.millis());
SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_to_absl_status(error));
retry_timer_handle_ = engine_->RunAfter(
retry_timer_handle_ = event_engine_->RunAfter(
time_until_next_attempt,
[self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
{
@ -933,9 +933,9 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
bool Subchannel::PublishTransportLocked() {
// Construct channel stack.
ChannelStackBuilderImpl builder("subchannel", GRPC_CLIENT_SUBCHANNEL);
builder.SetChannelArgs(connecting_result_.channel_args)
.SetTransport(connecting_result_.transport);
ChannelStackBuilderImpl builder("subchannel", GRPC_CLIENT_SUBCHANNEL,
connecting_result_.channel_args);
builder.SetTransport(connecting_result_.transport);
if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
return false;
}

@ -427,7 +427,7 @@ class Subchannel : public DualRefCounted<Subchannel> {
// Data producer map.
std::map<UniqueTypeName, DataProducerInterface*> data_producer_map_
ABSL_GUARDED_BY(mu_);
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
};
} // namespace grpc_core

@ -34,6 +34,7 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/avl/avl.h"
@ -45,6 +46,11 @@
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/surface/channel_stack_type.h"
// TODO(hork): When we're ready to allow setting via a channel arg from the
// application, replace this with a macro in
// include/grpc/impl/codegen/grpc_types.h.
#define GRPC_INTERNAL_ARG_EVENT_ENGINE "grpc.internal.event_engine"
// Channel args are intentionally immutable, to avoid the need for locking.
namespace grpc_core {
@ -170,6 +176,9 @@ template <typename T>
struct WrapInSharedPtr
: std::integral_constant<
bool, std::is_base_of<std::enable_shared_from_this<T>, T>::value> {};
template <>
struct WrapInSharedPtr<grpc_event_engine::experimental::EventEngine>
: std::true_type {};
template <typename T, typename Ignored = void /* for SFINAE */>
struct GetObjectImpl;
// std::shared_ptr implementation
@ -213,7 +222,13 @@ template <typename T>
struct ChannelArgNameTraits<std::shared_ptr<T>> {
static absl::string_view ChannelArgName() { return T::ChannelArgName(); }
};
// Specialization for the EventEngine
template <>
struct ChannelArgNameTraits<grpc_event_engine::experimental::EventEngine> {
static absl::string_view ChannelArgName() {
return GRPC_INTERNAL_ARG_EVENT_ENGINE;
}
};
class ChannelArgs {
public:
class Pointer {

@ -30,6 +30,8 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/alloc.h"
using grpc_event_engine::experimental::EventEngine;
grpc_core::TraceFlag grpc_trace_channel(false, "channel");
grpc_core::TraceFlag grpc_trace_channel_stack(false, "channel_stack");
@ -116,6 +118,7 @@ grpc_error_handle grpc_channel_stack_init(
}
stack->on_destroy.Init([]() {});
stack->event_engine.Init(channel_args.GetObjectRef<EventEngine>());
size_t call_size =
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
@ -175,6 +178,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack* stack) {
(*stack->on_destroy)();
stack->on_destroy.Destroy();
stack->event_engine.Destroy();
}
grpc_error_handle grpc_call_stack_init(

@ -49,7 +49,9 @@
#include <stddef.h>
#include <functional>
#include <memory>
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/slice.h>
@ -212,6 +214,14 @@ struct grpc_channel_stack {
// should look like and this can go.
grpc_core::ManualConstructor<std::function<void()>> on_destroy;
grpc_core::ManualConstructor<
std::shared_ptr<grpc_event_engine::experimental::EventEngine>>
event_engine;
grpc_event_engine::experimental::EventEngine* EventEngine() const {
return event_engine->get();
}
// Minimal infrastructure to act like a RefCounted thing without converting
// everything.
// It's likely that we'll want to replace grpc_channel_stack with something

@ -26,7 +26,10 @@
namespace grpc_core {
ChannelStackBuilder::~ChannelStackBuilder() = default;
ChannelStackBuilder::ChannelStackBuilder(const char* name,
grpc_channel_stack_type type,
const ChannelArgs& channel_args)
: name_(name), type_(type), args_(channel_args) {}
ChannelStackBuilder& ChannelStackBuilder::SetTarget(const char* target) {
if (target == nullptr) {
@ -37,12 +40,6 @@ ChannelStackBuilder& ChannelStackBuilder::SetTarget(const char* target) {
return *this;
}
ChannelStackBuilder& ChannelStackBuilder::SetChannelArgs(
const ChannelArgs& args) {
args_ = args;
return *this;
}
void ChannelStackBuilder::PrependFilter(const grpc_channel_filter* filter) {
stack_.insert(stack_.begin(), filter);
}

@ -41,8 +41,9 @@ namespace grpc_core {
class ChannelStackBuilder {
public:
// Initialize with a name.
ChannelStackBuilder(const char* name, grpc_channel_stack_type type)
: name_(name), type_(type) {}
// channel_args *must be* preconditioned already.
ChannelStackBuilder(const char* name, grpc_channel_stack_type type,
const ChannelArgs& channel_args);
const char* name() const { return name_; }
@ -62,9 +63,6 @@ class ChannelStackBuilder {
// Query the transport.
grpc_transport* transport() const { return transport_; }
// Set channel args.
ChannelStackBuilder& SetChannelArgs(const ChannelArgs& args);
// Query the channel args.
const ChannelArgs& channel_args() const { return args_; }
@ -98,7 +96,7 @@ class ChannelStackBuilder {
virtual absl::StatusOr<RefCountedPtr<grpc_channel_stack>> Build() = 0;
protected:
~ChannelStackBuilder();
~ChannelStackBuilder() = default;
private:
static std::string unknown_target() { return "unknown"; }

@ -23,6 +23,9 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine_factory.h"
#include "src/core/lib/event_engine/trace.h"
@ -74,5 +77,19 @@ std::shared_ptr<EventEngine> GetDefaultEventEngine() {
return engine;
}
namespace {
grpc_core::ChannelArgs EnsureEventEngineInChannelArgs(
grpc_core::ChannelArgs args) {
if (args.ContainsObject<EventEngine>()) return args;
return args.SetObject<EventEngine>(GetDefaultEventEngine());
}
} // namespace
void RegisterEventEngineChannelArgPreconditioning(
grpc_core::CoreConfiguration::Builder* builder) {
builder->channel_args_preconditioning()->RegisterStage(
grpc_event_engine::experimental::EnsureEventEngineInChannelArgs);
}
} // namespace experimental
} // namespace grpc_event_engine

@ -21,6 +21,7 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/promise/context.h"
namespace grpc_core {
@ -37,6 +38,11 @@ namespace experimental {
/// Strongly consider whether you could use \a CreateEventEngine instead.
std::shared_ptr<EventEngine> GetDefaultEventEngine();
/// On ingress, ensure that an EventEngine exists in channel args via
/// preconditioning.
void RegisterEventEngineChannelArgPreconditioning(
grpc_core::CoreConfiguration::Builder* builder);
} // namespace experimental
} // namespace grpc_event_engine

@ -33,6 +33,7 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
@ -291,6 +292,9 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Returns the channel authority.
virtual absl::string_view GetAuthority() = 0;
/// Returns the EventEngine to use for timers and async work.
virtual grpc_event_engine::experimental::EventEngine* GetEventEngine() = 0;
/// Adds a trace message associated with the channel.
enum TraceSeverity { TRACE_INFO, TRACE_WARNING, TRACE_ERROR };
virtual void AddTraceEvent(TraceSeverity severity,

@ -170,44 +170,12 @@ void channelz_node_destroy(void* p) {
int channelz_node_cmp(void* p1, void* p2) { return QsortCompare(p1, p2); }
const grpc_arg_pointer_vtable channelz_node_arg_vtable = {
channelz_node_copy, channelz_node_destroy, channelz_node_cmp};
void CreateChannelzNode(ChannelStackBuilder* builder) {
auto args = builder->channel_args();
// Check whether channelz is enabled.
const bool channelz_enabled = args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT);
if (!channelz_enabled) return;
// Get parameters needed to create the channelz node.
const size_t channel_tracer_max_memory = std::max(
0, args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
.value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT));
const bool is_internal_channel =
args.GetBool(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL).value_or(false);
// Create the channelz node.
std::string target(builder->target());
RefCountedPtr<channelz::ChannelNode> channelz_node =
MakeRefCounted<channelz::ChannelNode>(
target.c_str(), channel_tracer_max_memory, is_internal_channel);
channelz_node->AddTraceEvent(
channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
// Add channelz node to channel args.
// We remove the is_internal_channel arg, since we no longer need it.
builder->SetChannelArgs(
args.Remove(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL)
.Set(GRPC_ARG_CHANNELZ_CHANNEL_NODE,
ChannelArgs::Pointer(channelz_node.release(),
&channelz_node_arg_vtable)));
}
} // namespace
absl::StatusOr<RefCountedPtr<Channel>> Channel::Create(
const char* target, ChannelArgs args,
grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport) {
ChannelStackBuilderImpl builder(
grpc_channel_stack_type_string(channel_stack_type), channel_stack_type);
if (!args.GetString(GRPC_ARG_DEFAULT_AUTHORITY).has_value()) {
auto ssl_override = args.GetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG);
if (ssl_override.has_value()) {
@ -222,15 +190,42 @@ absl::StatusOr<RefCountedPtr<Channel>> Channel::Create(
args = channel_args_mutator(target, args, channel_stack_type);
}
}
builder.SetChannelArgs(args).SetTarget(target).SetTransport(
optional_transport);
if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
return nullptr;
}
// We only need to do this for clients here. For servers, this will be
// done in src/core/lib/surface/server.cc.
if (grpc_channel_stack_type_is_client(channel_stack_type)) {
CreateChannelzNode(&builder);
// Check whether channelz is enabled.
if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
// Get parameters needed to create the channelz node.
const size_t channel_tracer_max_memory = std::max(
0,
args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
.value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT));
const bool is_internal_channel =
args.GetBool(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL).value_or(false);
// Create the channelz node.
std::string channelz_node_target{target == nullptr ? "unknown" : target};
RefCountedPtr<channelz::ChannelNode> channelz_node =
MakeRefCounted<channelz::ChannelNode>(channelz_node_target,
channel_tracer_max_memory,
is_internal_channel);
channelz_node->AddTraceEvent(
channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
// Add channelz node to channel args.
// We remove the is_internal_channel arg, since we no longer need it.
args = args.Remove(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL)
.Set(GRPC_ARG_CHANNELZ_CHANNEL_NODE,
ChannelArgs::Pointer(channelz_node.release(),
&channelz_node_arg_vtable));
}
}
ChannelStackBuilderImpl builder(
grpc_channel_stack_type_string(channel_stack_type), channel_stack_type,
args);
builder.SetTarget(target).SetTransport(optional_transport);
if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
return nullptr;
}
return CreateWithBuilder(&builder);
}

@ -26,7 +26,6 @@
#include <atomic>
#include <map>
#include <memory>
#include <string>
#include <utility>
@ -46,7 +45,6 @@
#include "src/core/lib/channel/channel_stack.h" // IWYU pragma: keep
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/cpp_impl_of.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -158,7 +156,7 @@ class Channel : public RefCounted<Channel>,
}
grpc_event_engine::experimental::EventEngine* event_engine() const {
return event_engine_.get();
return channel_stack_->EventEngine();
}
private:
@ -176,8 +174,6 @@ class Channel : public RefCounted<Channel>,
MemoryAllocator allocator_;
std::string target_;
const RefCountedPtr<grpc_channel_stack> channel_stack_;
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine();
};
} // namespace grpc_core

@ -25,6 +25,12 @@
#include "src/core/lib/transport/http_connect_handshaker.h"
#include "src/core/lib/transport/tcp_connect_handshaker.h"
namespace grpc_event_engine {
namespace experimental {
extern void RegisterEventEngineChannelArgPreconditioning(grpc_core::CoreConfiguration::Builder* builder);
} // namespace experimental
} // namespace grpc_event_engine
namespace grpc_core {
extern void BuildClientChannelConfiguration(
@ -63,6 +69,7 @@ extern void RegisterBinderResolver(CoreConfiguration::Builder* builder);
#endif
void BuildCoreConfiguration(CoreConfiguration::Builder* builder) {
grpc_event_engine::experimental::RegisterEventEngineChannelArgPreconditioning(builder);
// The order of the handshaker registration is crucial here.
// We want TCP connect handshaker to be registered last so that it is added to
// the start of the handshaker list.

@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -35,6 +36,9 @@
namespace grpc_core {
using ::grpc_event_engine::experimental::CreateEventEngine;
using ::grpc_event_engine::experimental::EventEngine;
TEST(ChannelArgsTest, Noop) { ChannelArgs(); }
TEST(ChannelArgsTest, SetGetRemove) {
@ -164,6 +168,33 @@ TEST(ChannelArgsTest, RetrieveRawPointerFromStoredSharedPtr) {
EXPECT_EQ(2, shared_obj.use_count());
}
TEST(ChannelArgsTest, StoreSharedPtrEventEngine) {
auto p = std::shared_ptr<EventEngine>(CreateEventEngine());
ChannelArgs a;
a = a.SetObject(p);
Notification signal;
bool triggered = false;
a.GetObjectRef<EventEngine>()->Run([&triggered, &signal] {
triggered = true;
signal.Notify();
});
signal.WaitForNotification();
ASSERT_TRUE(triggered);
}
TEST(ChannelArgsTest, GetNonOwningEventEngine) {
auto p = std::shared_ptr<EventEngine>(CreateEventEngine());
ASSERT_TRUE(p.unique());
ChannelArgs a;
a = a.SetObject(p);
ASSERT_FALSE(p.unique());
ASSERT_EQ(p.use_count(), 2);
EventEngine* engine = a.GetObject<EventEngine>();
(void)engine;
// p and the channel args
ASSERT_EQ(p.use_count(), 2);
}
} // namespace grpc_core
TEST(GrpcChannelArgsTest, Create) {

@ -119,7 +119,8 @@ bool AddOriginalFilter(ChannelStackBuilder* builder) {
}
TEST(ChannelStackBuilder, UnknownTarget) {
ChannelStackBuilderImpl builder("alpha-beta-gamma", GRPC_CLIENT_CHANNEL);
ChannelStackBuilderImpl builder("alpha-beta-gamma", GRPC_CLIENT_CHANNEL,
ChannelArgs());
EXPECT_EQ(builder.target(), "unknown");
}

@ -18,24 +18,31 @@
#include "src/core/lib/channel/channel_stack.h"
#include <limits.h>
#include <string>
#include "absl/status/status.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/util/test_config.h"
static grpc_error_handle channel_init_func(grpc_channel_element* elem,
grpc_channel_element_args* args) {
EXPECT_EQ(args->channel_args->num_args, 1);
EXPECT_EQ(args->channel_args->args[0].type, GRPC_ARG_INTEGER);
EXPECT_STREQ(args->channel_args->args[0].key, "test_key");
EXPECT_EQ(args->channel_args->args[0].value.integer, 42);
int test_value = grpc_channel_args_find_integer(args->channel_args,
"test_key", {-1, 0, INT_MAX});
EXPECT_EQ(test_value, 42);
auto* ee = grpc_channel_args_find_pointer<
grpc_event_engine::experimental::EventEngine>(
args->channel_args, GRPC_INTERNAL_ARG_EVENT_ENGINE);
EXPECT_NE(ee, nullptr);
EXPECT_TRUE(args->is_first);
EXPECT_TRUE(args->is_last);
*static_cast<int*>(elem->channel_data) = 0;
@ -104,11 +111,14 @@ TEST(ChannelStackTest, CreateChannelStack) {
channel_stack = static_cast<grpc_channel_stack*>(
gpr_malloc(grpc_channel_stack_size(&filters, 1)));
auto channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.Set("test_key", 42);
ASSERT_TRUE(GRPC_LOG_IF_ERROR(
"grpc_channel_stack_init",
grpc_channel_stack_init(1, free_channel, channel_stack, &filters, 1,
grpc_core::ChannelArgs().Set("test_key", 42),
"test", channel_stack)));
channel_args, "test", channel_stack)));
EXPECT_EQ(channel_stack->count, 1);
channel_elem = grpc_channel_stack_element(channel_stack, 0);
channel_data = static_cast<int*>(channel_elem->channel_data);

@ -43,7 +43,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -129,16 +128,17 @@ int main(int argc, char** argv) {
static int check_stack(const char* file, int line, const char* transport_name,
grpc_channel_args* init_args,
unsigned channel_stack_type, ...) {
grpc_core::ChannelArgs channel_args =
grpc_core::ChannelArgs::FromC(init_args);
// create phony channel stack
grpc_core::ChannelStackBuilderImpl builder(
"test", static_cast<grpc_channel_stack_type>(channel_stack_type));
"test", static_cast<grpc_channel_stack_type>(channel_stack_type),
channel_args);
grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
fake_transport_vtable.name = transport_name;
grpc_transport fake_transport = {&fake_transport_vtable};
grpc_core::ChannelArgs channel_args =
grpc_core::ChannelArgs::FromC(init_args);
builder.SetTarget("foo.test.google.fr").SetChannelArgs(channel_args);
builder.SetTarget("foo.test.google.fr");
if (transport_name != nullptr) {
builder.SetTransport(&fake_transport);
}

@ -38,6 +38,7 @@
#include "absl/types/variant.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -47,6 +48,7 @@
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -237,6 +239,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
absl::string_view GetAuthority() override { return "server.example.com"; }
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return grpc_event_engine::experimental::GetDefaultEventEngine().get();
}
void AddTraceEvent(TraceSeverity, absl::string_view) override {}
LoadBalancingPolicyTest* test_;

@ -43,11 +43,13 @@
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/env.h"
@ -247,7 +249,9 @@ RefCountedPtr<AuthorizationEngine> LoadAuthorizationEngine(
template <typename FuzzerChannelArgs>
ChannelArgs LoadChannelArgs(const FuzzerChannelArgs& fuzz_args,
GlobalObjects* globals) {
ChannelArgs args = ChannelArgs().SetObject(ResourceQuota::Default());
ChannelArgs args = CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
for (const auto& arg : fuzz_args) {
if (arg.key() == ResourceQuota::ChannelArgName()) {
if (arg.value_case() == filter_fuzzer::ChannelArg::kResourceQuota) {
@ -675,8 +679,8 @@ DEFINE_PROTO_FUZZER(const filter_fuzzer::Msg& msg) {
grpc_core::ChannelStackBuilderImpl builder(
msg.stack_name().c_str(),
static_cast<grpc_channel_stack_type>(msg.channel_stack_type()));
builder.SetChannelArgs(channel_args);
static_cast<grpc_channel_stack_type>(msg.channel_stack_type()),
channel_args);
builder.AppendFilter(filter);
const bool is_client =
grpc_channel_stack_type_is_client(builder.channel_stack_type());

@ -26,6 +26,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
@ -156,6 +157,10 @@ class TestPickArgsLb : public ForwardingLoadBalancingPolicy {
return parent_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return parent_->channel_control_helper()->GetEventEngine();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
parent_->channel_control_helper()->AddTraceEvent(severity, message);
@ -274,6 +279,10 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
return parent_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return parent_->channel_control_helper()->GetEventEngine();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
parent_->channel_control_helper()->AddTraceEvent(severity, message);
@ -385,6 +394,10 @@ class AddressTestLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
return parent_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return parent_->channel_control_helper()->GetEventEngine();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
parent_->channel_control_helper()->AddTraceEvent(severity, message);
@ -500,6 +513,10 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
return parent_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return parent_->channel_control_helper()->GetEventEngine();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
parent_->channel_control_helper()->AddTraceEvent(severity, message);
@ -617,6 +634,10 @@ class OobBackendMetricTestLoadBalancingPolicy
return parent_->channel_control_helper()->GetAuthority();
}
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return parent_->channel_control_helper()->GetEventEngine();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
parent_->channel_control_helper()->AddTraceEvent(severity, message);

@ -90,8 +90,8 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) {
grpc_arg arg = channel_stack_modifier->MakeChannelArg();
// Create a phony ChannelStackBuilder object
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL);
builder.SetChannelArgs(ChannelArgs::FromC(args));
ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL,
ChannelArgs::FromC(args));
grpc_channel_args_destroy(args);
grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
@ -128,8 +128,8 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertionAfterCensus) {
grpc_arg arg = channel_stack_modifier->MakeChannelArg();
// Create a phony ChannelStackBuilder object
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL);
builder.SetChannelArgs(ChannelArgs::FromC(args));
ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL,
ChannelArgs::FromC(args));
grpc_channel_args_destroy(args);
grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));

@ -501,7 +501,9 @@ static void BM_IsolatedFilter(benchmark::State& state) {
FakeClientChannelFactory fake_client_channel_factory;
grpc_core::ChannelArgs channel_args =
grpc_core::ChannelArgs()
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.SetObject(&fake_client_channel_factory)
.Set(GRPC_ARG_SERVER_URI, "localhost");
if (fixture.flags & REQUIRES_TRANSPORT) {
@ -696,12 +698,12 @@ class IsolatedCallFixture {
// the grpc_shutdown() run by grpc_channel_destroy(). So we need to
// call grpc_init() manually here to balance things out.
grpc_init();
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
grpc_core::ChannelStackBuilderImpl builder("phony", GRPC_CLIENT_CHANNEL);
grpc_core::ChannelStackBuilderImpl builder(
"phony", GRPC_CLIENT_CHANNEL,
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr));
builder.SetTarget("phony_target");
builder.SetChannelArgs(args);
builder.AppendFilter(&isolated_call_filter::isolated_call_filter);
{
grpc_core::ExecCtx exec_ctx;

@ -30,6 +30,7 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -189,12 +190,18 @@ class EndpointPairFixture : public BaseFixture {
/* create channel */
{
ChannelArguments args;
args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
fixture_configuration.ApplyCommonChannelArguments(&args);
grpc_core::ChannelArgs c_args =
grpc_core::ChannelArgs::FromC(args.c_channel_args());
grpc_core::ChannelArgs c_args;
{
ChannelArguments args;
args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
fixture_configuration.ApplyCommonChannelArguments(&args);
// precondition
grpc_channel_args tmp_args;
args.SetChannelArgs(&tmp_args);
c_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(&tmp_args);
}
client_transport_ =
grpc_create_chttp2_transport(c_args, endpoints.client, true);
GPR_ASSERT(client_transport_);

Loading…
Cancel
Save