Merge branch 'client_channel_new' of github.com:markdroth/grpc into transport-refs-7

pull/36732/head
Craig Tiller 11 months ago
commit 1a66558de1
  1. 18
      BUILD
  2. 2
      CMakeLists.txt
  3. 1
      Makefile
  4. 3
      Package.swift
  5. 3
      bazel/experiments.bzl
  6. 6
      build_autogenerated.yaml
  7. 1
      config.m4
  8. 1
      config.w32
  9. 4
      gRPC-C++.podspec
  10. 5
      gRPC-Core.podspec
  11. 3
      grpc.gemspec
  12. 2669
      grpc.gyp
  13. 3
      package.xml
  14. 2
      src/core/BUILD
  15. 1977
      src/core/client_channel/client_channel.cc
  16. 249
      src/core/client_channel/client_channel.h
  17. 27
      src/core/client_channel/client_channel_filter.cc
  18. 13
      src/core/client_channel/client_channel_filter.h
  19. 255
      src/core/client_channel/subchannel.cc
  20. 38
      src/core/client_channel/subchannel.h
  21. 316
      src/core/ext/filters/channel_idle/channel_idle_filter.cc
  22. 156
      src/core/ext/filters/channel_idle/channel_idle_filter.h
  23. 6
      src/core/lib/channel/channel_stack.h
  24. 93
      src/core/lib/experiments/experiments.cc
  25. 72
      src/core/lib/experiments/experiments.h
  26. 1
      src/core/lib/experiments/experiments.yaml
  27. 17
      src/core/lib/surface/channel.cc
  28. 10
      src/core/lib/surface/channel.h
  29. 10
      src/core/lib/surface/channel_create.cc
  30. 4
      src/core/lib/surface/legacy_channel.cc
  31. 10
      src/core/lib/surface/legacy_channel.h
  32. 110
      src/core/lib/transport/call_spine.h
  33. 11
      src/core/lib/transport/metadata_batch.h
  34. 1
      src/python/grpcio/grpc_core_dependencies.py
  35. 18
      test/core/client_channel/client_channel_test.cc
  36. 3
      tools/doxygen/Doxyfile.c++.internal
  37. 3
      tools/doxygen/Doxyfile.core.internal

18
BUILD

