[channel_args] Use c++ channel args during channel init (#32300)

* [channel_args] Use c++ channel args during channel init

Previously we were converting to C and then back to C++ for each
filter... this ought to save some CPU time during connection
establishment.

* Automated change: Fix sanity tests

* cpp channel filters

* Automated change: Fix sanity tests

* iwyu

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/32305/head
Craig Tiller 2 years ago committed by GitHub
parent 117457a767
commit 93b3b5adc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      src/core/ext/filters/client_channel/client_channel.cc
  2. 23
      src/core/ext/filters/client_channel/retry_filter.cc
  3. 8
      src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
  4. 2
      src/core/ext/filters/message_size/message_size_filter.cc
  5. 7
      src/core/ext/filters/rbac/rbac_filter.cc
  6. 3
      src/core/lib/channel/channel_stack.cc
  7. 2
      src/core/lib/channel/channel_stack.h
  8. 3
      src/core/lib/channel/connected_channel.cc
  9. 3
      src/core/lib/channel/promise_based_filter.h
  10. 6
      src/cpp/common/channel_filter.cc
  11. 5
      src/cpp/common/channel_filter.h
  12. 5
      src/cpp/ext/filters/census/client_filter.cc
  13. 11
      test/core/channel/channel_stack_test.cc

@ -285,9 +285,8 @@ class DynamicTerminationFilter {
const grpc_channel_info* /*info*/) {}
private:
explicit DynamicTerminationFilter(const grpc_channel_args* args)
: chand_(grpc_channel_args_find_pointer<ClientChannel>(
args, GRPC_ARG_CLIENT_CHANNEL)) {}
explicit DynamicTerminationFilter(const ChannelArgs& args)
: chand_(args.GetObject<ClientChannel>()) {}
ClientChannel* chand_;
};
@ -975,7 +974,7 @@ RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
ClientChannel::ClientChannel(grpc_channel_element_args* args,
grpc_error_handle* error)
: channel_args_(ChannelArgs::FromC(args->channel_args)),
: channel_args_(args->channel_args),
deadline_checking_enabled_(grpc_deadline_checking_enabled(channel_args_)),
owning_stack_(args->channel_stack),
client_channel_factory_(channel_args_.GetObject<ClientChannelFactory>()),

