pull/35400/head
Commit 64ba51ae82 by Craig Tiller (1 year ago)
  1. include/grpc/impl/channel_arg_names.h (6 changes)
  2. src/core/ext/filters/backend_metrics/backend_metric_filter.cc (15 changes)
  3. src/core/ext/filters/channel_idle/channel_idle_filter.cc (4 changes)
  4. src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc (4 changes)
  5. src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc (10 changes)
  6. src/core/ext/filters/client_channel/lb_policy/endpoint_list.h (5 changes)
  7. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (3 changes)
  8. src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc (18 changes)
  9. src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc (2 changes)
  10. src/core/ext/filters/http/client_authority_filter.cc (9 changes)
  11. src/core/ext/filters/http/http_filters_plugin.cc (48 changes)
  12. src/core/ext/filters/logging/logging_filter.cc (4 changes)
  13. src/core/ext/filters/message_size/message_size_filter.cc (8 changes)
  14. src/core/lib/channel/server_call_tracer_filter.cc (4 changes)
  15. src/core/lib/promise/party.cc (64 changes)
  16. src/core/lib/promise/party.h (45 changes)
  17. src/core/lib/surface/builtins.cc (2 changes)
  18. src/core/lib/surface/channel_init.h (26 changes)
  19. src/core/lib/surface/init.cc (11 changes)
  20. src/core/lib/surface/server.cc (6 changes)
  21. src/core/lib/surface/server.h (2 changes)
  22. test/core/end2end/tests/http2_stats.cc (4 changes)
  23. test/core/promise/party_test.cc (48 changes)

@@ -106,6 +106,12 @@
*/
#define GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS \
"grpc.http2.min_ping_interval_without_data_ms"
/** Maximum time to allow a request to be:
(1) received by the server, but
(2) not requested by a RequestCall (in the completion queue based API)
before the request is cancelled */
#define GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS \
"grpc.server_max_unrequested_time_in_server"
/** Channel arg to override the http2 :scheme header */
#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme"
/** How many pings can the client send before needing to send a
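As a usage note (not part of this diff): a minimal sketch of how a C++ server could override the new argument via ServerBuilder. The address and the 60-second value are illustrative; the Server constructor change further below falls back to 30 seconds when the argument is absent.

#include <grpc/impl/channel_arg_names.h>
#include <grpcpp/grpcpp.h>

void RunServerWithUnrequestedTimeout() {
  grpc::ServerBuilder builder;
  builder.AddListeningPort("0.0.0.0:50051", grpc::InsecureServerCredentials());
  // Cancel any request that sits in the server for more than 60 seconds
  // without a matching RequestCall (completion-queue based API).
  builder.AddChannelArgument(
      GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS, 60);
  std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
  server->Wait();
}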

@@ -190,12 +190,15 @@ void BackendMetricFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
}
void RegisterBackendMetricFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL,
IsV3BackendMetricFilterEnabled()
? &BackendMetricFilter::kFilter
: &LegacyBackendMetricFilter::kFilter)
.IfHasChannelArg(GRPC_ARG_SERVER_CALL_METRIC_RECORDING);
if (IsV3BackendMetricFilterEnabled()) {
builder->channel_init()
->RegisterFilter<BackendMetricFilter>(GRPC_SERVER_CHANNEL)
.IfHasChannelArg(GRPC_ARG_SERVER_CALL_METRIC_RECORDING);
} else {
builder->channel_init()
->RegisterFilter<LegacyBackendMetricFilter>(GRPC_SERVER_CHANNEL)
.IfHasChannelArg(GRPC_ARG_SERVER_CALL_METRIC_RECORDING);
}
}
} // namespace grpc_core

@@ -296,13 +296,13 @@ const grpc_channel_filter MaxAgeFilter::kFilter =
void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
if (!IsV3ChannelIdleFiltersEnabled()) return;
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL, &ClientIdleFilter::kFilter)
->RegisterFilter<ClientIdleFilter>(GRPC_CLIENT_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return GetClientIdleTimeout(channel_args) != Duration::Infinity();
});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &MaxAgeFilter::kFilter)
->RegisterFilter<MaxAgeFilter>(GRPC_SERVER_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return MaxAgeFilter::Config::FromChannelArgs(channel_args).enable();

@@ -302,13 +302,13 @@ const grpc_channel_filter LegacyMaxAgeFilter::kFilter =
void RegisterLegacyChannelIdleFilters(CoreConfiguration::Builder* builder) {
if (IsV3ChannelIdleFiltersEnabled()) return;
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL, &LegacyClientIdleFilter::kFilter)
->RegisterFilter<LegacyClientIdleFilter>(GRPC_CLIENT_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return GetClientIdleTimeout(channel_args) != Duration::Infinity();
});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &LegacyMaxAgeFilter::kFilter)
->RegisterFilter<LegacyMaxAgeFilter>(GRPC_SERVER_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return LegacyMaxAgeFilter::Config::FromChannelArgs(channel_args)

@@ -69,6 +69,9 @@ class EndpointList::Endpoint::Helper
grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
auto old_state = std::exchange(endpoint_->connectivity_state_, state);
if (!old_state.has_value()) {
++endpoint_->endpoint_list_->num_endpoints_seen_initial_state_;
}
endpoint_->picker_ = std::move(picker);
endpoint_->OnStateUpdate(old_state, state, status);
}
@@ -181,11 +184,4 @@ void EndpointList::ResetBackoffLocked() {
}
}
bool EndpointList::AllEndpointsSeenInitialState() const {
for (const auto& endpoint : endpoints_) {
if (!endpoint->connectivity_state().has_value()) return false;
}
return true;
}
} // namespace grpc_core

@@ -199,7 +199,9 @@ class EndpointList : public InternallyRefCounted<EndpointList> {
// Returns true if all endpoints have seen their initial connectivity
// state notification.
bool AllEndpointsSeenInitialState() const;
bool AllEndpointsSeenInitialState() const {
return num_endpoints_seen_initial_state_ == size();
}
private:
// Returns the parent policy's helper. Needed because the accessor
@@ -210,6 +212,7 @@ class EndpointList : public InternallyRefCounted<EndpointList> {
RefCountedPtr<LoadBalancingPolicy> policy_;
const char* tracer_;
std::vector<OrphanablePtr<Endpoint>> endpoints_;
size_t num_endpoints_seen_initial_state_ = 0;
};
} // namespace grpc_core
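The O(n) scan removed above is replaced by a counter bumped on each endpoint's first connectivity notification; PickFirst below gets the same treatment. A standalone sketch of the pattern, using hypothetical names rather than gRPC types:

#include <cstddef>
#include <optional>
#include <vector>

class ChildList {
 public:
  explicit ChildList(size_t num_children) : states_(num_children) {}

  // The first update for a child is detected by the empty optional and
  // counted exactly once.
  void OnStateUpdate(size_t index, int new_state) {
    if (!states_[index].has_value()) ++num_seen_initial_state_;
    states_[index] = new_state;
  }

  // O(1) instead of looping over every child on each query.
  bool AllChildrenSeenInitialState() const {
    return num_seen_initial_state_ == states_.size();
  }

 private:
  std::vector<std::optional<int>> states_;
  size_t num_seen_initial_state_ = 0;
};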

@@ -1928,8 +1928,7 @@ void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
std::make_unique<GrpcLbFactory>());
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientLoadReportingFilter::kFilter)
->RegisterFilter<ClientLoadReportingFilter>(GRPC_CLIENT_SUBCHANNEL)
.IfChannelArg(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, false);
}

@@ -224,7 +224,9 @@ class PickFirst : public LoadBalancingPolicy {
private:
// Returns true if all subchannels have seen their initial
// connectivity state notifications.
bool AllSubchannelsSeenInitialState();
bool AllSubchannelsSeenInitialState() const {
return num_subchannels_seen_initial_notification_ == size();
}
// Looks through subchannels_ starting from attempting_index_ to
// find the first one not currently in TRANSIENT_FAILURE, then
@@ -255,6 +257,8 @@ class PickFirst : public LoadBalancingPolicy {
// TODO(roth): Remove this when we remove the Happy Eyeballs experiment.
bool in_transient_failure_ = false;
size_t num_subchannels_seen_initial_notification_ = 0;
// The index into subchannels_ to which we are currently attempting
// to connect during the initial Happy Eyeballs pass. Once the
// initial pass is over, this will be equal to size().
@@ -754,6 +758,11 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
seen_transient_failure_ = true;
subchannel_list_->last_failure_ = connectivity_status_;
}
// If this is the initial connectivity state update for this subchannel,
// increment the counter in the subchannel list.
if (!old_state.has_value()) {
++subchannel_list_->num_subchannels_seen_initial_notification_;
}
// If we haven't yet seen the initial connectivity state notification
// for all subchannels, do nothing.
if (!subchannel_list_->AllSubchannelsSeenInitialState()) return;
@@ -1122,13 +1131,6 @@ void PickFirst::SubchannelList::ResetBackoffLocked() {
}
}
bool PickFirst::SubchannelList::AllSubchannelsSeenInitialState() {
for (auto& sd : subchannels_) {
if (!sd.connectivity_state().has_value()) return false;
}
return true;
}
void PickFirst::SubchannelList::StartConnectingNextSubchannel() {
// Find the next subchannel not in state TRANSIENT_FAILURE.
// We skip subchannels in state TRANSIENT_FAILURE to avoid a

@@ -128,7 +128,7 @@ void RegisterServiceConfigChannelArgFilter(
&kServiceConfigChannelArgFilter)
.ExcludeFromMinimalStack()
.IfHasChannelArg(GRPC_ARG_SERVICE_CONFIG)
.Before({&ClientMessageSizeFilter::kFilter});
.Before<ClientMessageSizeFilter>();
}
} // namespace grpc_core

@@ -75,14 +75,13 @@ bool NeedsClientAuthorityFilter(const ChannelArgs& args) {
void RegisterClientAuthorityFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &ClientAuthorityFilter::kFilter)
->RegisterFilter<ClientAuthorityFilter>(GRPC_CLIENT_SUBCHANNEL)
.If(NeedsClientAuthorityFilter)
.Before({&ClientAuthFilter::kFilter});
.Before<ClientAuthFilter>();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientAuthorityFilter::kFilter)
->RegisterFilter<ClientAuthorityFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.If(NeedsClientAuthorityFilter)
.Before({&ClientAuthFilter::kFilter});
.Before<ClientAuthFilter>();
}
} // namespace grpc_core

@@ -42,47 +42,49 @@ bool IsBuildingHttpLikeTransport(const ChannelArgs& args) {
void RegisterHttpFilters(CoreConfiguration::Builder* builder) {
if (IsV3CompressionFilterEnabled()) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientCompressionFilter::kFilter)
->RegisterFilter<ClientCompressionFilter>(GRPC_CLIENT_SUBCHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
.After<HttpClientFilter>()
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientCompressionFilter::kFilter)
->RegisterFilter<ClientCompressionFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
.After<HttpClientFilter>()
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter)
->RegisterFilter<ServerCompressionFilter>(GRPC_SERVER_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
.After<HttpServerFilter>()
.After<ServerMessageSizeFilter>();
} else {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&LegacyClientCompressionFilter::kFilter)
->RegisterFilter<LegacyClientCompressionFilter>(GRPC_CLIENT_SUBCHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
.After<HttpClientFilter>()
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&LegacyClientCompressionFilter::kFilter)
->RegisterFilter<LegacyClientCompressionFilter>(
GRPC_CLIENT_DIRECT_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
.After<HttpClientFilter>()
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL,
&LegacyServerCompressionFilter::kFilter)
->RegisterFilter<LegacyServerCompressionFilter>(GRPC_SERVER_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
.After<HttpServerFilter>()
.After<ServerMessageSizeFilter>();
}
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter)
->RegisterFilter<HttpClientFilter>(GRPC_CLIENT_SUBCHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&ClientMessageSizeFilter::kFilter});
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &HttpClientFilter::kFilter)
->RegisterFilter<HttpClientFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&ClientMessageSizeFilter::kFilter});
.After<ClientMessageSizeFilter>();
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &HttpServerFilter::kFilter)
->RegisterFilter<HttpServerFilter>(GRPC_SERVER_CHANNEL)
.If(IsBuildingHttpLikeTransport)
.After({&ServerMessageSizeFilter::kFilter});
.After<ServerMessageSizeFilter>();
}
} // namespace grpc_core

@@ -536,11 +536,11 @@ void RegisterLoggingFilter(LoggingSink* sink) {
g_logging_sink = sink;
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerLoggingFilter::kFilter)
->RegisterFilter<ServerLoggingFilter>(GRPC_SERVER_CHANNEL)
// TODO(yashykt) : Figure out a good place to place this channel arg
.IfChannelArg("grpc.experimental.enable_observability", true);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL, &ClientLoggingFilter::kFilter)
->RegisterFilter<ClientLoggingFilter>(GRPC_CLIENT_CHANNEL)
// TODO(yashykt) : Figure out a good place to place this channel arg
.IfChannelArg("grpc.experimental.enable_observability", true);
});

@@ -240,12 +240,10 @@ bool HasMessageSizeLimits(const ChannelArgs& channel_args) {
void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
MessageSizeParser::Register(builder);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientMessageSizeFilter::kFilter)
->RegisterFilter<ClientMessageSizeFilter>(GRPC_CLIENT_SUBCHANNEL)
.ExcludeFromMinimalStack();
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientMessageSizeFilter::kFilter)
->RegisterFilter<ClientMessageSizeFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.ExcludeFromMinimalStack()
.If(HasMessageSizeLimits)
// TODO(ctiller): ordering constraint is here to match the ordering that
@@ -253,7 +251,7 @@ void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
// filters from first principles.
.Before({&grpc_client_deadline_filter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerMessageSizeFilter::kFilter)
->RegisterFilter<ServerMessageSizeFilter>(GRPC_SERVER_CHANNEL)
.ExcludeFromMinimalStack()
.If(HasMessageSizeLimits)
// TODO(ctiller): ordering constraint is here to match the ordering that

@@ -104,8 +104,8 @@ absl::StatusOr<ServerCallTracerFilter> ServerCallTracerFilter::Create(
} // namespace
void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterFilter(GRPC_SERVER_CHANNEL,
&ServerCallTracerFilter::kFilter);
builder->channel_init()->RegisterFilter<ServerCallTracerFilter>(
GRPC_SERVER_CHANNEL);
}
} // namespace grpc_core

@@ -227,40 +227,42 @@ void Party::RunLocked() {
bool Party::RunParty() {
ScopedActivity activity(this);
promise_detail::Context<Arena> arena_ctx(arena_);
return sync_.RunParty([this](int i) {
// If the participant is null, skip.
// This allows participants to complete whilst wakers still exist
// somewhere.
auto* participant = participants_[i].load(std::memory_order_acquire);
if (participant == nullptr) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s[party] wakeup %d already complete",
DebugTag().c_str(), i);
}
return false;
}
absl::string_view name;
if (grpc_trace_promise_primitives.enabled()) {
name = participant->name();
gpr_log(GPR_DEBUG, "%s[%s] begin job %d", DebugTag().c_str(),
std::string(name).c_str(), i);
}
// Poll the participant.
currently_polling_ = i;
bool done = participant->PollParticipantPromise();
currently_polling_ = kNotPolling;
if (done) {
if (!name.empty()) {
gpr_log(GPR_DEBUG, "%s[%s] end poll and finish job %d",
DebugTag().c_str(), std::string(name).c_str(), i);
}
participants_[i].store(nullptr, std::memory_order_relaxed);
} else if (!name.empty()) {
gpr_log(GPR_DEBUG, "%s[%s] end poll", DebugTag().c_str(),
std::string(name).c_str());
}
return done;
});
return sync_.RunParty([this](int i) { return RunOneParticipant(i); });
}
bool Party::RunOneParticipant(int i) {
// If the participant is null, skip.
// This allows participants to complete whilst wakers still exist
// somewhere.
auto* participant = participants_[i].load(std::memory_order_acquire);
if (participant == nullptr) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s[party] wakeup %d already complete",
DebugTag().c_str(), i);
}
return false;
}
absl::string_view name;
if (grpc_trace_promise_primitives.enabled()) {
name = participant->name();
gpr_log(GPR_DEBUG, "%s[%s] begin job %d", DebugTag().c_str(),
std::string(name).c_str(), i);
}
// Poll the participant.
currently_polling_ = i;
bool done = participant->PollParticipantPromise();
currently_polling_ = kNotPolling;
if (done) {
if (!name.empty()) {
gpr_log(GPR_DEBUG, "%s[%s] end poll and finish job %d",
DebugTag().c_str(), std::string(name).c_str(), i);
}
participants_[i].store(nullptr, std::memory_order_relaxed);
} else if (!name.empty()) {
gpr_log(GPR_DEBUG, "%s[%s] end poll", DebugTag().c_str(),
std::string(name).c_str());
}
return done;
}
void Party::AddParticipants(Participant** participants, size_t count) {

@@ -102,7 +102,8 @@ class PartySyncUsingAtomics {
template <typename F>
GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) {
uint64_t prev_state;
do {
iteration_.fetch_add(1, std::memory_order_relaxed);
for (;;) {
// Grab the current state, and clear the wakeup bits & add flag.
prev_state = state_.fetch_and(kRefMask | kLocked | kAllocatedMask,
std::memory_order_acquire);
@@ -133,9 +134,23 @@ class PartySyncUsingAtomics {
// TODO(ctiller): consider mitigations for the accidental wakeup on owning
// waker creation case -- I currently expect this will be more expensive
// than this quick loop.
} while (!state_.compare_exchange_weak(
prev_state, (prev_state & (kRefMask | kAllocatedMask)),
std::memory_order_acq_rel, std::memory_order_acquire));
if (wake_after_poll_ == 0) {
if (state_.compare_exchange_weak(
prev_state, (prev_state & (kRefMask | kAllocatedMask)),
std::memory_order_acq_rel, std::memory_order_acquire)) {
return false;
}
} else {
if (state_.compare_exchange_weak(
prev_state,
(prev_state & (kRefMask | kAllocatedMask | kLocked)) |
wake_after_poll_,
std::memory_order_acq_rel, std::memory_order_acquire)) {
iteration_.fetch_add(1, std::memory_order_relaxed);
wake_after_poll_ = 0;
}
}
}
return false;
}
@@ -186,6 +201,11 @@ class PartySyncUsingAtomics {
// Returns true if the caller should run the party.
GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask);
void WakeAfterPoll(WakeupMask mask) { wake_after_poll_ |= mask; }
uint32_t iteration() const {
return iteration_.load(std::memory_order_relaxed);
}
private:
bool UnreffedLast();
@@ -225,6 +245,8 @@ class PartySyncUsingAtomics {
static constexpr uint64_t kOneRef = 1ull << kRefShift;
std::atomic<uint64_t> state_;
std::atomic<uint32_t> iteration_{0};
WakeupMask wake_after_poll_ = 0;
};
class PartySyncUsingMutex {
@@ -358,6 +380,20 @@ class Party : public Activity, private Wakeable {
Arena* arena() const { return arena_; }
// Return a promise that resolves to Empty{} when the current party poll is
// complete.
// This is useful for implementing batching and the like: we can hold some
// action until the rest of the party resolves itself.
auto AfterCurrentPoll() {
GPR_DEBUG_ASSERT(Activity::current() == this);
sync_.WakeAfterPoll(CurrentParticipant());
return [this, iteration = sync_.iteration()]() -> Poll<Empty> {
GPR_DEBUG_ASSERT(Activity::current() == this);
if (iteration == sync_.iteration()) return Pending{};
return Empty{};
};
}
class BulkSpawner {
public:
explicit BulkSpawner(Party* party) : party_(party) {}
@@ -548,6 +584,7 @@ class Party : public Activity, private Wakeable {
// Add a participant (backs Spawn, after type erasure to ParticipantFactory).
void AddParticipants(Participant** participant, size_t count);
bool RunOneParticipant(int i);
virtual grpc_event_engine::experimental::EventEngine* event_engine()
const = 0;
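The AfterCurrentPoll() helper added above is intended for batching, per its comment; a hypothetical sketch of that idiom, modeled on the AfterCurrentPollWorks test near the end of this diff. Seq and Empty are existing promise primitives, while WriteQueue and Flush() are invented for illustration.

// Assumes "src/core/lib/promise/party.h" and "src/core/lib/promise/seq.h",
// and that the returned promise is spawned on `party` itself.
struct WriteQueue {
  void Flush() {}  // hypothetical: would emit everything batched so far
};

auto FlushAfterCurrentPoll(Party* party, WriteQueue* queue) {
  return Seq(party->AfterCurrentPoll(),  // Pending until this poll pass ends
             [queue]() {
               // Other participants polled in the same pass have already run,
               // so their queued work is flushed in one go.
               queue->Flush();
               return Empty{};
             });
}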

@@ -27,7 +27,7 @@ namespace grpc_core {
void RegisterBuiltins(CoreConfiguration::Builder* builder) {
RegisterServerCallTracerFilter(builder);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_LAME_CHANNEL, &LameClientFilter::kFilter)
->RegisterFilter<LameClientFilter>(GRPC_CLIENT_LAME_CHANNEL)
.Terminal();
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &Server::kServerTopFilter)

@@ -87,11 +87,30 @@ class ChannelInit {
// Ensure that this filter is placed *after* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
template <typename Filter>
FilterRegistration& After() {
return After({&Filter::kFilter});
}
// Ensure that this filter is placed *before* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
template <typename Filter>
FilterRegistration& Before() {
return Before({&Filter::kFilter});
}
// Ensure that this filter is placed *after* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
FilterRegistration& After(
std::initializer_list<const grpc_channel_filter*> filters);
// Ensure that this filter is placed *before* the filters listed here.
// By Build() time all filters listed here must also be registered against
// the same channel stack type as this registration.
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
FilterRegistration& Before(
std::initializer_list<const grpc_channel_filter*> filters);
// Add a predicate for this filters inclusion.
@@ -145,9 +164,16 @@ class ChannelInit {
// This occurs first during channel build time.
// The FilterRegistration methods can be called to declaratively define
// properties of the filter being registered.
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
FilterRegistration& RegisterFilter(grpc_channel_stack_type type,
const grpc_channel_filter* filter,
SourceLocation registration_source = {});
template <typename Filter>
FilterRegistration& RegisterFilter(
grpc_channel_stack_type type, SourceLocation registration_source = {}) {
return RegisterFilter(type, &Filter::kFilter, registration_source);
}
// Register a post processor for the builder.
// These run after the main graph has been placed into the builder.
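Taken with the call-site changes above, registration now names the filter type instead of its vtable. A short hypothetical sketch (MyFilter and its channel arg are invented; the template assumes the type exposes a static kFilter, as the default implementation above shows):

void RegisterMyFilter(grpc_core::CoreConfiguration::Builder* builder) {
  builder->channel_init()
      ->RegisterFilter<MyFilter>(GRPC_CLIENT_CHANNEL)  // resolves &MyFilter::kFilter
      .IfHasChannelArg("grpc.enable_my_filter")        // hypothetical channel arg
      .After<grpc_core::HttpClientFilter>()            // ordering by type
      .Before<grpc_core::ClientMessageSizeFilter>();
}

One consequence visible earlier in the diff: the backend-metric registration, which previously chose between two vtables with a runtime ternary, now branches with an explicit if/else, since the filter type must be known at compile time.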

@@ -67,25 +67,26 @@ static bool g_shutting_down ABSL_GUARDED_BY(g_init_mu) = false;
namespace grpc_core {
void RegisterSecurityFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &ClientAuthFilter::kFilter)
->RegisterFilter<ClientAuthFilter>(GRPC_CLIENT_SUBCHANNEL)
.IfHasChannelArg(GRPC_ARG_SECURITY_CONNECTOR);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &ClientAuthFilter::kFilter)
->RegisterFilter<ClientAuthFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.IfHasChannelArg(GRPC_ARG_SECURITY_CONNECTOR);
if (IsV3ServerAuthFilterEnabled()) {
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerAuthFilter::kFilter)
->RegisterFilter<ServerAuthFilter>(GRPC_SERVER_CHANNEL)
.IfHasChannelArg(GRPC_SERVER_CREDENTIALS_ARG);
} else {
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &LegacyServerAuthFilter::kFilter)
->RegisterFilter<LegacyServerAuthFilter>(GRPC_SERVER_CHANNEL)
.IfHasChannelArg(GRPC_SERVER_CREDENTIALS_ARG);
}
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL,
&GrpcServerAuthzFilter::kFilterVtable)
.IfHasChannelArg(GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER)
.After({&ServerAuthFilter::kFilter, &LegacyServerAuthFilter::kFilter});
.After<ServerAuthFilter>()
.After<LegacyServerAuthFilter>();
}
} // namespace grpc_core

@@ -830,7 +830,11 @@ RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
Server::Server(const ChannelArgs& args)
: channel_args_(args),
channelz_node_(CreateChannelzNode(args)),
server_call_tracer_factory_(ServerCallTracerFactory::Get(args)) {}
server_call_tracer_factory_(ServerCallTracerFactory::Get(args)),
max_time_in_pending_queue_(Duration::Seconds(
channel_args_
.GetInt(GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS)
.value_or(30))) {}
Server::~Server() {
// Remove the cq pollsets from the config_fetcher.

@@ -493,7 +493,7 @@ class Server : public InternallyRefCounted<Server>,
0,
channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT)
.value_or(3000)))};
Duration max_time_in_pending_queue_{Duration::Seconds(30)};
const Duration max_time_in_pending_queue_;
absl::BitGen bitgen_ ABSL_GUARDED_BY(mu_call_);
std::list<ChannelData*> channels_;

@@ -214,8 +214,8 @@ CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
g_client_call_ended_notify = new Notification();
g_server_call_ended_notify = new Notification();
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterFilter(GRPC_CLIENT_CHANNEL,
&FakeClientFilter::kFilter);
builder->channel_init()->RegisterFilter<FakeClientFilter>(
GRPC_CLIENT_CHANNEL);
});
ServerCallTracerFactory::RegisterGlobal(new FakeServerCallTracerFactory);

@@ -490,6 +490,54 @@ TEST_F(PartyTest, CanBulkSpawn) {
n2.WaitForNotification();
}
TEST_F(PartyTest, AfterCurrentPollWorks) {
auto party = MakeRefCounted<TestParty>();
Notification n;
int state = 0;
{
Party::BulkSpawner spawner(party.get());
// BulkSpawner will schedule and poll this promise first, but the
// `AfterCurrentPoll` will pause it.
// Then spawn1, spawn2, and spawn3 will run in order (with EXPECT_EQ checks
// demonstrating this), at which point the poll will complete, causing
// spawn_final to be awoken and scheduled and see the final state.
spawner.Spawn(
"spawn_final",
[&state, &party]() {
return Seq(party->AfterCurrentPoll(), [&state]() {
EXPECT_EQ(state, 3);
return Empty{};
});
},
[&n](Empty) { n.Notify(); });
spawner.Spawn(
"spawn1",
[&state]() {
EXPECT_EQ(state, 0);
state = 1;
return Empty{};
},
[](Empty) {});
spawner.Spawn(
"spawn2",
[&state]() {
EXPECT_EQ(state, 1);
state = 2;
return Empty{};
},
[](Empty) {});
spawner.Spawn(
"spawn3",
[&state]() {
EXPECT_EQ(state, 2);
state = 3;
return Empty{};
},
[](Empty) {});
}
n.WaitForNotification();
}
TEST_F(PartyTest, ThreadStressTest) {
auto party = MakeRefCounted<TestParty>();
std::vector<std::thread> threads;