@ -1789,6 +1789,7 @@ grpc_cc_library(
"//src/core:connectivity_state",
"//src/core:iomgr_fwd",
"//src/core:ref_counted",
"//src/core:resource_quota",
"//src/core:slice",
"//src/core:stats_data",
"//src/core:time",
@ -1866,6 +1867,7 @@ grpc_cc_library(
"config",
"gpr",
"grpc_base",
"grpc_client_channel",
"grpc_public_hdrs",
"legacy_channel",
"ref_counted_ptr",
@ -1873,6 +1875,7 @@ grpc_cc_library(
"//src/core:arena",
"//src/core:channel_args",
"//src/core:channel_stack_type",
"//src/core:experiments",
"//src/core:iomgr_fwd",
"//src/core:ref_counted",
"//src/core:slice",
@ -3637,6 +3640,7 @@ grpc_cc_library(
grpc_cc_library(
name = "grpc_client_channel",
srcs = [
"//src/core:client_channel/client_channel.cc",
"//src/core:client_channel/client_channel_factory.cc",
"//src/core:client_channel/client_channel_filter.cc",
"//src/core:client_channel/client_channel_plugin.cc",
@ -3649,6 +3653,7 @@ grpc_cc_library(
"//src/core:client_channel/subchannel_stream_client.cc",
],
hdrs = [
"//src/core:client_channel/client_channel.h",
"//src/core:client_channel/client_channel_factory.h",
"//src/core:client_channel/client_channel_filter.h",
"//src/core:client_channel/dynamic_filters.h",
@ -3684,6 +3689,7 @@ grpc_cc_library(
"backoff",
"call_combiner",
"call_tracer",
"channel",
"channel_arg_names",
"channelz",
"config",
@ -3718,6 +3724,8 @@ grpc_cc_library(
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:backend_metric_parser",
"//src/core:call_filters",
"//src/core:call_spine",
"//src/core:cancel_callback",
"//src/core:channel_args",
"//src/core:channel_fwd",
@ -3736,12 +3744,15 @@ grpc_cc_library(
"//src/core:env",
"//src/core:error",
"//src/core:error_utils",
"//src/core:exec_ctx_wakeup_scheduler",
"//src/core:experiments",
"//src/core:gpr_atm",
"//src/core:gpr_manual_constructor",
"//src/core:grpc_backend_metric_data",
"//src/core:grpc_channel_idle_filter",
"//src/core:grpc_message_size_filter",
"//src/core:grpc_service_config",
"//src/core:idle_filter_state",
"//src/core:init_internally",
"//src/core:iomgr_fwd",
"//src/core:json",
@ -3751,9 +3762,13 @@ grpc_cc_library(
"//src/core:latch",
"//src/core:lb_policy",
"//src/core:lb_policy_registry",
"//src/core:loop",
"//src/core:map",
"//src/core:memory_quota",
"//src/core:metadata",
"//src/core:metadata_batch",
"//src/core:metrics",
"//src/core:observable",
"//src/core:pipe",
"//src/core:poll",
"//src/core:pollset_set",
@ -3766,10 +3781,13 @@ grpc_cc_library(
"//src/core:retry_throttle",
"//src/core:seq",
"//src/core:service_config_parser",
"//src/core:single_set_ptr",
"//src/core:sleep",
"//src/core:slice",
"//src/core:slice_buffer",
"//src/core:slice_refcount",
"//src/core:stats_data",
"//src/core:status_flag",
"//src/core:status_helper",
"//src/core:subchannel_connector",
"//src/core:subchannel_interface",

2
CMakeLists.txt generated

@ -1829,6 +1829,7 @@ add_library(grpc
src/core/channelz/channelz.cc
src/core/channelz/channelz_registry.cc
src/core/client_channel/backup_poller.cc
src/core/client_channel/client_channel.cc
src/core/client_channel/client_channel_factory.cc
src/core/client_channel/client_channel_filter.cc
src/core/client_channel/client_channel_plugin.cc
@ -2925,6 +2926,7 @@ add_library(grpc_unsecure
src/core/channelz/channelz.cc
src/core/channelz/channelz_registry.cc
src/core/client_channel/backup_poller.cc
src/core/client_channel/client_channel.cc
src/core/client_channel/client_channel_factory.cc
src/core/client_channel/client_channel_filter.cc
src/core/client_channel/client_channel_plugin.cc

1
Makefile generated

@ -670,6 +670,7 @@ LIBGRPC_SRC = \
src/core/channelz/channelz.cc \
src/core/channelz/channelz_registry.cc \
src/core/client_channel/backup_poller.cc \
src/core/client_channel/client_channel.cc \
src/core/client_channel/client_channel_factory.cc \
src/core/client_channel/client_channel_filter.cc \
src/core/client_channel/client_channel_plugin.cc \

3
Package.swift generated

@ -125,6 +125,8 @@ let package = Package(
"src/core/channelz/channelz_registry.h",
"src/core/client_channel/backup_poller.cc",
"src/core/client_channel/backup_poller.h",
"src/core/client_channel/client_channel.cc",
"src/core/client_channel/client_channel.h",
"src/core/client_channel/client_channel_factory.cc",
"src/core/client_channel/client_channel_factory.h",
"src/core/client_channel/client_channel_filter.cc",
@ -1541,6 +1543,7 @@ let package = Package(
"src/core/lib/promise/latch.h",
"src/core/lib/promise/loop.h",
"src/core/lib/promise/map.h",
"src/core/lib/promise/observable.h",
"src/core/lib/promise/party.cc",
"src/core/lib/promise/party.h",
"src/core/lib/promise/pipe.h",

@ -18,7 +18,6 @@
EXPERIMENT_ENABLES = {
"call_status_override_on_cancellation": "call_status_override_on_cancellation",
"call_v3": "call_v3",
"canary_client_privacy": "canary_client_privacy",
"client_privacy": "client_privacy",
"event_engine_client": "event_engine_client",
@ -44,6 +43,8 @@ EXPERIMENT_ENABLES = {
"unconstrained_max_quota_buffer_size": "unconstrained_max_quota_buffer_size",
"work_serializer_clears_time_cache": "work_serializer_clears_time_cache",
"work_serializer_dispatch": "event_engine_client,work_serializer_dispatch",
"call_v3": "call_v3,event_engine_client,event_engine_listener,work_serializer_dispatch",
"wrr_delegate_to_pick_first": "wrr_delegate_to_pick_first",
}
EXPERIMENT_POLLERS = [

@ -226,6 +226,7 @@ libs:
- src/core/channelz/channelz.h
- src/core/channelz/channelz_registry.h
- src/core/client_channel/backup_poller.h
- src/core/client_channel/client_channel.h
- src/core/client_channel/client_channel_factory.h
- src/core/client_channel/client_channel_filter.h
- src/core/client_channel/client_channel_internal.h
@ -1008,6 +1009,7 @@ libs:
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/observable.h
- src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
@ -1250,6 +1252,7 @@ libs:
- src/core/channelz/channelz.cc
- src/core/channelz/channelz_registry.cc
- src/core/client_channel/backup_poller.cc
- src/core/client_channel/client_channel.cc
- src/core/client_channel/client_channel_factory.cc
- src/core/client_channel/client_channel_filter.cc
- src/core/client_channel/client_channel_plugin.cc
@ -2213,6 +2216,7 @@ libs:
- src/core/channelz/channelz.h
- src/core/channelz/channelz_registry.h
- src/core/client_channel/backup_poller.h
- src/core/client_channel/client_channel.h
- src/core/client_channel/client_channel_factory.h
- src/core/client_channel/client_channel_filter.h
- src/core/client_channel/client_channel_internal.h
@ -2540,6 +2544,7 @@ libs:
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
- src/core/lib/promise/observable.h
- src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
@ -2706,6 +2711,7 @@ libs:
- src/core/channelz/channelz.cc
- src/core/channelz/channelz_registry.cc
- src/core/client_channel/backup_poller.cc
- src/core/client_channel/client_channel.cc
- src/core/client_channel/client_channel_factory.cc
- src/core/client_channel/client_channel_filter.cc
- src/core/client_channel/client_channel_plugin.cc

1
config.m4 generated

@ -45,6 +45,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/channelz/channelz.cc \
src/core/channelz/channelz_registry.cc \
src/core/client_channel/backup_poller.cc \
src/core/client_channel/client_channel.cc \
src/core/client_channel/client_channel_factory.cc \
src/core/client_channel/client_channel_filter.cc \
src/core/client_channel/client_channel_plugin.cc \

1
config.w32 generated

@ -10,6 +10,7 @@ if (PHP_GRPC != "no") {
"src\\core\\channelz\\channelz.cc " +
"src\\core\\channelz\\channelz_registry.cc " +
"src\\core\\client_channel\\backup_poller.cc " +
"src\\core\\client_channel\\client_channel.cc " +
"src\\core\\client_channel\\client_channel_factory.cc " +
"src\\core\\client_channel\\client_channel_filter.cc " +
"src\\core\\client_channel\\client_channel_plugin.cc " +

4
gRPC-C++.podspec generated

@ -267,6 +267,7 @@ Pod::Spec.new do |s|
'src/core/channelz/channelz.h',
'src/core/channelz/channelz_registry.h',
'src/core/client_channel/backup_poller.h',
'src/core/client_channel/client_channel.h',
'src/core/client_channel/client_channel_factory.h',
'src/core/client_channel/client_channel_filter.h',
'src/core/client_channel/client_channel_internal.h',
@ -1111,6 +1112,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/observable.h',
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
@ -1556,6 +1558,7 @@ Pod::Spec.new do |s|
'src/core/channelz/channelz.h',
'src/core/channelz/channelz_registry.h',
'src/core/client_channel/backup_poller.h',
'src/core/client_channel/client_channel.h',
'src/core/client_channel/client_channel_factory.h',
'src/core/client_channel/client_channel_filter.h',
'src/core/client_channel/client_channel_internal.h',
@ -2382,6 +2385,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/observable.h',
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',

5
gRPC-Core.podspec generated

@ -244,6 +244,8 @@ Pod::Spec.new do |s|
'src/core/channelz/channelz_registry.h',
'src/core/client_channel/backup_poller.cc',
'src/core/client_channel/backup_poller.h',
'src/core/client_channel/client_channel.cc',
'src/core/client_channel/client_channel.h',
'src/core/client_channel/client_channel_factory.cc',
'src/core/client_channel/client_channel_factory.h',
'src/core/client_channel/client_channel_filter.cc',
@ -1660,6 +1662,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/observable.h',
'src/core/lib/promise/party.cc',
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
@ -2356,6 +2359,7 @@ Pod::Spec.new do |s|
'src/core/channelz/channelz.h',
'src/core/channelz/channelz_registry.h',
'src/core/client_channel/backup_poller.h',
'src/core/client_channel/client_channel.h',
'src/core/client_channel/client_channel_factory.h',
'src/core/client_channel/client_channel_filter.h',
'src/core/client_channel/client_channel_internal.h',
@ -3162,6 +3166,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
'src/core/lib/promise/observable.h',
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',

3
grpc.gemspec generated

@ -131,6 +131,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/channelz/channelz_registry.h )
s.files += %w( src/core/client_channel/backup_poller.cc )
s.files += %w( src/core/client_channel/backup_poller.h )
s.files += %w( src/core/client_channel/client_channel.cc )
s.files += %w( src/core/client_channel/client_channel.h )
s.files += %w( src/core/client_channel/client_channel_factory.cc )
s.files += %w( src/core/client_channel/client_channel_factory.h )
s.files += %w( src/core/client_channel/client_channel_filter.cc )
@ -1547,6 +1549,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/latch.h )
s.files += %w( src/core/lib/promise/loop.h )
s.files += %w( src/core/lib/promise/map.h )
s.files += %w( src/core/lib/promise/observable.h )
s.files += %w( src/core/lib/promise/party.cc )
s.files += %w( src/core/lib/promise/party.h )
s.files += %w( src/core/lib/promise/pipe.h )

2669
grpc.gyp

File diff suppressed because it is too large Load Diff

3
package.xml generated

@ -113,6 +113,8 @@
<file baseinstalldir="/" name="src/core/channelz/channelz_registry.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/backup_poller.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/backup_poller.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/client_channel.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/client_channel.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/client_channel_factory.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/client_channel_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/client_channel_filter.cc" role="src" />
@ -1529,6 +1531,7 @@
<file baseinstalldir="/" name="src/core/lib/promise/latch.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/loop.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/map.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/observable.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/party.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/party.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/pipe.h" role="src" />

@ -7483,6 +7483,8 @@ grpc_cc_library(
"1999",
"call_arena_allocator",
"call_filters",
"call_final_info",
"dual_ref_counted",
"for_each",
"if",
"latch",

File diff suppressed because it is too large Load Diff

@ -0,0 +1,249 @@
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_H
#define GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_H
#include <grpc/support/port_platform.h>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "src/core/client_channel/config_selector.h"
#include "src/core/client_channel/client_channel_factory.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/ext/filters/channel_idle/idle_filter_state.h"
#include "src/core/lib/gprpp/single_set_ptr.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/observable.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/call_filters.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/resolver/resolver.h"
#include "src/core/service_config/service_config.h"
namespace grpc_core {
class ClientChannel : public Channel {
public:
static absl::StatusOr<OrphanablePtr<Channel>> Create(
std::string target, ChannelArgs channel_args,
grpc_channel_stack_type channel_stack_type);
// Do not instantiate directly -- use Create() instead.
ClientChannel(std::string target_uri, ChannelArgs args,
std::string uri_to_resolve,
RefCountedPtr<ServiceConfig> default_service_config,
ClientChannelFactory* client_channel_factory);
~ClientChannel() override;
void Orphan() override;
grpc_call* CreateCall(grpc_call* parent_call, uint32_t propagation_mask,
grpc_completion_queue* cq,
grpc_pollset_set* /*pollset_set_alternative*/,
Slice path, absl::optional<Slice> authority,
Timestamp deadline, bool registered_method) override;
grpc_event_engine::experimental::EventEngine* event_engine()
const override {
return event_engine_.get();
}
// FIXME: should we support lame channels somehow?
bool IsLame() const override { return false; }
bool SupportsConnectivityWatcher() const override { return true; }
// Returns the current connectivity state. If try_to_connect is true,
// triggers a connection attempt if not already connected.
grpc_connectivity_state CheckConnectivityState(bool try_to_connect) override;
void WatchConnectivityState(
grpc_connectivity_state last_observed_state, Timestamp deadline,
grpc_completion_queue* cq, void* tag) override;
// Starts and stops a connectivity watch. The watcher will be initially
// notified as soon as the state changes from initial_state and then on
// every subsequent state change until either the watch is stopped or
// it is notified that the state has changed to SHUTDOWN.
//
// This is intended to be used when starting watches from code inside of
// C-core (e.g., for a nested control plane channel for things like xds).
void AddConnectivityWatcher(
grpc_connectivity_state initial_state,
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) override;
void RemoveConnectivityWatcher(
AsyncConnectivityStateWatcherInterface* watcher) override;
void GetInfo(const grpc_channel_info* channel_info) override;
void ResetConnectionBackoff() override;
void Ping(grpc_completion_queue* cq, void* tag) override;
// Flag that this object gets stored in channel args as a raw pointer.
struct RawPointerChannelArgTag {};
static absl::string_view ChannelArgName() {
return "grpc.internal.client_channel";
}
private:
class ResolverResultHandler;
class ClientChannelControlHelper;
class SubchannelWrapper;
class LoadBalancedCallDestination;
void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void DestroyResolverAndLbPolicyLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void OnResolverResultChangedLocked(Resolver::Result result)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void OnResolverErrorLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
absl::Status CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
const absl::optional<std::string>& health_check_service_name,
Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const ChannelArgs& args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void UpdateServiceConfigInControlPlaneLocked(
RefCountedPtr<ServiceConfig> service_config,
RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void UpdateServiceConfigInDataPlaneLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void UpdateStateLocked(grpc_connectivity_state state,
const absl::Status& status, const char* reason)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void UpdateStateAndPickerLocked(
grpc_connectivity_state state, const absl::Status& status,
const char* reason,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void StartIdleTimer();
CallInitiator CreateCall(ClientMetadataHandle client_initial_metadata,
Arena* arena);
// Applies service config settings from config_selector to the call.
// May modify call context and client_initial_metadata.
absl::Status ApplyServiceConfigToCall(
ConfigSelector& config_selector,
ClientMetadataHandle& client_initial_metadata) const;
// Does an LB pick for a call. Returns one of the following things:
// - Continue{}, meaning to queue the pick
// - a non-OK status, meaning to fail the call
// - a connected subchannel, meaning that the pick is complete
// When the pick is complete, pushes client_initial_metadata onto
// call_initiator. Also adds the subchannel call tracker (if any) to
// context.
LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>> PickSubchannel(
LoadBalancingPolicy::SubchannelPicker& picker,
UnstartedCallHandler& unstarted_handler);
//
// Fields set at construction and never modified.
//
ChannelArgs channel_args_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
std::string uri_to_resolve_;
const size_t service_config_parser_index_;
RefCountedPtr<ServiceConfig> default_service_config_;
ClientChannelFactory* client_channel_factory_;
std::string default_authority_;
channelz::ChannelNode* channelz_node_;
GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_;
grpc_pollset_set* interested_parties_;
//
// State for LB calls.
//
CallSizeEstimator lb_call_size_estimator_;
MemoryAllocator lb_call_allocator_;
//
// Idleness state.
//
const Duration idle_timeout_;
IdleFilterState idle_state_{false};
SingleSetPtr<Activity, typename ActivityPtr::deleter_type> idle_activity_;
//
// Fields related to name resolution.
//
struct ResolverDataForCalls {
RefCountedPtr<ConfigSelector> config_selector;
RefCountedPtr<CallDestination> call_destination;
};
Observable<absl::StatusOr<ResolverDataForCalls>> resolver_data_for_calls_;
//
// Fields related to LB picks.
//
Observable<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> picker_;
//
// Fields used in the control plane. Guarded by work_serializer.
//
std::shared_ptr<WorkSerializer> work_serializer_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(*work_serializer_);
OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(*work_serializer_);
bool previous_resolution_contained_addresses_
ABSL_GUARDED_BY(*work_serializer_) = false;
RefCountedPtr<ServiceConfig> saved_service_config_
ABSL_GUARDED_BY(*work_serializer_);
RefCountedPtr<ConfigSelector> saved_config_selector_
ABSL_GUARDED_BY(*work_serializer_);
OrphanablePtr<LoadBalancingPolicy> lb_policy_
ABSL_GUARDED_BY(*work_serializer_);
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_
ABSL_GUARDED_BY(*work_serializer_);
// The number of SubchannelWrapper instances referencing a given Subchannel.
std::map<Subchannel*, int> subchannel_refcount_map_
ABSL_GUARDED_BY(*work_serializer_);
// The set of SubchannelWrappers that currently exist.
// No need to hold a ref, since the set is updated in the control-plane
// work_serializer when the SubchannelWrappers are created and destroyed.
absl::flat_hash_set<SubchannelWrapper*> subchannel_wrappers_
ABSL_GUARDED_BY(*work_serializer_);
int keepalive_time_ ABSL_GUARDED_BY(*work_serializer_) = -1;
absl::Status disconnect_error_ ABSL_GUARDED_BY(*work_serializer_);
//
// Fields accessed via GetChannelInfo().
//
Mutex info_mu_;
std::string info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_);
std::string info_service_config_json_ ABSL_GUARDED_BY(info_mu_);
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_H

@ -1109,7 +1109,7 @@ class ClientChannelFilter::ClientChannelControlHelper final
const ChannelArgs& args) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
if (chand_->resolver_ == nullptr) return nullptr; // Shutting down.
ChannelArgs subchannel_args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs subchannel_args = Subchannel::MakeSubchannelArgs(
args, per_address_args, chand_->subchannel_pool_,
chand_->default_authority_);
// Create subchannel.
@ -1359,31 +1359,6 @@ ClientChannelFilter::CreateLoadBalancedCallPromise(
return call_ptr->MakeCallPromise(std::move(call_args), std::move(lb_call));
}
ChannelArgs ClientChannelFilter::MakeSubchannelArgs(
const ChannelArgs& channel_args, const ChannelArgs& address_args,
const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
const std::string& channel_default_authority) {
// Note that we start with the channel-level args and then apply the
// per-address args, so that if a value is present in both, the one
// in the channel-level args is used. This is particularly important
// for the GRPC_ARG_DEFAULT_AUTHORITY arg, which we want to allow
// resolvers to set on a per-address basis only if the application
// did not explicitly set it at the channel level.
return channel_args.UnionWith(address_args)
.SetObject(subchannel_pool)
// If we haven't already set the default authority arg (i.e., it
// was not explicitly set by the application nor overridden by
// the resolver), add it from the channel's default.
.SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, channel_default_authority)
// Remove channel args that should not affect subchannel
// uniqueness.
.Remove(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME)
.Remove(GRPC_ARG_INHIBIT_HEALTH_CHECKING)
.Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE)
// Remove all keys with the no-subchannel prefix.
.RemoveAllKeysWithPrefix(GRPC_ARG_NO_SUBCHANNEL_PREFIX);
}
void ClientChannelFilter::ReprocessQueuedResolverCalls() {
for (CallData* calld : resolver_queued_calls_) {
calld->RemoveCallFromResolverQueuedCallsLocked();

@ -86,9 +86,6 @@
// Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri"
// Channel arg containing a pointer to the ClientChannelFilter object.
#define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel_filter"
// Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops:
// recv_initial_metadata
@ -112,7 +109,9 @@ class ClientChannelFilter final {
// Flag that this object gets stored in channel args as a raw pointer.
struct RawPointerChannelArgTag {};
static absl::string_view ChannelArgName() { return GRPC_ARG_CLIENT_CHANNEL; }
static absl::string_view ChannelArgName() {
return "grpc.internal.client_channel_filter";
}
static ArenaPromise<ServerMetadataHandle> MakeCallPromise(
grpc_channel_element* elem, CallArgs call_args,
@ -166,12 +165,6 @@ class ClientChannelFilter final {
CallArgs call_args, absl::AnyInvocable<void()> on_commit,
bool is_transparent_retry);
// Exposed for testing only.
static ChannelArgs MakeSubchannelArgs(
const ChannelArgs& channel_args, const ChannelArgs& address_args,
const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
const std::string& channel_default_authority);
private:
class CallData;
class FilterBasedCallData;

@ -38,8 +38,12 @@
#include <grpc/status.h>
#include <grpc/support/log.h>
<<<<<<< HEAD
#include "src/core/channelz/channel_trace.h"
#include "src/core/channelz/channelz.h"
=======
#include "src/core/client_channel/client_channel_internal.h"
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
#include "src/core/client_channel/subchannel_pool_interface.h"
#include "src/core/handshaker/proxy_mapper_registry.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
@ -51,6 +55,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
@ -96,52 +101,31 @@ DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
//
ConnectedSubchannel::ConnectedSubchannel(
grpc_channel_stack* channel_stack, const ChannelArgs& args,
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: RefCounted<ConnectedSubchannel>(
GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount)
? "ConnectedSubchannel"
: nullptr),
channel_stack_(channel_stack),
args_(args),
channelz_subchannel_(std::move(channelz_subchannel)) {}
ConnectedSubchannel::~ConnectedSubchannel() {
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
}
void ConnectedSubchannel::StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch = std::move(watcher);
op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
op->bind_pollset_set = interested_parties;
grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
grpc_closure* on_ack) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
op->send_ping.on_initiate = on_initiate;
op->send_ping.on_ack = on_ack;
elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
//
// LegacyConnectedSubchannel
//
size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const {
return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
channel_stack_->call_stack_size;
}
class LegacyConnectedSubchannel : public ConnectedSubchannel {
public:
LegacyConnectedSubchannel(
RefCountedPtr<grpc_channel_stack> channel_stack, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
channel_stack_(std::move(channel_stack)) {}
ArenaPromise<ServerMetadataHandle> ConnectedSubchannel::MakeCallPromise(
CallArgs call_args) {
// If not using channelz, we just need to call the channel stack.
if (channelz_subchannel() == nullptr) {
return channel_stack_->MakeClientCallPromise(std::move(call_args));
~LegacyConnectedSubchannel() override {
channel_stack_.reset(DEBUG_LOCATION, "ConnectedSubchannel");
}
<<<<<<< HEAD
// Otherwise, we need to wrap the channel stack promise with code that
// handles the channelz updates.
return OnCancel(
@ -165,6 +149,127 @@ ArenaPromise<ServerMetadataHandle> ConnectedSubchannel::MakeCallPromise(
channelz_subchannel->RecordCallFailed();
});
}
=======
void StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch = std::move(watcher);
op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
op->bind_pollset_set = interested_parties;
grpc_channel_element* elem =
grpc_channel_stack_element(channel_stack_.get(), 0);
elem->filter->start_transport_op(elem, op);
}
void Ping(absl::AnyInvocable<void(absl::Status)> on_ack) override {
Crash("call v3 ping method called in legacy impl");
}
void StartCall(UnstartedCallHandler) override {
Crash("call v3 StartCall() method called in legacy impl");
}
grpc_channel_stack* channel_stack() const override {
return channel_stack_.get();
}
size_t GetInitialCallSizeEstimate() const override {
return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
channel_stack_->call_stack_size;
}
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args) override {
// If not using channelz, we just need to call the channel stack.
if (channelz_subchannel() == nullptr) {
return channel_stack_->MakeClientCallPromise(std::move(call_args));
}
// Otherwise, we need to wrap the channel stack promise with code that
// handles the channelz updates.
return OnCancel(
Seq(channel_stack_->MakeClientCallPromise(std::move(call_args)),
[self = Ref()](ServerMetadataHandle metadata) {
channelz::SubchannelNode* channelz_subchannel =
self->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
if (metadata->get(GrpcStatusMetadata())
.value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) {
channelz_subchannel->RecordCallFailed();
} else {
channelz_subchannel->RecordCallSucceeded();
}
return metadata;
}),
[self = Ref()]() {
channelz::SubchannelNode* channelz_subchannel =
self->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
channelz_subchannel->RecordCallFailed();
});
}
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) override {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->send_ping.on_initiate = on_initiate;
op->send_ping.on_ack = on_ack;
grpc_channel_element* elem =
grpc_channel_stack_element(channel_stack_.get(), 0);
elem->filter->start_transport_op(elem, op);
}
private:
RefCountedPtr<grpc_channel_stack> channel_stack_;
};
//
// NewConnectedSubchannel
//
class NewConnectedSubchannel : public ConnectedSubchannel {
public:
NewConnectedSubchannel(
RefCountedPtr<CallFilters::Stack> filter_stack,
OrphanablePtr<Transport> transport, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
filter_stack_(std::move(filter_stack)),
transport_(std::move(transport)) {}
void StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
// FIXME: add new transport API for this in v3 stack
}
void Ping(absl::AnyInvocable<void(absl::Status)> on_ack) override {
// FIXME: add new transport API for this in v3 stack
}
void StartCall(UnstartedCallHandler unstarted_handler) override {
auto handler = unstarted_handler.StartCall(filter_stack_);
transport_->client_transport()->StartCall(std::move(handler));
}
grpc_channel_stack* channel_stack() const override { return nullptr; }
size_t GetInitialCallSizeEstimate() const override { return 0; }
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args) override {
Crash("legacy MakeCallPromise() method called in call v3 impl");
}
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) override {
Crash("legacy ping method called in call v3 impl");
}
private:
RefCountedPtr<CallFilters::Stack> filter_stack_;
OrphanablePtr<Transport> transport_;
};
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
//
// SubchannelCall
@ -769,37 +874,46 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
}
bool Subchannel::PublishTransportLocked() {
// Construct channel stack.
// Builder takes ownership of transport.
ChannelStackBuilderImpl builder(
"subchannel", GRPC_CLIENT_SUBCHANNEL,
connecting_result_.channel_args.SetObject(
std::exchange(connecting_result_.transport, nullptr)));
if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
return false;
}
absl::StatusOr<RefCountedPtr<grpc_channel_stack>> stk = builder.Build();
if (!stk.ok()) {
auto error = absl_status_to_grpc_error(stk.status());
connecting_result_.Reset();
gpr_log(GPR_ERROR,
"subchannel %p %s: error initializing subchannel stack: %s", this,
key_.ToString().c_str(), StatusToString(error).c_str());
return false;
auto socket_node = std::move(connecting_result_.socket_node);
if (!IsCallV3Enabled()) {
// Construct channel stack.
// Builder takes ownership of transport.
ChannelStackBuilderImpl builder(
"subchannel", GRPC_CLIENT_SUBCHANNEL,
connecting_result_.channel_args.SetObject(
std::exchange(connecting_result_.transport, nullptr)));
if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
return false;
}
absl::StatusOr<RefCountedPtr<grpc_channel_stack>> stack = builder.Build();
if (!stack.ok()) {
connecting_result_.Reset();
gpr_log(GPR_ERROR,
"subchannel %p %s: error initializing subchannel stack: %s", this,
key_.ToString().c_str(), stack.status().ToString().c_str());
return false;
}
connected_subchannel_ = MakeRefCounted<LegacyConnectedSubchannel>(
std::move(*stack), args_, channelz_node_);
} else {
// Call v3 stack.
CallFilters::StackBuilder builder;
// FIXME: add filters registered for CLIENT_SUBCHANNEL
auto filter_stack = builder.Build();
connected_subchannel_ = MakeRefCounted<NewConnectedSubchannel>(
std::move(filter_stack),
OrphanablePtr<Transport>(
std::exchange(connecting_result_.transport, nullptr)),
args_, channelz_node_);
}
RefCountedPtr<channelz::SocketNode> socket =
std::move(connecting_result_.socket_node);
connecting_result_.Reset();
if (shutdown_) return false;
// Publish.
connected_subchannel_.reset(
new ConnectedSubchannel(stk->release(), args_, channelz_node_));
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
gpr_log(GPR_INFO, "subchannel %p %s: new connected subchannel at %p", this,
key_.ToString().c_str(), connected_subchannel_.get());
}
if (channelz_node_ != nullptr) {
channelz_node_->SetChildSocket(std::move(socket));
channelz_node_->SetChildSocket(std::move(socket_node));
}
// Start watching connected subchannel.
connected_subchannel_->StartWatch(
@ -810,4 +924,29 @@ bool Subchannel::PublishTransportLocked() {
return true;
}
ChannelArgs Subchannel::MakeSubchannelArgs(
const ChannelArgs& channel_args, const ChannelArgs& address_args,
const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
const std::string& channel_default_authority) {
// Note that we start with the channel-level args and then apply the
// per-address args, so that if a value is present in both, the one
// in the channel-level args is used. This is particularly important
// for the GRPC_ARG_DEFAULT_AUTHORITY arg, which we want to allow
// resolvers to set on a per-address basis only if the application
// did not explicitly set it at the channel level.
return channel_args.UnionWith(address_args)
.SetObject(subchannel_pool)
// If we haven't already set the default authority arg (i.e., it
// was not explicitly set by the application nor overridden by
// the resolver), add it from the channel's default.
.SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, channel_default_authority)
// Remove channel args that should not affect subchannel
// uniqueness.
.Remove(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME)
.Remove(GRPC_ARG_INHIBIT_HEALTH_CHECKING)
.Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE)
// Remove all keys with the no-subchannel prefix.
.RemoveAllKeysWithPrefix(GRPC_ARG_NO_SUBCHANNEL_PREFIX);
}
} // namespace grpc_core

@ -66,28 +66,32 @@ class SubchannelCall;
class ConnectedSubchannel final : public RefCounted<ConnectedSubchannel> {
public:
ConnectedSubchannel(
grpc_channel_stack* channel_stack, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
~ConnectedSubchannel() override;
void StartWatch(grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
grpc_channel_stack* channel_stack() const { return channel_stack_; }
const ChannelArgs& args() const { return args_; }
channelz::SubchannelNode* channelz_subchannel() const {
return channelz_subchannel_.get();
}
size_t GetInitialCallSizeEstimate() const;
virtual void StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) = 0;
ArenaPromise<ServerMetadataHandle> MakeCallPromise(CallArgs call_args);
// Methods for v3 stack.
virtual void StartCall(UnstartedCallHandler unstarted_handler) = 0;
virtual void Ping(absl::AnyInvocable<void(absl::Status)> on_ack) = 0;
// Methods for legacy stack.
virtual grpc_channel_stack* channel_stack() const = 0;
virtual size_t GetInitialCallSizeEstimate() const = 0;
virtual ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args) = 0;
virtual void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) = 0;
protected:
ConnectedSubchannel(
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
private:
grpc_channel_stack* channel_stack_;
ChannelArgs args_;
// ref counted pointer to the channelz node in this connected subchannel's
// owning subchannel.
@ -272,6 +276,12 @@ class Subchannel final : public DualRefCounted<Subchannel> {
return event_engine_;
}
// Exposed for testing purposes only.
static ChannelArgs MakeSubchannelArgs(
const ChannelArgs& channel_args, const ChannelArgs& address_args,
const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
const std::string& channel_default_authority);
private:
// Tears down any existing connection, and arranges for destruction
void Orphaned() override ABSL_LOCKS_EXCLUDED(mu_);

@ -0,0 +1,316 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(ctiller): Add a unit test suite for these filters once it's practical to
// mock transport operations.
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/channel_idle/channel_idle_filter.h"
#include <functional>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/meta/type_traits.h"
#include "absl/random/random.h"
#include "absl/types/optional.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/per_cpu.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc_core {
const NoInterceptor ChannelIdleFilter::Call::OnClientInitialMetadata;
const NoInterceptor ChannelIdleFilter::Call::OnServerInitialMetadata;
const NoInterceptor ChannelIdleFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ChannelIdleFilter::Call::OnClientToServerMessage;
const NoInterceptor ChannelIdleFilter::Call::OnServerToClientMessage;
namespace {
// TODO(roth): This can go back to being a constant when the experiment
// is removed.
Duration DefaultIdleTimeout() {
if (IsClientIdlenessEnabled()) return Duration::Minutes(30);
return Duration::Infinity();
}
// If these settings change, make sure that we are not sending a GOAWAY for
// inproc transport, since a GOAWAY to inproc ends up destroying the transport.
const auto kDefaultMaxConnectionAge = Duration::Infinity();
const auto kDefaultMaxConnectionAgeGrace = Duration::Infinity();
const auto kDefaultMaxConnectionIdle = Duration::Infinity();
const auto kMaxConnectionAgeJitter = 0.1;
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");
} // namespace
#define GRPC_IDLE_FILTER_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
} \
} while (0)
Duration GetClientIdleTimeout(const ChannelArgs& args) {
return args.GetDurationFromIntMillis(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS)
.value_or(DefaultIdleTimeout());
}
struct MaxAgeFilter::Config {
Duration max_connection_age;
Duration max_connection_idle;
Duration max_connection_age_grace;
bool enable() const {
return max_connection_age != Duration::Infinity() ||
max_connection_idle != Duration::Infinity();
}
// A random jitter of +/-10% will be added to MAX_CONNECTION_AGE and
// MAX_CONNECTION_IDLE to spread out reconnection storms.
static Config FromChannelArgs(const ChannelArgs& args) {
const Duration args_max_age =
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_MS)
.value_or(kDefaultMaxConnectionAge);
const Duration args_max_idle =
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
.value_or(kDefaultMaxConnectionIdle);
const Duration args_max_age_grace =
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)
.value_or(kDefaultMaxConnectionAgeGrace);
// generate a random number between 1 - kMaxConnectionAgeJitter and
// 1 + kMaxConnectionAgeJitter
struct BitGen {
Mutex mu;
absl::BitGen bit_gen ABSL_GUARDED_BY(mu);
double MakeUniformDouble(double min, double max) {
MutexLock lock(&mu);
return absl::Uniform(bit_gen, min, max);
}
};
static NoDestruct<PerCpu<BitGen>> bit_gen(PerCpuOptions().SetMaxShards(8));
const double multiplier = bit_gen->this_cpu().MakeUniformDouble(
1.0 - kMaxConnectionAgeJitter, 1.0 + kMaxConnectionAgeJitter);
// GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result
// will not be cast to int implicitly before the comparison.
return Config{args_max_age * multiplier, args_max_idle * multiplier,
args_max_age_grace};
}
};
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) {
ClientIdleFilter filter(filter_args.channel_stack(),
GetClientIdleTimeout(args));
return absl::StatusOr<ClientIdleFilter>(std::move(filter));
}
absl::StatusOr<MaxAgeFilter> MaxAgeFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) {
MaxAgeFilter filter(filter_args.channel_stack(),
Config::FromChannelArgs(args));
return absl::StatusOr<MaxAgeFilter>(std::move(filter));
}
void MaxAgeFilter::Shutdown() {
max_age_activity_.Reset();
ChannelIdleFilter::Shutdown();
}
void MaxAgeFilter::PostInit() {
struct StartupClosure {
RefCountedPtr<grpc_channel_stack> channel_stack;
MaxAgeFilter* filter;
grpc_closure closure;
};
auto run_startup = [](void* p, grpc_error_handle) {
auto* startup = static_cast<StartupClosure*>(p);
// Trigger idle timer
startup->filter->IncreaseCallCount();
startup->filter->DecreaseCallCount();
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch.reset(
new ConnectivityWatcher(startup->filter));
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
grpc_channel_next_op(
grpc_channel_stack_element(startup->channel_stack.get(), 0), op);
delete startup;
};
auto* startup =
new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}};
GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &startup->closure, absl::OkStatus());
auto channel_stack = this->channel_stack()->Ref();
// Start the max age timer
if (max_connection_age_ != Duration::Infinity()) {
max_age_activity_.Set(MakeActivity(
TrySeq(
// First sleep until the max connection age
Sleep(Timestamp::Now() + max_connection_age_),
// Then send a goaway.
[this] {
GRPC_CHANNEL_STACK_REF(this->channel_stack(),
"max_age send_goaway");
// Jump out of the activity to send the goaway.
auto fn = [](void* arg, grpc_error_handle) {
auto* channel_stack = static_cast<grpc_channel_stack*>(arg);
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->goaway_error = grpc_error_set_int(
GRPC_ERROR_CREATE("max_age"),
StatusIntProperty::kHttp2Error, GRPC_HTTP2_NO_ERROR);
grpc_channel_element* elem =
grpc_channel_stack_element(channel_stack, 0);
elem->filter->start_transport_op(elem, op);
GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway");
};
ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr),
absl::OkStatus());
return Immediate(absl::OkStatus());
},
// Sleep for the grace period
[this] {
return Sleep(Timestamp::Now() + max_connection_age_grace_);
}),
ExecCtxWakeupScheduler(),
[channel_stack, this](absl::Status status) {
// OnDone -- close the connection if the promise completed
// successfully.
// (if it did not, it was cancelled)
if (status.ok()) CloseChannel();
},
channel_stack->EventEngine()));
}
}
bool ChannelIdleFilter::StartTransportOp(grpc_transport_op* op) {
// Catch the disconnect_with_error transport op.
if (!op->disconnect_with_error.ok()) Shutdown();
// Pass the op to the next filter.
return false;
}
void ChannelIdleFilter::Shutdown() {
// IncreaseCallCount() introduces a phony call and prevent the timer from
// being reset by other threads.
IncreaseCallCount();
activity_.Reset();
}
void ChannelIdleFilter::IncreaseCallCount() {
idle_filter_state_->IncreaseCallCount();
}
void ChannelIdleFilter::DecreaseCallCount() {
if (idle_filter_state_->DecreaseCallCount()) {
// If there are no more calls in progress, start the idle timer.
StartIdleTimer();
}
}
void ChannelIdleFilter::StartIdleTimer() {
GRPC_IDLE_FILTER_LOG("timer has started");
auto idle_filter_state = idle_filter_state_;
// Hold a ref to the channel stack for the timer callback.
auto channel_stack = channel_stack_->Ref();
auto timeout = client_idle_timeout_;
auto promise = Loop([timeout, idle_filter_state]() {
return TrySeq(Sleep(Timestamp::Now() + timeout),
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> {
if (idle_filter_state->CheckTimer()) {
return Continue{};
} else {
return absl::OkStatus();
}
});
});
activity_.Set(MakeActivity(
std::move(promise), ExecCtxWakeupScheduler{},
[channel_stack, this](absl::Status status) {
if (status.ok()) CloseChannel();
},
channel_stack->EventEngine()));
}
void ChannelIdleFilter::CloseChannel() {
auto* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = grpc_error_set_int(
GRPC_ERROR_CREATE("enter idle"),
StatusIntProperty::ChannelConnectivityState, GRPC_CHANNEL_IDLE);
// Pass the transport op down to the channel stack.
auto* elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
const grpc_channel_filter ClientIdleFilter::kFilter =
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>(
"client_idle");
const grpc_channel_filter MaxAgeFilter::kFilter =
MakePromiseBasedFilter<MaxAgeFilter, FilterEndpoint::kServer>("max_age");
void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
GPR_ASSERT(MaxAgeFilter::kFilter.init_call != nullptr);
if (!IsV3ChannelIdleFiltersEnabled()) return;
if (!IsCallV3Enabled()) {
builder->channel_init()
->RegisterFilter<ClientIdleFilter>(GRPC_CLIENT_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return GetClientIdleTimeout(channel_args) != Duration::Infinity();
});
}
builder->channel_init()
->RegisterFilter<MaxAgeFilter>(GRPC_SERVER_CHANNEL)
.ExcludeFromMinimalStack()
.If([](const ChannelArgs& channel_args) {
return MaxAgeFilter::Config::FromChannelArgs(channel_args).enable();
});
}
MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack,
const Config& max_age_config)
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle),
max_connection_age_(max_age_config.max_connection_age),
max_connection_age_grace_(max_age_config.max_connection_age_grace) {}
} // namespace grpc_core