@ -168,36 +168,33 @@ class RetryFilter {
const grpc_channel_info* /*info*/) {}
private:
static size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
return static_cast<size_t>(grpc_channel_args_find_integer(
args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE,
{DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
static size_t GetMaxPerRpcRetryBufferSize(const ChannelArgs& args) {
return Clamp(args.GetInt(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE)
.value_or(DEFAULT_PER_RPC_RETRY_BUFFER_SIZE),
0, INT_MAX);
}
RetryFilter(const grpc_channel_args* args, grpc_error_handle* error)
: client_channel_(grpc_channel_args_find_pointer<ClientChannel>(
args, GRPC_ARG_CLIENT_CHANNEL)),
RetryFilter(const ChannelArgs& args, grpc_error_handle* error)
: client_channel_(args.GetObject<ClientChannel>()),
per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
service_config_parser_index_(
internal::RetryServiceConfigParser::ParserIndex()) {
// Get retry throttling parameters from service config.
auto* service_config = grpc_channel_args_find_pointer<ServiceConfig>(
args, GRPC_ARG_SERVICE_CONFIG_OBJ);
auto* service_config = args.GetObject<ServiceConfig>();
if (service_config == nullptr) return;
const auto* config = static_cast<const RetryGlobalConfig*>(
service_config->GetGlobalParsedConfig(
RetryServiceConfigParser::ParserIndex()));
if (config == nullptr) return;
// Get server name from target URI.
const char* server_uri =
grpc_channel_args_find_string(args, GRPC_ARG_SERVER_URI);
if (server_uri == nullptr) {
auto server_uri = args.GetString(GRPC_ARG_SERVER_URI);
if (!server_uri.has_value()) {
*error = GRPC_ERROR_CREATE(
"server URI channel arg missing or wrong type in client channel "
"filter");
return;
}
absl::StatusOr<URI> uri = URI::Parse(server_uri);
absl::StatusOr<URI> uri = URI::Parse(*server_uri);
if (!uri.ok() || uri->path().empty()) {
*error =
GRPC_ERROR_CREATE("could not extract server name from target URI");

@ -54,11 +54,11 @@ class ServiceConfigChannelArgChannelData {
public:
explicit ServiceConfigChannelArgChannelData(
const grpc_channel_element_args* args) {
const char* service_config_str = grpc_channel_args_find_string(
args->channel_args, GRPC_ARG_SERVICE_CONFIG);
if (service_config_str != nullptr) {
auto service_config_str =
args->channel_args.GetOwnedString(GRPC_ARG_SERVICE_CONFIG);
if (service_config_str.has_value()) {
auto service_config = ServiceConfigImpl::Create(
ChannelArgs::FromC(args->channel_args), service_config_str);
args->channel_args, service_config_str->c_str());
if (!service_config.ok()) {
gpr_log(GPR_ERROR, "%s", service_config.status().ToString().c_str());
} else {

@ -305,7 +305,7 @@ static grpc_error_handle message_size_init_channel_elem(
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
new (chand) channel_data();
chand->limits = grpc_core::MessageSizeParsedConfig::GetFromChannelArgs(
grpc_core::ChannelArgs::FromC(args->channel_args));
args->channel_args);
return absl::OkStatus();
}

@ -23,6 +23,7 @@
#include "absl/status/status.h"
#include <grpc/grpc_security.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
@ -36,6 +37,7 @@
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/transport/transport_fwd.h"
#include "src/core/lib/transport/transport_impl.h"
namespace grpc_core {
@ -144,12 +146,11 @@ RbacFilter::RbacFilter(size_t index,
grpc_error_handle RbacFilter::Init(grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(elem->filter == &kFilterVtable);
auto* auth_context = grpc_find_auth_context_in_args(args->channel_args);
auto* auth_context = args->channel_args.GetObject<grpc_auth_context>();
if (auth_context == nullptr) {
return GRPC_ERROR_CREATE("No auth context found");
}
auto* transport = grpc_channel_args_find_pointer<grpc_transport>(
args->channel_args, GRPC_ARG_TRANSPORT);
auto* transport = args->channel_args.GetObject<grpc_transport>();
if (transport == nullptr) {
// This should never happen since the transport is always set on the server
// side.

@ -139,10 +139,9 @@ grpc_error_handle grpc_channel_stack_init(
// init per-filter data
grpc_error_handle first_error;
auto c_channel_args = channel_args.ToC();
for (i = 0; i < filter_count; i++) {
args.channel_stack = stack;
args.channel_args = c_channel_args.get();
args.channel_args = channel_args;
args.is_first = i == 0;
args.is_last = i == (filter_count - 1);
elems[i].filter = filters[i];

@ -76,7 +76,7 @@
struct grpc_channel_element_args {
grpc_channel_stack* channel_stack;
const grpc_channel_args* channel_args;
grpc_core::ChannelArgs channel_args;
int is_first;
int is_last;
};

@ -228,8 +228,7 @@ static grpc_error_handle connected_channel_init_channel_elem(
grpc_channel_element* elem, grpc_channel_element_args* args) {
channel_data* cd = static_cast<channel_data*>(elem->channel_data);
GPR_ASSERT(args->is_last);
cd->transport = grpc_channel_args_find_pointer<grpc_transport>(
args->channel_args, GRPC_ARG_TRANSPORT);
cd->transport = args->channel_args.GetObject<grpc_transport>();
return absl::OkStatus();
}

@ -44,7 +44,6 @@
#include <grpc/support/log.h>
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
@ -873,7 +872,7 @@ struct ChannelFilterWithFlagsMethods {
static absl::Status InitChannelElem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(args->is_last == ((kFlags & kFilterIsLast) != 0));
auto status = F::Create(ChannelArgs::FromC(args->channel_args),
auto status = F::Create(args->channel_args,
ChannelFilter::Args(args->channel_stack, elem));
if (!status.ok()) {
static_assert(

@ -18,8 +18,6 @@
#include "src/cpp/common/channel_filter.h"
#include <memory>
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
@ -73,12 +71,12 @@ namespace internal {
void RegisterChannelFilter(
grpc_channel_stack_type stack_type, int priority,
std::function<bool(const grpc_channel_args&)> include_filter,
std::function<bool(const grpc_core::ChannelArgs&)> include_filter,
const grpc_channel_filter* filter) {
auto maybe_add_filter = [include_filter,
filter](grpc_core::ChannelStackBuilder* builder) {
if (include_filter != nullptr) {
if (!include_filter(*builder->channel_args().ToC())) {
if (!include_filter(builder->channel_args())) {
return true;
}
}

@ -33,6 +33,7 @@
#include <grpc/support/atm.h>
#include <grpcpp/support/config.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
@ -313,7 +314,7 @@ class ChannelFilter final {
void RegisterChannelFilter(
grpc_channel_stack_type stack_type, int priority,
std::function<bool(const grpc_channel_args&)> include_filter,
std::function<bool(const grpc_core::ChannelArgs&)> include_filter,
const grpc_channel_filter* filter);
} // namespace internal
@ -331,7 +332,7 @@ void RegisterChannelFilter(
template <typename ChannelDataType, typename CallDataType>
void RegisterChannelFilter(
const char* name, grpc_channel_stack_type stack_type, int priority,
std::function<bool(const grpc_channel_args&)> include_filter) {
std::function<bool(const grpc_core::ChannelArgs&)> include_filter) {
using FilterType = internal::ChannelFilter<ChannelDataType, CallDataType>;
static const grpc_channel_filter filter = {
FilterType::StartTransportStreamOpBatch,

@ -73,9 +73,8 @@ constexpr uint32_t
grpc_error_handle OpenCensusClientChannelData::Init(
grpc_channel_element* /*elem*/, grpc_channel_element_args* args) {
bool observability_enabled = grpc_core::ChannelArgs::FromC(args->channel_args)
.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY)
.value_or(true);
bool observability_enabled =
args->channel_args.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY).value_or(true);
// Only run the Post-Init Registry if observability is enabled to avoid
// running into a cyclic loop for exporter channels.
if (observability_enabled) {

@ -18,11 +18,10 @@
#include "src/core/lib/channel/channel_stack.h"
#include <limits.h>
#include <string>
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include "gtest/gtest.h"
#include <grpc/support/alloc.h>
@ -36,12 +35,10 @@
static grpc_error_handle channel_init_func(grpc_channel_element* elem,
grpc_channel_element_args* args) {
int test_value = grpc_channel_args_find_integer(args->channel_args,
"test_key", {-1, 0, INT_MAX});
int test_value = args->channel_args.GetInt("test_key").value_or(-1);
EXPECT_EQ(test_value, 42);
auto* ee = grpc_channel_args_find_pointer<
grpc_event_engine::experimental::EventEngine>(
args->channel_args, GRPC_INTERNAL_ARG_EVENT_ENGINE);
auto* ee = args->channel_args
.GetObject<grpc_event_engine::experimental::EventEngine>();
EXPECT_NE(ee, nullptr);
EXPECT_TRUE(args->is_first);
EXPECT_TRUE(args->is_last);

Loading…
Cancel
Save