[call-v3] Begin migration of channel_init to templates.

Eventually for call-v3 we're going to want to have registration of filters generate the appropriate glue into the channel runtime to execute a call.

Begin that process now and gradually by introducing the new syntax and allowing a piecemeal migration to it - by the time we're done converting filters to the v3 APIs we'll also have the registration piece done.

PiperOrigin-RevId: 596013927
pull/35331/head
Craig Tiller 1 year ago committed by Copybara-Service
parent 5923e6cd1e
commit fede9051e1
  1. 15
      src/core/ext/filters/backend_metrics/backend_metric_filter.cc
  2. 4
      src/core/ext/filters/channel_idle/channel_idle_filter.cc
  3. 4
      src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc
  4. 3
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 2
      src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
  6. 9
      src/core/ext/filters/http/client_authority_filter.cc
  7. 48
      src/core/ext/filters/http/http_filters_plugin.cc
  8. 4
      src/core/ext/filters/logging/logging_filter.cc
  9. 8
      src/core/ext/filters/message_size/message_size_filter.cc
  10. 4
      src/core/lib/channel/server_call_tracer_filter.cc
  11. 2
      src/core/lib/surface/builtins.cc
  12. 26
      src/core/lib/surface/channel_init.h
  13. 11
      src/core/lib/surface/init.cc
  14. 4
      test/core/end2end/tests/http2_stats.cc

@ -190,12 +190,15 @@ void BackendMetricFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
}
void RegisterBackendMetricFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL,
IsV3BackendMetricFilterEnabled()
? &BackendMetricFilter::kFilter
: &LegacyBackendMetricFilter::kFilter)
.IfHasChannelArg(GRPC_ARG_SERVER_CALL_METRIC_RECORDING);
if (IsV3BackendMetricFilterEnabled()) {
builder->channel_init()
->RegisterFilter<BackendMetricFilter>(GRPC_SERVER_CHANNEL)
.IfHasChannelArg(GRPC_ARG_SERVER_CALL_METRIC_RECORDING);
} else {
builder->channel_init()
->RegisterFilter<LegacyBackendMetricFilter>(GRPC_SERVER_CHANNEL)
.IfHasChannelArg(GRPC_ARG_SERVER_CALL_METRIC_RECORDING);
}
}
} // namespace grpc_core

@ -296,13 +296,13 @@ const grpc_channel_filter MaxAgeFilter::kFilter =
void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
if (!IsV3ChannelIdleFiltersEnabled()) return;
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL, &ClientIdleFilter::kFilter)
->RegisterFilter<ClientIdleFilter>(GRPC_CLIENT_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return GetClientIdleTimeout(channel_args) != Duration::Infinity();
});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &MaxAgeFilter::kFilter)
->RegisterFilter<MaxAgeFilter>(GRPC_SERVER_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return MaxAgeFilter::Config::FromChannelArgs(channel_args).enable();

@ -302,13 +302,13 @@ const grpc_channel_filter LegacyMaxAgeFilter::kFilter =
void RegisterLegacyChannelIdleFilters(CoreConfiguration::Builder* builder) {
if (IsV3ChannelIdleFiltersEnabled()) return;
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL, &LegacyClientIdleFilter::kFilter)
->RegisterFilter<LegacyClientIdleFilter>(GRPC_CLIENT_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return GetClientIdleTimeout(channel_args) != Duration::Infinity();
});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &LegacyMaxAgeFilter::kFilter)
->RegisterFilter<LegacyMaxAgeFilter>(GRPC_SERVER_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return LegacyMaxAgeFilter::Config::FromChannelArgs(channel_args)

@ -1928,8 +1928,7 @@ void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
std::make_unique<GrpcLbFactory>());
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientLoadReportingFilter::kFilter)
->RegisterFilter<ClientLoadReportingFilter>(GRPC_CLIENT_SUBCHANNEL)
.IfChannelArg(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, false);
}

@ -128,7 +128,7 @@ void RegisterServiceConfigChannelArgFilter(
&kServiceConfigChannelArgFilter)
.ExcludeFromMinimalStack()
.IfHasChannelArg(GRPC_ARG_SERVICE_CONFIG)
.Before({&ClientMessageSizeFilter::kFilter});
.Before<ClientMessageSizeFilter>();
}
} // namespace grpc_core