@ -0,0 +1,156 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_CHANNEL_IDLE_FILTER_H
#define GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_CHANNEL_IDLE_FILTER_H
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include <grpc/impl/connectivity_state.h>
#include "src/core/ext/filters/channel_idle/idle_filter_state.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/single_set_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
Duration GetClientIdleTimeout(const ChannelArgs& args);
class ChannelIdleFilter : public ImplementChannelFilter<ChannelIdleFilter> {
public:
~ChannelIdleFilter() override = default;
ChannelIdleFilter(const ChannelIdleFilter&) = delete;
ChannelIdleFilter& operator=(const ChannelIdleFilter&) = delete;
ChannelIdleFilter(ChannelIdleFilter&&) = default;
ChannelIdleFilter& operator=(ChannelIdleFilter&&) = default;
class Call {
public:
explicit Call(ChannelIdleFilter* filter) : filter_(filter) {
filter_->IncreaseCallCount();
}
~Call() { MaybeDecrement(); }
static const NoInterceptor OnClientInitialMetadata;
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
void OnFinalize(const grpc_call_final_info*) { MaybeDecrement(); }
private:
void MaybeDecrement() {
auto* filter = std::exchange(filter_, nullptr);
if (filter != nullptr) filter->DecreaseCallCount();
}
ChannelIdleFilter* filter_;
};
bool StartTransportOp(grpc_transport_op* op) override;
protected:
using SingleSetActivityPtr =
SingleSetPtr<Activity, typename ActivityPtr::deleter_type>;
ChannelIdleFilter(grpc_channel_stack* channel_stack,
Duration client_idle_timeout)
: channel_stack_(channel_stack),
client_idle_timeout_(client_idle_timeout) {}
grpc_channel_stack* channel_stack() { return channel_stack_; };
virtual void Shutdown();
void CloseChannel();
void IncreaseCallCount();
void DecreaseCallCount();
private:
void StartIdleTimer();
// The channel stack to which we take refs for pending callbacks.
grpc_channel_stack* channel_stack_;
Duration client_idle_timeout_;
std::shared_ptr<IdleFilterState> idle_filter_state_{
std::make_shared<IdleFilterState>(false)};
SingleSetActivityPtr activity_;
};
class ClientIdleFilter final : public ChannelIdleFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ClientIdleFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
private:
using ChannelIdleFilter::ChannelIdleFilter;
};
class MaxAgeFilter final : public ChannelIdleFilter {
public:
static const grpc_channel_filter kFilter;
struct Config;
static absl::StatusOr<MaxAgeFilter> Create(const ChannelArgs& args,
ChannelFilter::Args filter_args);
void PostInit() override;
private:
class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface {
public:
explicit ConnectivityWatcher(MaxAgeFilter* filter)
: channel_stack_(filter->channel_stack()->Ref()), filter_(filter) {}
~ConnectivityWatcher() override = default;
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status&) override {
if (new_state == GRPC_CHANNEL_SHUTDOWN) filter_->Shutdown();
}
private:
RefCountedPtr<grpc_channel_stack> channel_stack_;
MaxAgeFilter* filter_;
};
MaxAgeFilter(grpc_channel_stack* channel_stack, const Config& max_age_config);
void Shutdown() override;
SingleSetActivityPtr max_age_activity_;
Duration max_connection_age_;
Duration max_connection_age_grace_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_CHANNEL_IDLE_CHANNEL_IDLE_FILTER_H

