[LB policy API] change metadata mutation API to handle discarded picks (#36968)

Previously, metadata mutations were made by the picker directly, which meant that they would be applied even if the channel winds up discarding the pick due to the returned subchannel having been disconnected by the time the pick result is returned.  This changes the API such that pickers return metadata mutations along with the pick result, so that the mutations won't get applied unless the pick result is actually used.

Closes #36968

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36968 from markdroth:lb_metadata_api 2765da6121
PiperOrigin-RevId: 645451869
pull/37017/head
Mark D. Roth 5 months ago committed by Copybara-Service
parent 698a66b9c3
commit 34a0318dbf
  1. 1
      BUILD
  2. 2
      CMakeLists.txt
  3. 1
      Makefile
  4. 4
      Package.swift
  5. 7
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 6
      gRPC-C++.podspec
  9. 7
      gRPC-Core.podspec
  10. 4
      grpc.gemspec
  11. 4
      package.xml
  12. 20
      src/core/BUILD
  13. 81
      src/core/client_channel/client_channel_filter.cc
  14. 99
      src/core/client_channel/lb_metadata.cc
  15. 50
      src/core/client_channel/lb_metadata.h
  16. 72
      src/core/client_channel/load_balanced_call_destination.cc
  17. 41
      src/core/load_balancing/grpclb/grpclb.cc
  18. 56
      src/core/load_balancing/lb_policy.h
  19. 53
      src/core/load_balancing/rls/rls.cc
  20. 8
      src/core/util/upb_utils.h
  21. 2
      src/core/xds/grpc/xds_client_grpc.cc
  22. 2
      src/core/xds/grpc/xds_cluster.cc
  23. 2
      src/core/xds/grpc/xds_common_types.cc
  24. 2
      src/core/xds/grpc/xds_endpoint.cc
  25. 2
      src/core/xds/grpc/xds_http_rbac_filter.cc
  26. 2
      src/core/xds/grpc/xds_http_stateful_session_filter.cc
  27. 2
      src/core/xds/grpc/xds_listener.cc
  28. 2
      src/core/xds/grpc/xds_route_config.cc
  29. 2
      src/core/xds/xds_client/xds_api.cc
  30. 2
      src/core/xds/xds_client/xds_client.cc
  31. 1
      src/python/grpcio/grpc_core_dependencies.py
  32. 13
      test/core/load_balancing/lb_policy_test_lib.h
  33. 2
      test/core/test_util/BUILD
  34. 8
      test/core/test_util/test_lb_policies.cc
  35. 22
      test/core/transport/metadata_map_test.cc
  36. 2
      test/core/xds/xds_common_types_test.cc
  37. 15
      test/cpp/interop/rpc_behavior_lb_policy.cc
  38. 4
      tools/doxygen/Doxyfile.c++.internal
  39. 4
      tools/doxygen/Doxyfile.core.internal
  40. 5
      tools/run_tests/sanity/core_banned_functions.py

@ -3818,6 +3818,7 @@ grpc_cc_library(
"//src/core:iomgr_fwd",
"//src/core:json",
"//src/core:latch",
"//src/core:lb_metadata",
"//src/core:lb_policy",
"//src/core:lb_policy_registry",
"//src/core:loop",

2
CMakeLists.txt generated

@ -1876,6 +1876,7 @@ add_library(grpc
src/core/client_channel/direct_channel.cc
src/core/client_channel/dynamic_filters.cc
src/core/client_channel/global_subchannel_pool.cc
src/core/client_channel/lb_metadata.cc
src/core/client_channel/load_balanced_call_destination.cc
src/core/client_channel/local_subchannel_pool.cc
src/core/client_channel/retry_filter.cc
@ -2969,6 +2970,7 @@ add_library(grpc_unsecure
src/core/client_channel/direct_channel.cc
src/core/client_channel/dynamic_filters.cc
src/core/client_channel/global_subchannel_pool.cc
src/core/client_channel/lb_metadata.cc
src/core/client_channel/load_balanced_call_destination.cc
src/core/client_channel/local_subchannel_pool.cc
src/core/client_channel/retry_filter.cc

1
Makefile generated

@ -678,6 +678,7 @@ LIBGRPC_SRC = \
src/core/client_channel/direct_channel.cc \
src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/global_subchannel_pool.cc \
src/core/client_channel/lb_metadata.cc \
src/core/client_channel/load_balanced_call_destination.cc \
src/core/client_channel/local_subchannel_pool.cc \
src/core/client_channel/retry_filter.cc \

4
Package.swift generated

@ -144,6 +144,8 @@ let package = Package(
"src/core/client_channel/dynamic_filters.h",
"src/core/client_channel/global_subchannel_pool.cc",
"src/core/client_channel/global_subchannel_pool.h",
"src/core/client_channel/lb_metadata.cc",
"src/core/client_channel/lb_metadata.h",
"src/core/client_channel/load_balanced_call_destination.cc",
"src/core/client_channel/load_balanced_call_destination.h",
"src/core/client_channel/local_subchannel_pool.cc",
@ -1946,6 +1948,7 @@ let package = Package(
"src/core/util/time_precise.cc",
"src/core/util/time_precise.h",
"src/core/util/tmpfile.h",
"src/core/util/upb_utils.h",
"src/core/util/useful.h",
"src/core/util/windows/cpu.cc",
"src/core/util/windows/log.cc",
@ -1958,7 +1961,6 @@ let package = Package(
"src/core/xds/grpc/certificate_provider_store.h",
"src/core/xds/grpc/file_watcher_certificate_provider_factory.cc",
"src/core/xds/grpc/file_watcher_certificate_provider_factory.h",
"src/core/xds/grpc/upb_utils.h",
"src/core/xds/grpc/xds_audit_logger_registry.cc",
"src/core/xds/grpc/xds_audit_logger_registry.h",
"src/core/xds/grpc/xds_bootstrap_grpc.cc",

@ -237,6 +237,7 @@ libs:
- src/core/client_channel/direct_channel.h
- src/core/client_channel/dynamic_filters.h
- src/core/client_channel/global_subchannel_pool.h
- src/core/client_channel/lb_metadata.h
- src/core/client_channel/load_balanced_call_destination.h
- src/core/client_channel/local_subchannel_pool.h
- src/core/client_channel/retry_filter.h
@ -1216,9 +1217,9 @@ libs:
- src/core/util/json/json_util.h
- src/core/util/json/json_writer.h
- src/core/util/spinlock.h
- src/core/util/upb_utils.h
- src/core/xds/grpc/certificate_provider_store.h
- src/core/xds/grpc/file_watcher_certificate_provider_factory.h
- src/core/xds/grpc/upb_utils.h
- src/core/xds/grpc/xds_audit_logger_registry.h
- src/core/xds/grpc/xds_bootstrap_grpc.h
- src/core/xds/grpc/xds_certificate_provider.h
@ -1261,6 +1262,7 @@ libs:
- src/core/client_channel/direct_channel.cc
- src/core/client_channel/dynamic_filters.cc
- src/core/client_channel/global_subchannel_pool.cc
- src/core/client_channel/lb_metadata.cc
- src/core/client_channel/load_balanced_call_destination.cc
- src/core/client_channel/local_subchannel_pool.cc
- src/core/client_channel/retry_filter.cc
@ -2223,6 +2225,7 @@ libs:
- src/core/client_channel/direct_channel.h
- src/core/client_channel/dynamic_filters.h
- src/core/client_channel/global_subchannel_pool.h
- src/core/client_channel/lb_metadata.h
- src/core/client_channel/load_balanced_call_destination.h
- src/core/client_channel/local_subchannel_pool.h
- src/core/client_channel/retry_filter.h
@ -2685,6 +2688,7 @@ libs:
- src/core/util/json/json_reader.h
- src/core/util/json/json_writer.h
- src/core/util/spinlock.h
- src/core/util/upb_utils.h
- third_party/upb/upb/generated_code_support.h
- third_party/upb/upb/mini_descriptor/build_enum.h
- third_party/upb/upb/mini_descriptor/decode.h
@ -2717,6 +2721,7 @@ libs:
- src/core/client_channel/direct_channel.cc
- src/core/client_channel/dynamic_filters.cc
- src/core/client_channel/global_subchannel_pool.cc
- src/core/client_channel/lb_metadata.cc
- src/core/client_channel/load_balanced_call_destination.cc
- src/core/client_channel/local_subchannel_pool.cc
- src/core/client_channel/retry_filter.cc

1
config.m4 generated

@ -53,6 +53,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/client_channel/direct_channel.cc \
src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/global_subchannel_pool.cc \
src/core/client_channel/lb_metadata.cc \
src/core/client_channel/load_balanced_call_destination.cc \
src/core/client_channel/local_subchannel_pool.cc \
src/core/client_channel/retry_filter.cc \

1
config.w32 generated

@ -18,6 +18,7 @@ if (PHP_GRPC != "no") {
"src\\core\\client_channel\\direct_channel.cc " +
"src\\core\\client_channel\\dynamic_filters.cc " +
"src\\core\\client_channel\\global_subchannel_pool.cc " +
"src\\core\\client_channel\\lb_metadata.cc " +
"src\\core\\client_channel\\load_balanced_call_destination.cc " +
"src\\core\\client_channel\\local_subchannel_pool.cc " +
"src\\core\\client_channel\\retry_filter.cc " +

6
gRPC-C++.podspec generated

@ -283,6 +283,7 @@ Pod::Spec.new do |s|
'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.h',
'src/core/client_channel/lb_metadata.h',
'src/core/client_channel/load_balanced_call_destination.h',
'src/core/client_channel/local_subchannel_pool.h',
'src/core/client_channel/retry_filter.h',
@ -1323,10 +1324,10 @@ Pod::Spec.new do |s|
'src/core/util/string.h',
'src/core/util/time_precise.h',
'src/core/util/tmpfile.h',
'src/core/util/upb_utils.h',
'src/core/util/useful.h',
'src/core/xds/grpc/certificate_provider_store.h',
'src/core/xds/grpc/file_watcher_certificate_provider_factory.h',
'src/core/xds/grpc/upb_utils.h',
'src/core/xds/grpc/xds_audit_logger_registry.h',
'src/core/xds/grpc/xds_bootstrap_grpc.h',
'src/core/xds/grpc/xds_certificate_provider.h',
@ -1574,6 +1575,7 @@ Pod::Spec.new do |s|
'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.h',
'src/core/client_channel/lb_metadata.h',
'src/core/client_channel/load_balanced_call_destination.h',
'src/core/client_channel/local_subchannel_pool.h',
'src/core/client_channel/retry_filter.h',
@ -2596,10 +2598,10 @@ Pod::Spec.new do |s|
'src/core/util/string.h',
'src/core/util/time_precise.h',
'src/core/util/tmpfile.h',
'src/core/util/upb_utils.h',
'src/core/util/useful.h',
'src/core/xds/grpc/certificate_provider_store.h',
'src/core/xds/grpc/file_watcher_certificate_provider_factory.h',
'src/core/xds/grpc/upb_utils.h',
'src/core/xds/grpc/xds_audit_logger_registry.h',
'src/core/xds/grpc/xds_bootstrap_grpc.h',
'src/core/xds/grpc/xds_certificate_provider.h',

7
gRPC-Core.podspec generated

@ -263,6 +263,8 @@ Pod::Spec.new do |s|
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.cc',
'src/core/client_channel/global_subchannel_pool.h',
'src/core/client_channel/lb_metadata.cc',
'src/core/client_channel/lb_metadata.h',
'src/core/client_channel/load_balanced_call_destination.cc',
'src/core/client_channel/load_balanced_call_destination.h',
'src/core/client_channel/local_subchannel_pool.cc',
@ -2061,6 +2063,7 @@ Pod::Spec.new do |s|
'src/core/util/time_precise.cc',
'src/core/util/time_precise.h',
'src/core/util/tmpfile.h',
'src/core/util/upb_utils.h',
'src/core/util/useful.h',
'src/core/util/windows/cpu.cc',
'src/core/util/windows/log.cc',
@ -2073,7 +2076,6 @@ Pod::Spec.new do |s|
'src/core/xds/grpc/certificate_provider_store.h',
'src/core/xds/grpc/file_watcher_certificate_provider_factory.cc',
'src/core/xds/grpc/file_watcher_certificate_provider_factory.h',
'src/core/xds/grpc/upb_utils.h',
'src/core/xds/grpc/xds_audit_logger_registry.cc',
'src/core/xds/grpc/xds_audit_logger_registry.h',
'src/core/xds/grpc/xds_bootstrap_grpc.cc',
@ -2365,6 +2367,7 @@ Pod::Spec.new do |s|
'src/core/client_channel/direct_channel.h',
'src/core/client_channel/dynamic_filters.h',
'src/core/client_channel/global_subchannel_pool.h',
'src/core/client_channel/lb_metadata.h',
'src/core/client_channel/load_balanced_call_destination.h',
'src/core/client_channel/local_subchannel_pool.h',
'src/core/client_channel/retry_filter.h',
@ -3367,10 +3370,10 @@ Pod::Spec.new do |s|
'src/core/util/string.h',
'src/core/util/time_precise.h',
'src/core/util/tmpfile.h',
'src/core/util/upb_utils.h',
'src/core/util/useful.h',
'src/core/xds/grpc/certificate_provider_store.h',
'src/core/xds/grpc/file_watcher_certificate_provider_factory.h',
'src/core/xds/grpc/upb_utils.h',
'src/core/xds/grpc/xds_audit_logger_registry.h',
'src/core/xds/grpc/xds_bootstrap_grpc.h',
'src/core/xds/grpc/xds_certificate_provider.h',

4
grpc.gemspec generated

@ -150,6 +150,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/client_channel/dynamic_filters.h )
s.files += %w( src/core/client_channel/global_subchannel_pool.cc )
s.files += %w( src/core/client_channel/global_subchannel_pool.h )
s.files += %w( src/core/client_channel/lb_metadata.cc )
s.files += %w( src/core/client_channel/lb_metadata.h )
s.files += %w( src/core/client_channel/load_balanced_call_destination.cc )
s.files += %w( src/core/client_channel/load_balanced_call_destination.h )
s.files += %w( src/core/client_channel/local_subchannel_pool.cc )
@ -1948,6 +1950,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/util/time_precise.cc )
s.files += %w( src/core/util/time_precise.h )
s.files += %w( src/core/util/tmpfile.h )
s.files += %w( src/core/util/upb_utils.h )
s.files += %w( src/core/util/useful.h )
s.files += %w( src/core/util/windows/cpu.cc )
s.files += %w( src/core/util/windows/log.cc )
@ -1960,7 +1963,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/xds/grpc/certificate_provider_store.h )
s.files += %w( src/core/xds/grpc/file_watcher_certificate_provider_factory.cc )
s.files += %w( src/core/xds/grpc/file_watcher_certificate_provider_factory.h )
s.files += %w( src/core/xds/grpc/upb_utils.h )
s.files += %w( src/core/xds/grpc/xds_audit_logger_registry.cc )
s.files += %w( src/core/xds/grpc/xds_audit_logger_registry.h )
s.files += %w( src/core/xds/grpc/xds_bootstrap_grpc.cc )

4
package.xml generated

@ -132,6 +132,8 @@
<file baseinstalldir="/" name="src/core/client_channel/dynamic_filters.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/global_subchannel_pool.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/global_subchannel_pool.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/lb_metadata.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/lb_metadata.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/load_balanced_call_destination.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/load_balanced_call_destination.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/local_subchannel_pool.cc" role="src" />
@ -1930,6 +1932,7 @@
<file baseinstalldir="/" name="src/core/util/time_precise.cc" role="src" />
<file baseinstalldir="/" name="src/core/util/time_precise.h" role="src" />
<file baseinstalldir="/" name="src/core/util/tmpfile.h" role="src" />
<file baseinstalldir="/" name="src/core/util/upb_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/util/useful.h" role="src" />
<file baseinstalldir="/" name="src/core/util/windows/cpu.cc" role="src" />
<file baseinstalldir="/" name="src/core/util/windows/log.cc" role="src" />
@ -1942,7 +1945,6 @@
<file baseinstalldir="/" name="src/core/xds/grpc/certificate_provider_store.h" role="src" />
<file baseinstalldir="/" name="src/core/xds/grpc/file_watcher_certificate_provider_factory.cc" role="src" />
<file baseinstalldir="/" name="src/core/xds/grpc/file_watcher_certificate_provider_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/xds/grpc/upb_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/xds/grpc/xds_audit_logger_registry.cc" role="src" />
<file baseinstalldir="/" name="src/core/xds/grpc/xds_audit_logger_registry.h" role="src" />
<file baseinstalldir="/" name="src/core/xds/grpc/xds_bootstrap_grpc.cc" role="src" />

@ -3456,6 +3456,7 @@ grpc_cc_library(
hdrs = ["load_balancing/lb_policy.h"],
external_deps = [
"absl/base:core_headers",
"absl/container:inlined_vector",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -3525,6 +3526,21 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "lb_metadata",
srcs = ["client_channel/lb_metadata.cc"],
hdrs = ["client_channel/lb_metadata.h"],
external_deps = [
"absl/log:log",
"absl/strings",
"absl/types:optional",
],
deps = [
"lb_policy",
"metadata_batch",
],
)
grpc_cc_library(
name = "subchannel_interface",
hdrs = ["load_balancing/subchannel_interface.h"],
@ -5009,6 +5025,7 @@ grpc_cc_library(
"slice_refcount",
"status_helper",
"time",
"upb_utils",
"uuid_v4",
"validation_errors",
"//:backoff",
@ -5039,14 +5056,13 @@ grpc_cc_library(
grpc_cc_library(
name = "upb_utils",
hdrs = [
"xds/grpc/upb_utils.h",
"util/upb_utils.h",
],
external_deps = [
"absl/strings",
"@com_google_protobuf//upb:base",
],
language = "c++",
deps = ["//:gpr_platform"],
)
grpc_cc_library(

@ -57,6 +57,7 @@
#include "src/core/client_channel/config_selector.h"
#include "src/core/client_channel/dynamic_filters.h"
#include "src/core/client_channel/global_subchannel_pool.h"
#include "src/core/client_channel/lb_metadata.h"
#include "src/core/client_channel/local_subchannel_pool.h"
#include "src/core/client_channel/retry_filter.h"
#include "src/core/client_channel/subchannel.h"
@ -2353,79 +2354,6 @@ class ClientChannelFilter::LoadBalancedCall::LbCallState final
LoadBalancedCall* lb_call_;
};
//
// ClientChannelFilter::LoadBalancedCall::Metadata
//
class ClientChannelFilter::LoadBalancedCall::Metadata final
: public LoadBalancingPolicy::MetadataInterface {
public:
explicit Metadata(grpc_metadata_batch* batch) : batch_(batch) {}
void Add(absl::string_view key, absl::string_view value) override {
if (batch_ == nullptr) return;
// Gross, egregious hack to support legacy grpclb behavior.
// TODO(ctiller): Use a promise context for this once that plumbing is done.
if (key == GrpcLbClientStatsMetadata::key()) {
batch_->Set(
GrpcLbClientStatsMetadata(),
const_cast<GrpcLbClientStats*>(
reinterpret_cast<const GrpcLbClientStats*>(value.data())));
return;
}
batch_->Append(key, Slice::FromStaticString(value),
[key](absl::string_view error, const Slice& value) {
LOG(ERROR) << error << " key:" << key
<< " value:" << value.as_string_view();
});
}
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
override {
if (batch_ == nullptr) return {};
Encoder encoder;
batch_->Encode(&encoder);
return encoder.Take();
}
absl::optional<absl::string_view> Lookup(absl::string_view key,
std::string* buffer) const override {
if (batch_ == nullptr) return absl::nullopt;
return batch_->GetStringValue(key, buffer);
}
private:
class Encoder final {
public:
void Encode(const Slice& key, const Slice& value) {
out_.emplace_back(std::string(key.as_string_view()),
std::string(value.as_string_view()));
}
template <class Which>
void Encode(Which, const typename Which::ValueType& value) {
auto value_slice = Which::Encode(value);
out_.emplace_back(std::string(Which::key()),
std::string(value_slice.as_string_view()));
}
void Encode(GrpcTimeoutMetadata,
const typename GrpcTimeoutMetadata::ValueType&) {}
void Encode(HttpPathMetadata, const Slice&) {}
void Encode(HttpMethodMetadata,
const typename HttpMethodMetadata::ValueType&) {}
std::vector<std::pair<std::string, std::string>> Take() {
return std::move(out_);
}
private:
std::vector<std::pair<std::string, std::string>> out_;
};
grpc_metadata_batch* batch_;
};
//
// ClientChannelFilter::LoadBalancedCall::LbCallState
//
@ -2536,7 +2464,7 @@ void ClientChannelFilter::LoadBalancedCall::RecordCallCompletion(
// If the LB policy requested a callback for trailing metadata, invoke
// the callback.
if (lb_subchannel_call_tracker_ != nullptr) {
Metadata trailing_metadata(recv_trailing_metadata);
LbMetadata trailing_metadata(recv_trailing_metadata);
BackendMetricAccessor backend_metric_accessor(this, recv_trailing_metadata);
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
peer_address, status, &trailing_metadata, &backend_metric_accessor};
@ -2683,7 +2611,7 @@ bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl(
pick_args.path = path->as_string_view();
LbCallState lb_call_state(this);
pick_args.call_state = &lb_call_state;
Metadata initial_metadata(send_initial_metadata());
LbMetadata initial_metadata(send_initial_metadata());
pick_args.initial_metadata = &initial_metadata;
auto result = picker->Pick(pick_args);
return HandlePickResult<bool>(
@ -2716,6 +2644,9 @@ bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl(
if (lb_subchannel_call_tracker_ != nullptr) {
lb_subchannel_call_tracker_->Start();
}
// Handle metadata mutations.
MetadataMutationHandler::Apply(complete_pick->metadata_mutations,
send_initial_metadata());
return true;
},
// QueuePick

@ -0,0 +1,99 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/client_channel/lb_metadata.h"
#include "absl/log/log.h"
namespace grpc_core {
//
// LbMetadata
//
namespace {
class Encoder {
public:
void Encode(const Slice& key, const Slice& value) {
out_.emplace_back(std::string(key.as_string_view()),
std::string(value.as_string_view()));
}
template <class Which>
void Encode(Which, const typename Which::ValueType& value) {
auto value_slice = Which::Encode(value);
out_.emplace_back(std::string(Which::key()),
std::string(value_slice.as_string_view()));
}
void Encode(GrpcTimeoutMetadata,
const typename GrpcTimeoutMetadata::ValueType&) {}
void Encode(HttpPathMetadata, const Slice&) {}
void Encode(HttpMethodMetadata,
const typename HttpMethodMetadata::ValueType&) {}
std::vector<std::pair<std::string, std::string>> Take() {
return std::move(out_);
}
private:
std::vector<std::pair<std::string, std::string>> out_;
};
} // namespace
absl::optional<absl::string_view> LbMetadata::Lookup(
absl::string_view key, std::string* buffer) const {
if (batch_ == nullptr) return absl::nullopt;
return batch_->GetStringValue(key, buffer);
}
std::vector<std::pair<std::string, std::string>>
LbMetadata::TestOnlyCopyToVector() const {
if (batch_ == nullptr) return {};
Encoder encoder;
batch_->Encode(&encoder);
return encoder.Take();
}
//
// MetadataMutationHandler
//
void MetadataMutationHandler::Apply(
LoadBalancingPolicy::MetadataMutations& metadata_mutations,
grpc_metadata_batch* metadata) {
for (auto& p : metadata_mutations.additions_) {
absl::string_view key = p.first;
Slice& value =
grpc_event_engine::experimental::internal::SliceCast<Slice>(p.second);
// Gross, egregious hack to support legacy grpclb behavior.
// TODO(ctiller): Use a promise context for this once that plumbing is done.
if (key == GrpcLbClientStatsMetadata::key()) {
metadata->Set(
GrpcLbClientStatsMetadata(),
const_cast<GrpcLbClientStats*>(
reinterpret_cast<const GrpcLbClientStats*>(value.data())));
continue;
}
metadata->Append(key, std::move(value),
[key](absl::string_view error, const Slice& value) {
LOG(ERROR) << error << " key:" << key
<< " value:" << value.as_string_view();
});
}
}
} // namespace grpc_core

@ -0,0 +1,50 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_LB_METADATA_H
#define GRPC_SRC_CORE_CLIENT_CHANNEL_LB_METADATA_H
#include <string>
#include <vector>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/load_balancing/lb_policy.h"
namespace grpc_core {
class LbMetadata : public LoadBalancingPolicy::MetadataInterface {
public:
explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {}
absl::optional<absl::string_view> Lookup(absl::string_view key,
std::string* buffer) const override;
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector() const;
private:
grpc_metadata_batch* batch_;
};
class MetadataMutationHandler {
public:
static void Apply(LoadBalancingPolicy::MetadataMutations& metadata_mutations,
grpc_metadata_batch* metadata);
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_LB_METADATA_H

@ -18,6 +18,7 @@
#include "src/core/client_channel/client_channel.h"
#include "src/core/client_channel/client_channel_internal.h"
#include "src/core/client_channel/lb_metadata.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/config/core_configuration.h"
@ -28,74 +29,6 @@ namespace grpc_core {
namespace {
class LbMetadata : public LoadBalancingPolicy::MetadataInterface {
public:
explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {}
void Add(absl::string_view key, absl::string_view value) override {
if (batch_ == nullptr) return;
// Gross, egregious hack to support legacy grpclb behavior.
// TODO(ctiller): Use a promise context for this once that plumbing is done.
if (key == GrpcLbClientStatsMetadata::key()) {
batch_->Set(
GrpcLbClientStatsMetadata(),
const_cast<GrpcLbClientStats*>(
reinterpret_cast<const GrpcLbClientStats*>(value.data())));
return;
}
batch_->Append(key, Slice::FromStaticString(value),
[key](absl::string_view error, const Slice& value) {
LOG(ERROR) << error << " key:" << key
<< " value:" << value.as_string_view();
});
}
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
override {
if (batch_ == nullptr) return {};
Encoder encoder;
batch_->Encode(&encoder);
return encoder.Take();
}
absl::optional<absl::string_view> Lookup(absl::string_view key,
std::string* buffer) const override {
if (batch_ == nullptr) return absl::nullopt;
return batch_->GetStringValue(key, buffer);
}
private:
class Encoder {
public:
void Encode(const Slice& key, const Slice& value) {
out_.emplace_back(std::string(key.as_string_view()),
std::string(value.as_string_view()));
}
template <class Which>
void Encode(Which, const typename Which::ValueType& value) {
auto value_slice = Which::Encode(value);
out_.emplace_back(std::string(Which::key()),
std::string(value_slice.as_string_view()));
}
void Encode(GrpcTimeoutMetadata,
const typename GrpcTimeoutMetadata::ValueType&) {}
void Encode(HttpPathMetadata, const Slice&) {}
void Encode(HttpMethodMetadata,
const typename HttpMethodMetadata::ValueType&) {}
std::vector<std::pair<std::string, std::string>> Take() {
return std::move(out_);
}
private:
std::vector<std::pair<std::string, std::string>> out_;
};
grpc_metadata_batch* batch_;
};
void MaybeCreateCallAttemptTracer(bool is_transparent_retry) {
auto* call_tracer = MaybeGetContext<ClientCallTracer>();
if (call_tracer == nullptr) return;
@ -208,6 +141,9 @@ LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> PickSubchannel(
complete_pick->subchannel_call_tracker->Start();
SetContext(complete_pick->subchannel_call_tracker.release());
}
// Apply metadata mutations, if any.
MetadataMutationHandler::Apply(complete_pick->metadata_mutations,
&client_initial_metadata);
// Return the connected subchannel.
return call_destination;
},

@ -311,14 +311,17 @@ class GrpcLb final : public LoadBalancingPolicy {
class SubchannelWrapper final : public DelegatingSubchannel {
public:
SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<GrpcLb> lb_policy, std::string lb_token,
RefCountedPtr<GrpcLb> lb_policy,
grpc_event_engine::experimental::Slice lb_token,
RefCountedPtr<GrpcLbClientStats> client_stats)
: DelegatingSubchannel(std::move(subchannel)),
lb_policy_(std::move(lb_policy)),
lb_token_(std::move(lb_token)),
client_stats_(std::move(client_stats)) {}
const std::string& lb_token() const { return lb_token_; }
const grpc_event_engine::experimental::Slice& lb_token() const {
return lb_token_;
}
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
private:
@ -340,14 +343,14 @@ class GrpcLb final : public LoadBalancingPolicy {
}
RefCountedPtr<GrpcLb> lb_policy_;
std::string lb_token_;
grpc_event_engine::experimental::Slice lb_token_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
class TokenAndClientStatsArg final
: public RefCounted<TokenAndClientStatsArg> {
public:
TokenAndClientStatsArg(std::string lb_token,
TokenAndClientStatsArg(grpc_event_engine::experimental::Slice lb_token,
RefCountedPtr<GrpcLbClientStats> client_stats)
: lb_token_(std::move(lb_token)),
client_stats_(std::move(client_stats)) {}
@ -358,18 +361,21 @@ class GrpcLb final : public LoadBalancingPolicy {
static int ChannelArgsCompare(const TokenAndClientStatsArg* a,
const TokenAndClientStatsArg* b) {
int r = a->lb_token_.compare(b->lb_token_);
int r =
a->lb_token_.as_string_view().compare(b->lb_token_.as_string_view());
if (r != 0) return r;
return QsortCompare(a->client_stats_.get(), b->client_stats_.get());
}
const std::string& lb_token() const { return lb_token_; }
const grpc_event_engine::experimental::Slice& lb_token() const {
return lb_token_;
}
RefCountedPtr<GrpcLbClientStats> client_stats() const {
return client_stats_;
}
private:
std::string lb_token_;
grpc_event_engine::experimental::Slice lb_token_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
@ -665,7 +671,8 @@ class GrpcLb::Serverlist::AddressIterator final
// LB token processing.
const size_t lb_token_length = strnlen(
server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
std::string lb_token(server.load_balance_token, lb_token_length);
auto lb_token = grpc_event_engine::experimental::Slice::FromCopiedBuffer(
server.load_balance_token, lb_token_length);
if (lb_token.empty()) {
auto addr_uri = grpc_sockaddr_to_uri(&addr);
LOG(INFO) << "Missing LB token for backend address '"
@ -773,9 +780,10 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
// a string and rely on the client_load_reporting filter to know
// how to interpret it.
// NOLINTBEGIN(bugprone-string-constructor)
args.initial_metadata->Add(
complete_pick->metadata_mutations.Add(
GrpcLbClientStatsMetadata::key(),
absl::string_view(reinterpret_cast<const char*>(client_stats), 0));
grpc_event_engine::experimental::Slice(grpc_slice_from_static_buffer(
reinterpret_cast<const char*>(client_stats), 0)));
// NOLINTEND(bugprone-string-constructor)
// Update calls-started.
client_stats->AddCallStarted();
@ -785,10 +793,8 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
// may get refreshed between when we return this pick and when the
// initial metadata goes out on the wire.
if (!subchannel_wrapper->lb_token().empty()) {
char* lb_token = static_cast<char*>(
args.call_state->Alloc(subchannel_wrapper->lb_token().size() + 1));
strcpy(lb_token, subchannel_wrapper->lb_token().c_str());
args.initial_metadata->Add(LbTokenMetadata::key(), lb_token);
complete_pick->metadata_mutations.Add(
LbTokenMetadata::key(), subchannel_wrapper->lb_token().Ref());
}
// Unwrap subchannel to pass up to the channel.
complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
@ -811,13 +817,11 @@ RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
absl::StrFormat("[grpclb %p] no TokenAndClientStatsArg for address %s",
parent(), addr_str.value_or("N/A").c_str()));
}
std::string lb_token = arg->lb_token();
RefCountedPtr<GrpcLbClientStats> client_stats = arg->client_stats();
return MakeRefCounted<SubchannelWrapper>(
parent()->channel_control_helper()->CreateSubchannel(
address, per_address_args, args),
parent()->RefAsSubclass<GrpcLb>(DEBUG_LOCATION, "SubchannelWrapper"),
std::move(lb_token), std::move(client_stats));
arg->lb_token().Ref(), arg->client_stats());
}
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
@ -1542,7 +1546,8 @@ class GrpcLb::NullLbTokenEndpointIterator final
private:
std::shared_ptr<EndpointAddressesIterator> parent_it_;
RefCountedPtr<TokenAndClientStatsArg> empty_token_ =
MakeRefCounted<TokenAndClientStatsArg>("", nullptr);
MakeRefCounted<TokenAndClientStatsArg>(
grpc_event_engine::experimental::Slice(), nullptr);
};
absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {

@ -23,9 +23,9 @@
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
@ -33,6 +33,7 @@
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/slice.h>
#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/port_platform.h>
@ -116,29 +117,35 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
public:
virtual ~MetadataInterface() = default;
//////////////////////////////////////////////////////////////////////////
// TODO(ctiller): DO NOT MAKE THIS A PUBLIC API YET
// This needs some API design to ensure we can add/remove/replace metadata
// keys... we're deliberately not doing so to save some time whilst
// cleaning up the internal metadata representation, but we should add
// something back before making this a public API.
//////////////////////////////////////////////////////////////////////////
/// Adds a key/value pair.
/// Does NOT take ownership of \a key or \a value.
/// Implementations must ensure that the key and value remain alive
/// until the call ends. If desired, they may be allocated via
/// CallState::Alloc().
virtual void Add(absl::string_view key, absl::string_view value) = 0;
/// Produce a vector of metadata key/value strings for tests.
virtual std::vector<std::pair<std::string, std::string>>
TestOnlyCopyToVector() = 0;
virtual absl::optional<absl::string_view> Lookup(
absl::string_view key, std::string* buffer) const = 0;
};
/// A list of metadata mutations to be returned along with a PickResult.
class MetadataMutations {
public:
/// Adds a key/value pair. If the key is already present, the new
/// value will be appended with a comma delimiter.
void Add(absl::string_view key, absl::string_view value) {
Add(key, grpc_event_engine::experimental::Slice::FromCopiedString(value));
}
void Add(absl::string_view key,
grpc_event_engine::experimental::Slice value) {
additions_.push_back({key, std::move(value)});
}
private:
friend class MetadataMutationHandler;
// Avoid allocation if up to 3 additions per LB pick. Most expected
// use cases should be no more than 2, so this gives us a bit of slack.
// But it should be cheap to increase this value if we start seeing use
// cases with more than 3 additions.
absl::InlinedVector<
std::pair<absl::string_view, grpc_event_engine::experimental::Slice>, 3>
additions_;
};
/// Arguments used when picking a subchannel for a call.
struct PickArgs {
/// The path of the call. Indicates the RPC service and method name.
@ -210,11 +217,16 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// be used.
std::unique_ptr<SubchannelCallTrackerInterface> subchannel_call_tracker;
/// Metadata mutations to be applied to the call.
MetadataMutations metadata_mutations;
explicit Complete(
RefCountedPtr<SubchannelInterface> sc,
std::unique_ptr<SubchannelCallTrackerInterface> tracker = nullptr)
std::unique_ptr<SubchannelCallTrackerInterface> tracker = nullptr,
MetadataMutations md = MetadataMutations())
: subchannel(std::move(sc)),
subchannel_call_tracker(std::move(tracker)) {}
subchannel_call_tracker(std::move(tracker)),
metadata_mutations(std::move(md)) {}
};
/// Pick cannot be completed until something changes on the control

@ -108,6 +108,7 @@
#include "src/core/util/json/json_args.h"
#include "src/core/util/json/json_object_loader.h"
#include "src/core/util/json/json_writer.h"
#include "src/core/util/upb_utils.h"
#include "src/proto/grpc/lookup/v1/rls.upb.h"
using ::grpc_event_engine::experimental::EventEngine;
@ -315,12 +316,12 @@ class RlsLb final : public LoadBalancingPolicy {
struct ResponseInfo {
absl::Status status;
std::vector<std::string> targets;
std::string header_data;
grpc_event_engine::experimental::Slice header_data;
std::string ToString() const {
return absl::StrFormat("{status=%s, targets=[%s], header_data=\"%s\"}",
status.ToString(), absl::StrJoin(targets, ","),
header_data);
header_data.as_string_view());
}
};
@ -464,7 +465,7 @@ class RlsLb final : public LoadBalancingPolicy {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
return data_expiration_time_;
}
const std::string& header_data() const
const grpc_event_engine::experimental::Slice& header_data() const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
return header_data_;
}
@ -543,7 +544,8 @@ class RlsLb final : public LoadBalancingPolicy {
// RLS response states
std::vector<RefCountedPtr<ChildPolicyWrapper>> child_policy_wrappers_
ABSL_GUARDED_BY(&RlsLb::mu_);
std::string header_data_ ABSL_GUARDED_BY(&RlsLb::mu_);
grpc_event_engine::experimental::Slice header_data_
ABSL_GUARDED_BY(&RlsLb::mu_);
Timestamp data_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) =
Timestamp::InfPast();
Timestamp stale_time_ ABSL_GUARDED_BY(&RlsLb::mu_) = Timestamp::InfPast();
@ -702,7 +704,7 @@ class RlsLb final : public LoadBalancingPolicy {
RefCountedPtr<RlsChannel> rls_channel,
std::unique_ptr<BackOff> backoff_state,
grpc_lookup_v1_RouteLookupRequest_Reason reason,
std::string stale_header_data);
grpc_event_engine::experimental::Slice stale_header_data);
~RlsRequest() override;
// Shuts down the request. If the request is still in flight, it is
@ -730,7 +732,7 @@ class RlsLb final : public LoadBalancingPolicy {
RefCountedPtr<RlsChannel> rls_channel_;
std::unique_ptr<BackOff> backoff_state_;
grpc_lookup_v1_RouteLookupRequest_Reason reason_;
std::string stale_header_data_;
grpc_event_engine::experimental::Slice stale_header_data_;
// RLS call state.
Timestamp deadline_;
@ -1259,19 +1261,17 @@ LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) {
child_policy_wrappers_.size(),
ConnectivityStateName(child_policy_wrapper->connectivity_state()));
}
// Add header data.
// Note that even if the target we're using is in TRANSIENT_FAILURE,
// the pick might still succeed (e.g., if the child is ring_hash), so
// we need to pass the right header info down in all cases.
if (!header_data_.empty()) {
char* copied_header_data =
static_cast<char*>(args.call_state->Alloc(header_data_.length() + 1));
strcpy(copied_header_data, header_data_.c_str());
args.initial_metadata->Add(kRlsHeaderKey, copied_header_data);
}
auto pick_result = child_policy_wrapper->Pick(args);
lb_policy_->MaybeExportPickCount(kMetricTargetPicks,
child_policy_wrapper->target(), pick_result);
// Add header data.
if (!header_data_.empty()) {
auto* complete_pick =
absl::get_if<PickResult::Complete>(&pick_result.result);
if (complete_pick != nullptr) {
complete_pick->metadata_mutations.Add(kRlsHeaderKey, header_data_.Ref());
}
}
return pick_result;
}
@ -1682,11 +1682,11 @@ void RlsLb::RlsChannel::StartRlsCall(const RequestKey& key,
std::unique_ptr<BackOff> backoff_state;
grpc_lookup_v1_RouteLookupRequest_Reason reason =
grpc_lookup_v1_RouteLookupRequest_REASON_MISS;
std::string stale_header_data;
grpc_event_engine::experimental::Slice stale_header_data;
if (stale_entry != nullptr) {
backoff_state = stale_entry->TakeBackoffState();
reason = grpc_lookup_v1_RouteLookupRequest_REASON_STALE;
stale_header_data = stale_entry->header_data();
stale_header_data = stale_entry->header_data().Ref();
}
lb_policy_->request_map_.emplace(
key, MakeOrphanable<RlsRequest>(
@ -1708,11 +1708,12 @@ void RlsLb::RlsChannel::ResetBackoff() {
// RlsLb::RlsRequest
//
RlsLb::RlsRequest::RlsRequest(RefCountedPtr<RlsLb> lb_policy, RequestKey key,
RefCountedPtr<RlsChannel> rls_channel,
std::unique_ptr<BackOff> backoff_state,
grpc_lookup_v1_RouteLookupRequest_Reason reason,
std::string stale_header_data)
RlsLb::RlsRequest::RlsRequest(
RefCountedPtr<RlsLb> lb_policy, RequestKey key,
RefCountedPtr<RlsChannel> rls_channel,
std::unique_ptr<BackOff> backoff_state,
grpc_lookup_v1_RouteLookupRequest_Reason reason,
grpc_event_engine::experimental::Slice stale_header_data)
: InternallyRefCounted<RlsRequest>(
GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "RlsRequest" : nullptr),
lb_policy_(std::move(lb_policy)),
@ -1890,8 +1891,7 @@ grpc_byte_buffer* RlsLb::RlsRequest::MakeRequestProto() {
grpc_lookup_v1_RouteLookupRequest_set_reason(req, reason_);
if (!stale_header_data_.empty()) {
grpc_lookup_v1_RouteLookupRequest_set_stale_header_data(
req, upb_StringView_FromDataAndSize(stale_header_data_.data(),
stale_header_data_.size()));
req, StdStringToUpbString(stale_header_data_.as_string_view()));
}
size_t len;
char* buf =
@ -1934,7 +1934,8 @@ RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() {
upb_StringView header_data_strview =
grpc_lookup_v1_RouteLookupResponse_header_data(response);
response_info.header_data =
std::string(header_data_strview.data, header_data_strview.size);
grpc_event_engine::experimental::Slice::FromCopiedBuffer(
header_data_strview.data, header_data_strview.size);
return response_info;
}

@ -14,16 +14,14 @@
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_XDS_GRPC_UPB_UTILS_H
#define GRPC_SRC_CORE_XDS_GRPC_UPB_UTILS_H
#ifndef GRPC_SRC_CORE_UTIL_UPB_UTILS_H
#define GRPC_SRC_CORE_UTIL_UPB_UTILS_H
#include <string>
#include "absl/strings/string_view.h"
#include "upb/base/string_view.h"
#include <grpc/support/port_platform.h>
namespace grpc_core {
// Works for both std::string and absl::string_view.
@ -42,4 +40,4 @@ inline std::string UpbStringToStdString(const upb_StringView& str) {
} // namespace grpc_core
#endif // GRPC_SRC_CORE_XDS_GRPC_UPB_UTILS_H
#endif // GRPC_SRC_CORE_UTIL_UPB_UTILS_H

@ -55,7 +55,7 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/telemetry/metrics.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/grpc/xds_bootstrap_grpc.h"
#include "src/core/xds/grpc/xds_transport_grpc.h"
#include "src/core/xds/xds_client/xds_api.h"

@ -66,7 +66,7 @@
#include "src/core/lib/matchers/matchers.h"
#include "src/core/load_balancing/lb_policy_registry.h"
#include "src/core/util/json/json_writer.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/grpc/xds_common_types.h"
#include "src/core/xds/grpc/xds_lb_policy_registry.h"
#include "src/core/xds/xds_client/xds_client.h"

@ -45,7 +45,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/util/json/json_reader.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/grpc/xds_bootstrap_grpc.h"
#include "src/core/xds/xds_client/xds_client.h"

@ -51,7 +51,7 @@
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/util/string.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/grpc/xds_health_status.h"
#include "src/core/xds/xds_client/xds_resource_type.h"

@ -50,7 +50,7 @@
#include "src/core/util/json/json.h"
#include "src/core/util/json/json_writer.h"
#include "src/core/util/string.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/grpc/xds_audit_logger_registry.h"
#include "src/core/xds/grpc/xds_bootstrap_grpc.h"
#include "src/core/xds/xds_client/xds_client.h"

@ -39,7 +39,7 @@
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/util/json/json.h"
#include "src/core/util/json/json_writer.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/grpc/xds_common_types.h"
#include "src/core/xds/grpc/xds_http_filters.h"

@ -55,7 +55,7 @@
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/matchers/matchers.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/grpc/xds_common_types.h"
#include "src/core/xds/xds_client/xds_resource_type.h"

@ -70,7 +70,7 @@
#include "src/core/util/json/json.h"
#include "src/core/util/json/json_writer.h"
#include "src/core/util/string.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/grpc/xds_cluster_specifier_plugin.h"
#include "src/core/xds/grpc/xds_common_types.h"
#include "src/core/xds/grpc/xds_http_filters.h"

@ -48,7 +48,7 @@
#include <grpc/support/time.h>
#include "src/core/util/json/json.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/xds_client/xds_client.h"
// IWYU pragma: no_include "upb/msg_internal.h"

@ -53,7 +53,7 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/uri/uri_parser.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/xds_client/xds_api.h"
#include "src/core/xds/xds_client/xds_bootstrap.h"
#include "src/core/xds/xds_client/xds_client_stats.h"

@ -27,6 +27,7 @@ CORE_SOURCE_FILES = [
'src/core/client_channel/direct_channel.cc',
'src/core/client_channel/dynamic_filters.cc',
'src/core/client_channel/global_subchannel_pool.cc',
'src/core/client_channel/lb_metadata.cc',
'src/core/client_channel/load_balanced_call_destination.cc',
'src/core/client_channel/local_subchannel_pool.cc',
'src/core/client_channel/retry_filter.cc',

@ -620,20 +620,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
explicit FakeMetadata(std::map<std::string, std::string> metadata)
: metadata_(std::move(metadata)) {}
const std::map<std::string, std::string>& metadata() const {
return metadata_;
}
private:
void Add(absl::string_view key, absl::string_view value) override {
metadata_[std::string(key)] = std::string(value);
}
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
override {
return {}; // Not used.
}
absl::optional<absl::string_view> Lookup(
absl::string_view key, std::string* /*buffer*/) const override {
auto it = metadata_.find(std::string(key));

@ -333,10 +333,12 @@ grpc_cc_library(
"//:uri_parser",
"//src/core:channel_args",
"//src/core:delegating_helper",
"//src/core:down_cast",
"//src/core:error",
"//src/core:grpc_backend_metric_data",
"//src/core:json",
"//src/core:json_util",
"//src/core:lb_metadata",
"//src/core:lb_policy",
"//src/core:lb_policy_factory",
"//src/core:lb_policy_registry",

@ -30,9 +30,11 @@
#include <grpc/grpc.h>
#include <grpc/support/json.h>
#include "src/core/client_channel/lb_metadata.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/down_cast.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
@ -129,7 +131,8 @@ class TestPickArgsLb : public ForwardingLoadBalancingPolicy {
// Report args seen.
PickArgsSeen args_seen;
args_seen.path = std::string(args.path);
args_seen.metadata = args.initial_metadata->TestOnlyCopyToVector();
args_seen.metadata =
DownCast<LbMetadata*>(args.initial_metadata)->TestOnlyCopyToVector();
cb_(args_seen);
// Do pick.
return delegate_picker_->Pick(args);
@ -269,7 +272,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
args_seen.status = args.status;
args_seen.backend_metric_data =
args.backend_metric_accessor->GetBackendMetricData();
args_seen.metadata = args.trailing_metadata->TestOnlyCopyToVector();
args_seen.metadata =
DownCast<LbMetadata*>(args.trailing_metadata)->TestOnlyCopyToVector();
cb_(args_seen);
}

@ -81,7 +81,7 @@ TEST(MetadataMapTest, SimpleOps) {
// EXPECT_EQ it later.
class FakeEncoder {
public:
std::string output() { return output_; }
const std::string& output() { return output_; }
void Encode(const Slice& key, const Slice& value) {
output_ += absl::StrCat("UNKNOWN METADATUM: key=", key.as_string_view(),
@ -128,6 +128,26 @@ TEST(MetadataMapTest, NonEncodableTrait) {
EXPECT_EQ(map.DebugString(), "GrpcStreamNetworkState: not sent on wire");
}
TEST(MetadataMapTest, NonTraitKeyWithMultipleValues) {
FakeEncoder encoder;
TimeoutOnlyMetadataMap map;
const absl::string_view kKey = "key";
map.Append(kKey, Slice::FromStaticString("value1"),
[](absl::string_view error, const Slice& value) {
LOG(ERROR) << error << " value:" << value.as_string_view();
});
map.Append(kKey, Slice::FromStaticString("value2"),
[](absl::string_view error, const Slice& value) {
LOG(ERROR) << error << " value:" << value.as_string_view();
});
map.Encode(&encoder);
EXPECT_EQ(encoder.output(),
"UNKNOWN METADATUM: key=key value=value1\n"
"UNKNOWN METADATUM: key=key value=value2\n");
std::string buffer;
EXPECT_EQ(map.GetStringValue(kKey, &buffer), "value1,value2");
}
TEST(DebugStringBuilderTest, OneAddAfterRedaction) {
metadata_detail::DebugStringBuilder b;
b.AddAfterRedaction(ContentTypeMetadata::key(), "AddValue01");

@ -45,7 +45,7 @@
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/matchers/matchers.h"
#include "src/core/util/json/json_writer.h"
#include "src/core/xds/grpc/upb_utils.h"
#include "src/core/util/upb_utils.h"
#include "src/core/xds/grpc/xds_bootstrap_grpc.h"
#include "src/core/xds/xds_client/xds_bootstrap.h"
#include "src/core/xds/xds_client/xds_client.h"

@ -111,12 +111,17 @@ class RpcBehaviorLbPolicy : public LoadBalancingPolicy {
rpc_behavior_(rpc_behavior) {}
PickResult Pick(PickArgs args) override {
char* rpc_behavior_copy = static_cast<char*>(
args.call_state->Alloc(rpc_behavior_.length() + 1));
strcpy(rpc_behavior_copy, rpc_behavior_.c_str());
args.initial_metadata->Add(kRpcBehaviorMetadataKey, rpc_behavior_copy);
// Do pick.
return delegate_picker_->Pick(args);
auto pick_result = delegate_picker_->Pick(args);
// Add metadata.
auto* complete_pick =
absl::get_if<PickResult::Complete>(&pick_result.result);
if (complete_pick != nullptr) {
complete_pick->metadata_mutations.Add(kRpcBehaviorMetadataKey,
rpc_behavior_);
}
// Return result.
return pick_result;
}
private:

@ -1115,6 +1115,8 @@ src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/dynamic_filters.h \
src/core/client_channel/global_subchannel_pool.cc \
src/core/client_channel/global_subchannel_pool.h \
src/core/client_channel/lb_metadata.cc \
src/core/client_channel/lb_metadata.h \
src/core/client_channel/load_balanced_call_destination.cc \
src/core/client_channel/load_balanced_call_destination.h \
src/core/client_channel/local_subchannel_pool.cc \
@ -2951,6 +2953,7 @@ src/core/util/time.cc \
src/core/util/time_precise.cc \
src/core/util/time_precise.h \
src/core/util/tmpfile.h \
src/core/util/upb_utils.h \
src/core/util/useful.h \
src/core/util/windows/cpu.cc \
src/core/util/windows/log.cc \
@ -2963,7 +2966,6 @@ src/core/xds/grpc/certificate_provider_store.cc \
src/core/xds/grpc/certificate_provider_store.h \
src/core/xds/grpc/file_watcher_certificate_provider_factory.cc \
src/core/xds/grpc/file_watcher_certificate_provider_factory.h \
src/core/xds/grpc/upb_utils.h \
src/core/xds/grpc/xds_audit_logger_registry.cc \
src/core/xds/grpc/xds_audit_logger_registry.h \
src/core/xds/grpc/xds_bootstrap_grpc.cc \

@ -915,6 +915,8 @@ src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/dynamic_filters.h \
src/core/client_channel/global_subchannel_pool.cc \
src/core/client_channel/global_subchannel_pool.h \
src/core/client_channel/lb_metadata.cc \
src/core/client_channel/lb_metadata.h \
src/core/client_channel/load_balanced_call_destination.cc \
src/core/client_channel/load_balanced_call_destination.h \
src/core/client_channel/local_subchannel_pool.cc \
@ -2730,6 +2732,7 @@ src/core/util/time.cc \
src/core/util/time_precise.cc \
src/core/util/time_precise.h \
src/core/util/tmpfile.h \
src/core/util/upb_utils.h \
src/core/util/useful.h \
src/core/util/windows/cpu.cc \
src/core/util/windows/log.cc \
@ -2742,7 +2745,6 @@ src/core/xds/grpc/certificate_provider_store.cc \
src/core/xds/grpc/certificate_provider_store.h \
src/core/xds/grpc/file_watcher_certificate_provider_factory.cc \
src/core/xds/grpc/file_watcher_certificate_provider_factory.h \
src/core/xds/grpc/upb_utils.h \
src/core/xds/grpc/xds_audit_logger_registry.cc \
src/core/xds/grpc/xds_audit_logger_registry.h \
src/core/xds/grpc/xds_bootstrap_grpc.cc \

@ -24,7 +24,10 @@ os.chdir(os.path.join(os.path.dirname(sys.argv[0]), "../../.."))
# map of banned function signature to allowlist
BANNED_EXCEPT = {
"grpc_slice_from_static_buffer(": ["src/core/lib/slice/slice.cc"],
"grpc_slice_from_static_buffer(": [
"src/core/lib/slice/slice.cc",
"src/core/load_balancing/grpclb/grpclb.cc",
],
"grpc_resource_quota_ref(": ["src/core/lib/resource_quota/api.cc"],
"grpc_resource_quota_unref(": [
"src/core/lib/resource_quota/api.cc",

Loading…
Cancel
Save