@ -75,14 +75,13 @@ bool NeedsClientAuthorityFilter(const ChannelArgs& args) {
void RegisterClientAuthorityFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &ClientAuthorityFilter::kFilter)
->RegisterFilter<ClientAuthorityFilter>(GRPC_CLIENT_SUBCHANNEL)
.If(NeedsClientAuthorityFilter)
.Before({&ClientAuthFilter::kFilter});
.Before<ClientAuthFilter>();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientAuthorityFilter::kFilter)
->RegisterFilter<ClientAuthorityFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.If(NeedsClientAuthorityFilter)
.Before({&ClientAuthFilter::kFilter});
.Before<ClientAuthFilter>();
}
} // namespace grpc_core

@ -42,47 +42,49 @@ bool IsBuildingHttpLikeTransport(const ChannelArgs& args) {
void RegisterHttpFilters(CoreConfiguration::Builder* builder) {
if (IsV3CompressionFilterEnabled()) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientCompressionFilter::kFilter)
->RegisterFilter<ClientCompressionFilter>(GRPC_CLIENT_SUBCHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
.After<HttpClientFilter>()
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientCompressionFilter::kFilter)
->RegisterFilter<ClientCompressionFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
.After<HttpClientFilter>()
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter)
->RegisterFilter<ServerCompressionFilter>(GRPC_SERVER_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
.After<HttpServerFilter>()
.After<ServerMessageSizeFilter>();
} else {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&LegacyClientCompressionFilter::kFilter)
->RegisterFilter<LegacyClientCompressionFilter>(GRPC_CLIENT_SUBCHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
.After<HttpClientFilter>()
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&LegacyClientCompressionFilter::kFilter)
->RegisterFilter<LegacyClientCompressionFilter>(
GRPC_CLIENT_DIRECT_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
.After<HttpClientFilter>()
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL,
&LegacyServerCompressionFilter::kFilter)
->RegisterFilter<LegacyServerCompressionFilter>(GRPC_SERVER_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
.After<HttpServerFilter>()
.After<ServerMessageSizeFilter>();
}
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter)
->RegisterFilter<HttpClientFilter>(GRPC_CLIENT_SUBCHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&ClientMessageSizeFilter::kFilter});
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &HttpClientFilter::kFilter)
->RegisterFilter<HttpClientFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&ClientMessageSizeFilter::kFilter});
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &HttpServerFilter::kFilter)
->RegisterFilter<HttpServerFilter>(GRPC_SERVER_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&ServerMessageSizeFilter::kFilter});
.After<ServerMessageSizeFilter>();
}
} // namespace grpc_core

@ -536,11 +536,11 @@ void RegisterLoggingFilter(LoggingSink* sink) {
g_logging_sink = sink;
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerLoggingFilter::kFilter)
->RegisterFilter<ServerLoggingFilter>(GRPC_SERVER_CHANNEL)
// TODO(yashykt) : Figure out a good place to place this channel arg
.IfChannelArg("grpc.experimental.enable_observability", true);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL, &ClientLoggingFilter::kFilter)
->RegisterFilter<ClientLoggingFilter>(GRPC_CLIENT_CHANNEL)
// TODO(yashykt) : Figure out a good place to place this channel arg
.IfChannelArg("grpc.experimental.enable_observability", true);
});

@ -240,12 +240,10 @@ bool HasMessageSizeLimits(const ChannelArgs& channel_args) {
void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
MessageSizeParser::Register(builder);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientMessageSizeFilter::kFilter)
->RegisterFilter<ClientMessageSizeFilter>(GRPC_CLIENT_SUBCHANNEL)
.ExcludeFromMinimalStack();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientMessageSizeFilter::kFilter)
->RegisterFilter<ClientMessageSizeFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.ExcludeFromMinimalStack()
.If(HasMessageSizeLimits)
// TODO(ctiller): ordering constraint is here to match the ordering that
@ -253,7 +251,7 @@ void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
// filters from first principles.
.Before({&grpc_client_deadline_filter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerMessageSizeFilter::kFilter)
->RegisterFilter<ServerMessageSizeFilter>(GRPC_SERVER_CHANNEL)
.ExcludeFromMinimalStack()
.If(HasMessageSizeLimits)
// TODO(ctiller): ordering constraint is here to match the ordering that

@ -104,8 +104,8 @@ absl::StatusOr<ServerCallTracerFilter> ServerCallTracerFilter::Create(
} // namespace
void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterFilter(GRPC_SERVER_CHANNEL,
&ServerCallTracerFilter::kFilter);
builder->channel_init()->RegisterFilter<ServerCallTracerFilter>(
GRPC_SERVER_CHANNEL);
}
} // namespace grpc_core