@ -230,6 +230,7 @@ struct grpc_channel_stack {
// full C++-ification for now.
void IncrementRefCount();
void Unref();
void Unref(const grpc_core::DebugLocation& location, const char* reason);
grpc_core::RefCountedPtr<grpc_channel_stack> Ref() {
IncrementRefCount();
return grpc_core::RefCountedPtr<grpc_channel_stack>(this);
@ -345,6 +346,11 @@ inline void grpc_channel_stack::Unref() {
GRPC_CHANNEL_STACK_UNREF(this, "smart_pointer");
}
inline void grpc_channel_stack::Unref(const grpc_core::DebugLocation&,
const char* reason) {
GRPC_CHANNEL_STACK_UNREF(this, reason);
}
inline void grpc_call_stack::IncrementRefCount() {
GRPC_CALL_STACK_REF(this, "smart_pointer");
}

@ -29,8 +29,6 @@ const char* const description_call_status_override_on_cancellation =
"with cancellation.";
const char* const additional_constraints_call_status_override_on_cancellation =
"{}";
const char* const description_call_v3 = "Promise-based call version 3.";
const char* const additional_constraints_call_v3 = "{}";
const char* const description_canary_client_privacy =
"If set, canary client privacy";
const char* const additional_constraints_canary_client_privacy = "{}";
@ -128,6 +126,24 @@ const char* const description_work_serializer_dispatch =
const char* const additional_constraints_work_serializer_dispatch = "{}";
const uint8_t required_experiments_work_serializer_dispatch[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient)};
<<<<<<< HEAD
=======
const char* const description_call_v3 = "Promise-based call version 3.";
const char* const additional_constraints_call_v3 = "{}";
const uint8_t required_experiments_call_v3[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient),
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineListener),
static_cast<uint8_t>(grpc_core::kExperimentIdWorkSerializerDispatch)};
const char* const description_wrr_delegate_to_pick_first =
"Change WRR code to delegate to pick_first as per dualstack backend "
"design.";
const char* const additional_constraints_wrr_delegate_to_pick_first = "{}";
#ifdef NDEBUG
const bool kDefaultForDebugOnly = false;
#else
const bool kDefaultForDebugOnly = true;
#endif
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
} // namespace
namespace grpc_core {
@ -136,9 +152,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"call_status_override_on_cancellation",
description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation, nullptr, 0,
<<<<<<< HEAD
true, true},
{"call_v3", description_call_v3, additional_constraints_call_v3, nullptr, 0,
false, true},
=======
kDefaultForDebugOnly, true},
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
{"canary_client_privacy", description_canary_client_privacy,
additional_constraints_canary_client_privacy, nullptr, 0, false, false},
{"client_privacy", description_client_privacy,
@ -200,6 +220,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch,
required_experiments_work_serializer_dispatch, 1, false, true},
<<<<<<< HEAD
=======
{"call_v3", description_call_v3, additional_constraints_call_v3,
required_experiments_call_v3, 3, false, true},
{"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first,
additional_constraints_wrr_delegate_to_pick_first, nullptr, 0, true, true},
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
};
} // namespace grpc_core
@ -211,8 +238,6 @@ const char* const description_call_status_override_on_cancellation =
"with cancellation.";
const char* const additional_constraints_call_status_override_on_cancellation =
"{}";
const char* const description_call_v3 = "Promise-based call version 3.";
const char* const additional_constraints_call_v3 = "{}";
const char* const description_canary_client_privacy =
"If set, canary client privacy";
const char* const additional_constraints_canary_client_privacy = "{}";
@ -310,6 +335,24 @@ const char* const description_work_serializer_dispatch =
const char* const additional_constraints_work_serializer_dispatch = "{}";
const uint8_t required_experiments_work_serializer_dispatch[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient)};
<<<<<<< HEAD
=======
const char* const description_call_v3 = "Promise-based call version 3.";
const char* const additional_constraints_call_v3 = "{}";
const uint8_t required_experiments_call_v3[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient),
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineListener),
static_cast<uint8_t>(grpc_core::kExperimentIdWorkSerializerDispatch)};
const char* const description_wrr_delegate_to_pick_first =
"Change WRR code to delegate to pick_first as per dualstack backend "
"design.";
const char* const additional_constraints_wrr_delegate_to_pick_first = "{}";
#ifdef NDEBUG
const bool kDefaultForDebugOnly = false;
#else
const bool kDefaultForDebugOnly = true;
#endif
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
} // namespace
namespace grpc_core {
@ -318,9 +361,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"call_status_override_on_cancellation",
description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation, nullptr, 0,
<<<<<<< HEAD
true, true},
{"call_v3", description_call_v3, additional_constraints_call_v3, nullptr, 0,
false, true},
=======
kDefaultForDebugOnly, true},
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
{"canary_client_privacy", description_canary_client_privacy,
additional_constraints_canary_client_privacy, nullptr, 0, false, false},
{"client_privacy", description_client_privacy,
@ -382,6 +429,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch,
required_experiments_work_serializer_dispatch, 1, false, true},
<<<<<<< HEAD
=======
{"call_v3", description_call_v3, additional_constraints_call_v3,
required_experiments_call_v3, 3, false, true},
{"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first,
additional_constraints_wrr_delegate_to_pick_first, nullptr, 0, true, true},
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
};
} // namespace grpc_core
@ -393,8 +447,6 @@ const char* const description_call_status_override_on_cancellation =
"with cancellation.";
const char* const additional_constraints_call_status_override_on_cancellation =
"{}";
const char* const description_call_v3 = "Promise-based call version 3.";
const char* const additional_constraints_call_v3 = "{}";
const char* const description_canary_client_privacy =
"If set, canary client privacy";
const char* const additional_constraints_canary_client_privacy = "{}";
@ -492,6 +544,24 @@ const char* const description_work_serializer_dispatch =
const char* const additional_constraints_work_serializer_dispatch = "{}";
const uint8_t required_experiments_work_serializer_dispatch[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient)};
<<<<<<< HEAD
=======
const char* const description_call_v3 = "Promise-based call version 3.";
const char* const additional_constraints_call_v3 = "{}";
const uint8_t required_experiments_call_v3[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient),
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineListener),
static_cast<uint8_t>(grpc_core::kExperimentIdWorkSerializerDispatch)};
const char* const description_wrr_delegate_to_pick_first =
"Change WRR code to delegate to pick_first as per dualstack backend "
"design.";
const char* const additional_constraints_wrr_delegate_to_pick_first = "{}";
#ifdef NDEBUG
const bool kDefaultForDebugOnly = false;
#else
const bool kDefaultForDebugOnly = true;
#endif
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
} // namespace
namespace grpc_core {
@ -500,9 +570,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"call_status_override_on_cancellation",
description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation, nullptr, 0,
<<<<<<< HEAD
true, true},
{"call_v3", description_call_v3, additional_constraints_call_v3, nullptr, 0,
false, true},
=======
kDefaultForDebugOnly, true},
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
{"canary_client_privacy", description_canary_client_privacy,
additional_constraints_canary_client_privacy, nullptr, 0, false, false},
{"client_privacy", description_client_privacy,
@ -564,6 +638,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch,
required_experiments_work_serializer_dispatch, 1, true, true},
<<<<<<< HEAD
=======
{"call_v3", description_call_v3, additional_constraints_call_v3,
required_experiments_call_v3, 3, false, true},
{"wrr_delegate_to_pick_first", description_wrr_delegate_to_pick_first,
additional_constraints_wrr_delegate_to_pick_first, nullptr, 0, true, true},
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
};
} // namespace grpc_core

@ -58,8 +58,19 @@ namespace grpc_core {
#if defined(GRPC_CFSTREAM)
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION
<<<<<<< HEAD
inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; }
inline bool IsCallV3Enabled() { return false; }
=======
#endif
inline bool IsCallStatusOverrideOnCancellationEnabled() {
#ifdef NDEBUG
return false;
#else
return true;
#endif
}
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
inline bool IsCanaryClientPrivacyEnabled() { return false; }
inline bool IsClientPrivacyEnabled() { return false; }
inline bool IsEventEngineClientEnabled() { return false; }
@ -90,11 +101,28 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
<<<<<<< HEAD
=======
inline bool IsCallV3Enabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
#elif defined(GPR_WINDOWS)
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION
<<<<<<< HEAD
inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; }
inline bool IsCallV3Enabled() { return false; }
=======
#endif
inline bool IsCallStatusOverrideOnCancellationEnabled() {
#ifdef NDEBUG
return false;
#else
return true;
#endif
}
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
inline bool IsCanaryClientPrivacyEnabled() { return false; }
inline bool IsClientPrivacyEnabled() { return false; }
inline bool IsEventEngineClientEnabled() { return false; }
@ -126,11 +154,28 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
<<<<<<< HEAD
=======
inline bool IsCallV3Enabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
#else
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION
<<<<<<< HEAD
inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; }
inline bool IsCallV3Enabled() { return false; }
=======
#endif
inline bool IsCallStatusOverrideOnCancellationEnabled() {
#ifdef NDEBUG
return false;
#else
return true;
#endif
}
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
inline bool IsCanaryClientPrivacyEnabled() { return false; }
inline bool IsClientPrivacyEnabled() { return false; }
inline bool IsEventEngineClientEnabled() { return false; }
@ -164,12 +209,17 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_DISPATCH
inline bool IsWorkSerializerDispatchEnabled() { return true; }
<<<<<<< HEAD
=======
inline bool IsCallV3Enabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
#endif
#else
enum ExperimentIds {
kExperimentIdCallStatusOverrideOnCancellation,
kExperimentIdCallV3,
kExperimentIdCanaryClientPrivacy,
kExperimentIdClientPrivacy,
kExperimentIdEventEngineClient,
@ -195,16 +245,17 @@ enum ExperimentIds {
kExperimentIdUnconstrainedMaxQuotaBufferSize,
kExperimentIdWorkSerializerClearsTimeCache,
kExperimentIdWorkSerializerDispatch,
<<<<<<< HEAD
=======
kExperimentIdCallV3,
kExperimentIdWrrDelegateToPickFirst,
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
kNumExperiments
};
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION
inline bool IsCallStatusOverrideOnCancellationEnabled() {
return IsExperimentEnabled(kExperimentIdCallStatusOverrideOnCancellation);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_V3
inline bool IsCallV3Enabled() {
return IsExperimentEnabled(kExperimentIdCallV3);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_CANARY_CLIENT_PRIVACY
inline bool IsCanaryClientPrivacyEnabled() {
return IsExperimentEnabled(kExperimentIdCanaryClientPrivacy);
@ -305,6 +356,17 @@ inline bool IsWorkSerializerClearsTimeCacheEnabled() {
inline bool IsWorkSerializerDispatchEnabled() {
return IsExperimentEnabled(kExperimentIdWorkSerializerDispatch);
}
<<<<<<< HEAD
=======
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_V3
inline bool IsCallV3Enabled() {
return IsExperimentEnabled(kExperimentIdCallV3);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_WRR_DELEGATE_TO_PICK_FIRST
inline bool IsWrrDelegateToPickFirstEnabled() {
return IsExperimentEnabled(kExperimentIdWrrDelegateToPickFirst);
}
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
extern const ExperimentMetadata g_experiment_metadata[kNumExperiments];

@ -52,6 +52,7 @@
expiry: 2024/06/01
owner: ctiller@google.com
test_tags: []
requires: ["work_serializer_dispatch", "event_engine_listener", "event_engine_client"]
- name: canary_client_privacy
description:
If set, canary client privacy

@ -65,7 +65,22 @@ Channel::RegisteredCall::~RegisteredCall() {}
Channel::Channel(std::string target, const ChannelArgs& channel_args)
: target_(std::move(target)),
channelz_node_(channel_args.GetObjectRef<channelz::ChannelNode>()),
compression_options_(CompressionOptionsFromChannelArgs(channel_args)) {}
compression_options_(CompressionOptionsFromChannelArgs(channel_args)),
call_size_estimator_(1024),
allocator_(channel_args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryOwner()) {}
Arena* Channel::CreateArena() {
const size_t initial_size = call_size_estimator_.CallSizeEstimate();
global_stats().IncrementCallInitialSize(initial_size);
return Arena::Create(initial_size, &allocator_);
}
void Channel::DestroyArena(Arena* arena) {
call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
arena->Destroy();
}
Channel::RegisteredCall* Channel::RegisterCall(const char* method,
const char* host) {

@ -40,8 +40,10 @@
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/call_size_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
// Forward declaration to avoid dependency loop.
@ -68,9 +70,6 @@ class Channel : public RefCounted<Channel>,
virtual void Orphan() = 0;
virtual Arena* CreateArena() = 0;
virtual void DestroyArena(Arena* arena) = 0;
virtual bool IsLame() const = 0;
// TODO(roth): This should return a C++ type.
@ -125,6 +124,8 @@ class Channel : public RefCounted<Channel>,
virtual void Ping(grpc_completion_queue* cq, void* tag) = 0;
// TODO(roth): Remove these methods when LegacyChannel goes away.
Arena* CreateArena();
void DestroyArena(Arena* arena);
virtual grpc_channel_stack* channel_stack() const { return nullptr; }
virtual bool is_client() const { return true; }
virtual bool is_promising() const { return true; }
@ -137,6 +138,9 @@ class Channel : public RefCounted<Channel>,
const RefCountedPtr<channelz::ChannelNode> channelz_node_;
const grpc_compression_options compression_options_;
CallSizeEstimator call_size_estimator_;
MemoryAllocator allocator_;
Mutex mu_;
// The map key needs to be owned strings rather than unowned char*'s to
// guarantee that it outlives calls on the core channel (which may outlast

@ -22,10 +22,12 @@
#include <grpc/support/port_platform.h>
#include "src/core/channelz/channelz.h"
#include "src/core/client_channel/client_channel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/surface/legacy_channel.h"
@ -77,8 +79,12 @@ absl::StatusOr<OrphanablePtr<Channel>> ChannelCreate(
if (optional_transport != nullptr) {
args = args.SetObject(optional_transport);
}
// Delegate to legacy channel impl.
return LegacyChannel::Create(std::move(target), std::move(args),
// Delegate to appropriate channel impl.
if (!IsCallV3Enabled()) {
return LegacyChannel::Create(std::move(target), std::move(args),
channel_stack_type);
}
return ClientChannel::Create(std::move(target), std::move(args),
channel_stack_type);
}

@ -113,10 +113,14 @@ LegacyChannel::LegacyChannel(bool is_client, bool is_promising,
: Channel(std::move(target), channel_args),
is_client_(is_client),
is_promising_(is_promising),
<<<<<<< HEAD
channel_stack_(std::move(channel_stack)),
allocator_(channel_args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryOwner()) {
=======
channel_stack_(std::move(channel_stack)) {
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
// We need to make sure that grpc_shutdown() does not shut things down
// until after the channel is destroyed. However, the channel may not
// actually be destroyed by the time grpc_channel_destroy() returns,

@ -39,7 +39,11 @@
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
<<<<<<< HEAD
#include "src/core/lib/transport/call_arena_allocator.h"
=======
#include "src/core/lib/transport/transport.h"
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
namespace grpc_core {
@ -56,6 +60,7 @@ class LegacyChannel final : public Channel {
void Orphan() override;
<<<<<<< HEAD
Arena* CreateArena() override {
const size_t initial_size = call_size_estimator_.CallSizeEstimate();
global_stats().IncrementCallInitialSize(initial_size);
@ -66,6 +71,8 @@ class LegacyChannel final : public Channel {
arena->Destroy();
}
=======
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
bool IsLame() const override;
grpc_call* CreateCall(grpc_call* parent_call, uint32_t propagation_mask,
@ -114,8 +121,11 @@ class LegacyChannel final : public Channel {
const bool is_client_;
const bool is_promising_;
RefCountedPtr<grpc_channel_stack> channel_stack_;
<<<<<<< HEAD
CallSizeEstimator call_size_estimator_{1024};
grpc_event_engine::experimental::MemoryAllocator allocator_;
=======
>>>>>>> 513bd21ea9db49d061e0382289319ddb126f812c
};
} // namespace grpc_core

@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/latch.h"
@ -29,6 +30,7 @@
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/transport/call_arena_allocator.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/transport/call_filters.h"
#include "src/core/lib/transport/message.h"
#include "src/core/lib/transport/metadata.h"
@ -493,6 +495,11 @@ class CallHandler {
explicit CallHandler(RefCountedPtr<CallSpineInterface> spine)
: spine_(std::move(spine)) {}
template <typename ContextType>
void SetContext(ContextType context) {
// FIXME: implement
}
auto PullClientInitialMetadata() {
return spine_->PullClientInitialMetadata();
}
@ -609,6 +616,109 @@ class UnstartedCallHandler {
RefCountedPtr<CallSpineInterface> spine_;
};
class UnstartedCallHandler;
// CallDestination is responsible for starting an UnstartedCallHandler
// and then processing operations on the resulting CallHandler.
//
// Examples of CallDestinations include:
// - a client transport
// - the server API
// - a load-balanced call in the client channel
// - a hijacking filter (see DelegatingCallDestination below)
//
// FIXME: do we want this to be ref-counted? that might not be
// desirable for the hijacking filter case, where we want the filter stack
// to own the filter rather than having every call take its own ref to every
// hijacking filter.
class CallDestination : public DualRefCounted<CallDestination> {
public:
virtual void StartCall(UnstartedCallHandler unstarted_call_handler) = 0;
};
// A delegating CallDestination for use as a hijacking filter.
// Implementations may look at the unprocessed initial metadata
// and decide to do one of two things:
//
// 1. It can be a no-op. In this case, it will simply pass the
// unstarted_call_handler to the wrapped CallDestination.
//
// 2. It can hijack the call by doing the following:
// - Start unstarted_call_handler and take ownership of the
// resulting handler.
// - Create a new CallInitiator/UnstartedCallHandler pair, and pass
// that new UnstartedCallHandler down to the wrapped CallDestination.
// - The implementation is then responsible for forwarding between
// the started handler and the new initiator. Note that in
// simple cases, this can be done via ForwardCall().
class DelegatingCallDestination : public CallDestination {
protected:
explicit DelegatingCallDestination(
RefCountedPtr<CallDestination> wrapped_destination)
: wrapped_destination_(std::move(wrapped_destination)) {}
CallDestination* wrapped_destination() const {
return wrapped_destination_.get();
}
private:
RefCountedPtr<CallDestination> wrapped_destination_;
};
class UnstartedCallHandler {
public:
UnstartedCallHandler(RefCountedPtr<CallSpineInterface> spine,
ClientMetadataHandle client_initial_metadata)
: spine_(std::move(spine)) {
spine_->SpawnGuarded(
"send_initial_metadata",
[client_initial_metadata = std::move(client_initial_metadata),
spine = spine_]() mutable {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine->party());
return Map(spine->client_initial_metadata().sender.Push(
std::move(client_initial_metadata)),
[](bool ok) { return StatusFlag(ok); });
});
}
// Returns the client initial metadata, which has not yet been
// processed by the stack that will ultimately be used for this call.
ClientMetadataHandle& UnprocessedClientInitialMetadata();
// Starts the call using the specified stack.
// This must be called only once, and the UnstartedCallHandler object
// may not be used after this is called.
CallHandler StartCall(RefCountedPtr<CallFilters::Stack> stack);
template <typename ContextType>
void SetContext(ContextType context) {
// FIXME: implement
}
template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
}
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnGuarded(name, std::move(promise_factory));
}
template <typename PromiseFactory>
void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnInfallible(name, std::move(promise_factory));
}
template <typename PromiseFactory>
auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) {
return spine_->party().SpawnWaitable(name, std::move(promise_factory));
}
private:
RefCountedPtr<CallSpineInterface> spine_;
};
struct CallInitiatorAndHandler {
CallInitiator initiator;
UnstartedCallHandler handler;

@ -528,6 +528,14 @@ struct WaitForReady {
static std::string DisplayValue(ValueType x);
};
// Annotation added by retry code to indicate a transparent retry.
struct IsTransparentRetry {
static absl::string_view DebugKey() { return "IsTransparentRetry"; }
static constexpr bool kRepeatable = false;
using ValueType = bool;
static std::string DisplayValue(ValueType x) { return x ? "true" : "false"; }
};
// Annotation added by a transport to note that server trailing metadata
// is a Trailers-Only response.
struct GrpcTrailersOnly {
@ -1536,7 +1544,8 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap<
grpc_core::GrpcStreamNetworkState, grpc_core::PeerString,
grpc_core::GrpcStatusContext, grpc_core::GrpcStatusFromWire,
grpc_core::GrpcCallWasCancelled, grpc_core::WaitForReady,
grpc_core::GrpcTrailersOnly, grpc_core::GrpcTarPit,
grpc_core::IsTransparentRetry, grpc_core::GrpcTrailersOnly,
grpc_core::GrpcTarPit,
grpc_core::GrpcRegisteredMethod GRPC_CUSTOM_CLIENT_METADATA
GRPC_CUSTOM_SERVER_METADATA>;

@ -19,6 +19,7 @@ CORE_SOURCE_FILES = [
'src/core/channelz/channelz.cc',
'src/core/channelz/channelz_registry.cc',
'src/core/client_channel/backup_poller.cc',
'src/core/client_channel/client_channel.cc',
'src/core/client_channel/client_channel_factory.cc',
'src/core/client_channel/client_channel_filter.cc',
'src/core/client_channel/client_channel_plugin.cc',

@ -22,7 +22,7 @@
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/port_platform.h>
#include "src/core/client_channel/client_channel_filter.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/resolver/endpoint_addresses.h"
@ -33,20 +33,20 @@ namespace testing {
namespace {
TEST(MakeSubchannelArgs, UsesChannelDefaultAuthorityByDefault) {
ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs args = Subchannel::MakeSubchannelArgs(
ChannelArgs(), ChannelArgs(), nullptr, "foo.example.com");
EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "foo.example.com");
}
TEST(MakeSubchannelArgs, DefaultAuthorityFromChannelArgs) {
ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs args = Subchannel::MakeSubchannelArgs(
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"),
ChannelArgs(), nullptr, "foo.example.com");
EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "bar.example.com");
}
TEST(MakeSubchannelArgs, DefaultAuthorityFromResolver) {
ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs args = Subchannel::MakeSubchannelArgs(
ChannelArgs(),
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), nullptr,
"foo.example.com");
@ -55,7 +55,7 @@ TEST(MakeSubchannelArgs, DefaultAuthorityFromResolver) {
TEST(MakeSubchannelArgs,
DefaultAuthorityFromChannelArgsOverridesValueFromResolver) {
ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs args = Subchannel::MakeSubchannelArgs(
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"),
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "baz.example.com"), nullptr,
"foo.example.com");
@ -63,14 +63,14 @@ TEST(MakeSubchannelArgs,
}
TEST(MakeSubchannelArgs, ArgsFromChannelTrumpPerAddressArgs) {
ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs().Set("foo", 1), ChannelArgs().Set("foo", 2), nullptr,
"foo.example.com");
ChannelArgs args = Subchannel::MakeSubchannelArgs(ChannelArgs().Set("foo", 1),
ChannelArgs().Set("foo", 2),
nullptr, "foo.example.com");
EXPECT_EQ(args.GetInt("foo"), 1);
}
TEST(MakeSubchannelArgs, StripsOutNoSubchannelArgs) {
ChannelArgs args = ClientChannelFilter::MakeSubchannelArgs(
ChannelArgs args = Subchannel::MakeSubchannelArgs(
ChannelArgs().Set(GRPC_ARG_NO_SUBCHANNEL_PREFIX "foo", 1),
ChannelArgs().Set(GRPC_ARG_NO_SUBCHANNEL_PREFIX "bar", 1), nullptr,
"foo.example.com");

@ -1091,6 +1091,8 @@ src/core/channelz/channelz_registry.cc \
src/core/channelz/channelz_registry.h \
src/core/client_channel/backup_poller.cc \
src/core/client_channel/backup_poller.h \
src/core/client_channel/client_channel.cc \
src/core/client_channel/client_channel.h \
src/core/client_channel/client_channel_factory.cc \
src/core/client_channel/client_channel_factory.h \
src/core/client_channel/client_channel_filter.cc \
@ -2545,6 +2547,7 @@ src/core/lib/promise/interceptor_list.h \
src/core/lib/promise/latch.h \
src/core/lib/promise/loop.h \
src/core/lib/promise/map.h \
src/core/lib/promise/observable.h \
src/core/lib/promise/party.cc \
src/core/lib/promise/party.h \
src/core/lib/promise/pipe.h \

@ -895,6 +895,8 @@ src/core/channelz/channelz_registry.h \
src/core/client_channel/README.md \
src/core/client_channel/backup_poller.cc \
src/core/client_channel/backup_poller.h \
src/core/client_channel/client_channel.cc \
src/core/client_channel/client_channel.h \
src/core/client_channel/client_channel_factory.cc \
src/core/client_channel/client_channel_factory.h \
src/core/client_channel/client_channel_filter.cc \
@ -2321,6 +2323,7 @@ src/core/lib/promise/interceptor_list.h \
src/core/lib/promise/latch.h \
src/core/lib/promise/loop.h \
src/core/lib/promise/map.h \
src/core/lib/promise/observable.h \
src/core/lib/promise/party.cc \
src/core/lib/promise/party.h \
src/core/lib/promise/pipe.h \

Loading…
Cancel
Save