@ -27,7 +27,7 @@ namespace grpc_core {
void RegisterBuiltins(CoreConfiguration::Builder* builder) {
RegisterServerCallTracerFilter(builder);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_LAME_CHANNEL, &LameClientFilter::kFilter)
->RegisterFilter<LameClientFilter>(GRPC_CLIENT_LAME_CHANNEL)
.Terminal();
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &Server::kServerTopFilter)

@ -87,11 +87,30 @@ class ChannelInit {
// Ensure that this filter is placed *after* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
template <typename Filter>
FilterRegistration& After() {
return After({&Filter::kFilter});
}
// Ensure that this filter is placed *before* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
template <typename Filter>
FilterRegistration& Before() {
return Before({&Filter::kFilter});
}
// Ensure that this filter is placed *after* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
FilterRegistration& After(
std::initializer_list<const grpc_channel_filter*> filters);
// Ensure that this filter is placed *before* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
FilterRegistration& Before(
std::initializer_list<const grpc_channel_filter*> filters);
// Add a predicate for this filters inclusion.
@ -145,9 +164,16 @@ class ChannelInit {
// This occurs first during channel build time.
// The FilterRegistration methods can be called to declaratively define
// properties of the filter being registered.
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
FilterRegistration& RegisterFilter(grpc_channel_stack_type type,
const grpc_channel_filter* filter,
SourceLocation registration_source = {});
template <typename Filter>
FilterRegistration& RegisterFilter(
grpc_channel_stack_type type, SourceLocation registration_source = {}) {
return RegisterFilter(type, &Filter::kFilter, registration_source);
}
// Register a post processor for the builder.
// These run after the main graph has been placed into the builder.

@ -67,25 +67,26 @@ static bool g_shutting_down ABSL_GUARDED_BY(g_init_mu) = false;
namespace grpc_core {
void RegisterSecurityFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &ClientAuthFilter::kFilter)
->RegisterFilter<ClientAuthFilter>(GRPC_CLIENT_SUBCHANNEL)
.IfHasChannelArg(GRPC_ARG_SECURITY_CONNECTOR);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &ClientAuthFilter::kFilter)
->RegisterFilter<ClientAuthFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.IfHasChannelArg(GRPC_ARG_SECURITY_CONNECTOR);
if (IsV3ServerAuthFilterEnabled()) {
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerAuthFilter::kFilter)
->RegisterFilter<ServerAuthFilter>(GRPC_SERVER_CHANNEL)
.IfHasChannelArg(GRPC_SERVER_CREDENTIALS_ARG);
} else {
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &LegacyServerAuthFilter::kFilter)
->RegisterFilter<LegacyServerAuthFilter>(GRPC_SERVER_CHANNEL)
.IfHasChannelArg(GRPC_SERVER_CREDENTIALS_ARG);
}
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL,
&GrpcServerAuthzFilter::kFilterVtable)
.IfHasChannelArg(GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER)
.After({&ServerAuthFilter::kFilter, &LegacyServerAuthFilter::kFilter});
.After<ServerAuthFilter>()
.After<LegacyServerAuthFilter>();
}
} // namespace grpc_core

@ -214,8 +214,8 @@ CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
g_client_call_ended_notify = new Notification();
g_server_call_ended_notify = new Notification();
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterFilter(GRPC_CLIENT_CHANNEL,
&FakeClientFilter::kFilter);
builder->channel_init()->RegisterFilter<FakeClientFilter>(
GRPC_CLIENT_CHANNEL);
});
ServerCallTracerFactory::RegisterGlobal(new FakeServerCallTracerFactory);

Loading…
Cancel
Save