[channel_args] Make channel_stack_init take new type (#30841)

* [channel_args] Make channel_stack_init take new type

* fix

* fix

* fix

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30994/head
Craig Tiller 2 years ago committed by GitHub
parent 747d016397
commit 32b43f0168
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/ext/filters/client_channel/client_channel.cc
  2. 63
      src/core/ext/filters/client_channel/dynamic_filters.cc
  3. 15
      src/core/ext/filters/client_channel/dynamic_filters.h
  4. 7
      src/core/lib/channel/channel_stack.cc
  5. 4
      src/core/lib/channel/channel_stack.h
  6. 4
      src/core/lib/channel/channel_stack_builder_impl.cc
  7. 4
      src/core/lib/surface/channel_stack_type.cc
  8. 2
      src/core/lib/surface/channel_stack_type.h
  9. 12
      test/core/channel/channel_stack_test.cc
  10. 4
      test/cpp/microbenchmarks/bm_call_create.cc

@ -1460,7 +1460,7 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
filters.push_back(&DynamicTerminationFilter::kFilterVtable);
}
RefCountedPtr<DynamicFilters> dynamic_filters =
DynamicFilters::Create(new_args.ToC().get(), std::move(filters));
DynamicFilters::Create(new_args, std::move(filters));
GPR_ASSERT(dynamic_filters != nullptr);
// Grab data plane lock to update service config.
//

@ -24,15 +24,16 @@
#include <string>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/lame_client.h"
// Conversion between call and call stack.
@ -63,8 +64,8 @@ DynamicFilters::Call::Call(Args args, grpc_error_handle* error)
args.arena, /* arena */
args.call_combiner /* call_combiner */
};
*error = grpc_call_stack_init(channel_stack_->channel_stack_, 1, Destroy,
this, &call_args);
*error = grpc_call_stack_init(channel_stack_->channel_stack_.get(), 1,
Destroy, this, &call_args);
if (GPR_UNLIKELY(!GRPC_ERROR_IS_NONE(*error))) {
gpr_log(GPR_ERROR, "error: %s", grpc_error_std_string(*error).c_str());
return;
@ -136,58 +137,30 @@ void DynamicFilters::Call::IncrementRefCount(const DebugLocation& /*location*/,
namespace {
void DestroyChannelStack(void* arg, grpc_error_handle /*error*/) {
grpc_channel_stack* channel_stack = static_cast<grpc_channel_stack*>(arg);
grpc_channel_stack_destroy(channel_stack);
gpr_free(channel_stack);
}
std::pair<grpc_channel_stack*, grpc_error_handle> CreateChannelStack(
const grpc_channel_args* args,
std::vector<const grpc_channel_filter*> filters) {
// Allocate memory for channel stack.
const size_t channel_stack_size =
grpc_channel_stack_size(filters.data(), filters.size());
grpc_channel_stack* channel_stack =
reinterpret_cast<grpc_channel_stack*>(gpr_zalloc(channel_stack_size));
// Initialize stack.
grpc_error_handle error = grpc_channel_stack_init(
/*initial_refs=*/1, DestroyChannelStack, channel_stack, filters.data(),
filters.size(), args, "DynamicFilters", channel_stack);
if (!GRPC_ERROR_IS_NONE(error)) {
gpr_log(GPR_ERROR, "error initializing client internal stack: %s",
grpc_error_std_string(error).c_str());
grpc_channel_stack_destroy(channel_stack);
gpr_free(channel_stack);
return {nullptr, error};
absl::StatusOr<RefCountedPtr<grpc_channel_stack>> CreateChannelStack(
const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters) {
ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC);
builder.SetChannelArgs(args);
for (auto filter : filters) {
builder.AppendFilter(filter);
}
return {channel_stack, GRPC_ERROR_NONE};
return builder.Build();
}
} // namespace
RefCountedPtr<DynamicFilters> DynamicFilters::Create(
const grpc_channel_args* args,
std::vector<const grpc_channel_filter*> filters) {
const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters) {
// Attempt to create channel stack from requested filters.
auto p = CreateChannelStack(args, std::move(filters));
if (!GRPC_ERROR_IS_NONE(p.second)) {
if (!p.ok()) {
// Channel stack creation failed with requested filters.
// Create with lame filter instead.
grpc_error_handle error = p.second;
grpc_arg error_arg = MakeLameClientErrorArg(&error);
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(args, &error_arg, 1);
GRPC_ERROR_UNREF(error);
p = CreateChannelStack(new_args, {&LameClientFilter::kFilter});
GPR_ASSERT(GRPC_ERROR_IS_NONE(p.second));
grpc_channel_args_destroy(new_args);
auto error = p.status();
p = CreateChannelStack(args.Set(MakeLameClientErrorArg(&error)),
{&LameClientFilter::kFilter});
}
return MakeRefCounted<DynamicFilters>(p.first);
}
DynamicFilters::~DynamicFilters() {
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "~DynamicFilters");
return MakeRefCounted<DynamicFilters>(std::move(p.value()));
}
RefCountedPtr<DynamicFilters::Call> DynamicFilters::CreateCall(

@ -19,12 +19,14 @@
#include <grpc/support/port_platform.h>
#include <utility>
#include <vector>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/slice.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/debug_location.h"
@ -90,18 +92,15 @@ class DynamicFilters : public RefCounted<DynamicFilters> {
};
static RefCountedPtr<DynamicFilters> Create(
const grpc_channel_args* args,
std::vector<const grpc_channel_filter*> filters);
const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters);
explicit DynamicFilters(grpc_channel_stack* channel_stack)
: channel_stack_(channel_stack) {}
~DynamicFilters() override;
explicit DynamicFilters(RefCountedPtr<grpc_channel_stack> channel_stack)
: channel_stack_(std::move(channel_stack)) {}
RefCountedPtr<Call> CreateCall(Call::Args args, grpc_error_handle* error);
private:
grpc_channel_stack* channel_stack_;
RefCountedPtr<grpc_channel_stack> channel_stack_;
};
} // namespace grpc_core

@ -22,10 +22,12 @@
#include <stdint.h>
#include <memory>
#include <utility>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/alloc.h"
grpc_core::TraceFlag grpc_trace_channel(false, "channel");
@ -104,7 +106,7 @@ grpc_call_element* grpc_call_stack_element(grpc_call_stack* call_stack,
grpc_error_handle grpc_channel_stack_init(
int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
const grpc_channel_filter** filters, size_t filter_count,
const grpc_channel_args* channel_args, const char* name,
const grpc_core::ChannelArgs& channel_args, const char* name,
grpc_channel_stack* stack) {
if (grpc_trace_channel_stack.enabled()) {
gpr_log(GPR_INFO, "CHANNEL_STACK: init %s", name);
@ -133,9 +135,10 @@ grpc_error_handle grpc_channel_stack_init(
/* init per-filter data */
grpc_error_handle first_error = GRPC_ERROR_NONE;
auto c_channel_args = channel_args.ToC();
for (i = 0; i < filter_count; i++) {
args.channel_stack = stack;
args.channel_args = channel_args;
args.channel_args = c_channel_args.get();
args.is_first = i == 0;
args.is_last = i == (filter_count - 1);
elems[i].filter = filters[i];

@ -56,6 +56,7 @@
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/debug/trace.h"
@ -272,7 +273,8 @@ size_t grpc_channel_stack_size(const grpc_channel_filter** filters,
grpc_error_handle grpc_channel_stack_init(
int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
const grpc_channel_filter** filters, size_t filter_count,
const grpc_channel_args* args, const char* name, grpc_channel_stack* stack);
const grpc_core::ChannelArgs& args, const char* name,
grpc_channel_stack* stack);
/* Destroy a channel stack */
void grpc_channel_stack_destroy(grpc_channel_stack* stack);

@ -72,8 +72,8 @@ ChannelStackBuilderImpl::Build() {
grpc_channel_stack_destroy(stk);
gpr_free(stk);
},
channel_stack, stack->data(), stack->size(), final_args.ToC().get(),
name(), channel_stack);
channel_stack, stack->data(), stack->size(), final_args, name(),
channel_stack);
if (!GRPC_ERROR_IS_NONE(error)) {
grpc_channel_stack_destroy(channel_stack);

@ -30,6 +30,8 @@ bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type) {
return true;
case GRPC_CLIENT_DIRECT_CHANNEL:
return true;
case GRPC_CLIENT_DYNAMIC:
return true;
case GRPC_SERVER_CHANNEL:
return false;
case GRPC_NUM_CHANNEL_STACK_TYPES:
@ -50,6 +52,8 @@ const char* grpc_channel_stack_type_string(grpc_channel_stack_type type) {
return "CLIENT_LAME_CHANNEL";
case GRPC_CLIENT_DIRECT_CHANNEL:
return "CLIENT_DIRECT_CHANNEL";
case GRPC_CLIENT_DYNAMIC:
return "CLIENT_DYNAMIC_CHANNEL";
case GRPC_NUM_CHANNEL_STACK_TYPES:
break;
}

@ -27,6 +27,8 @@ typedef enum {
// bottom-half of a client channel: everything that happens post-load
// balancing (bound to a specific transport)
GRPC_CLIENT_SUBCHANNEL,
// dynamic part of a client channel
GRPC_CLIENT_DYNAMIC,
// a permanently broken client channel
GRPC_CLIENT_LAME_CHANNEL,
// a directly connected client channel (without load-balancing, directly talks

@ -97,26 +97,18 @@ TEST(ChannelStackTest, CreateChannelStack) {
grpc_call_stack* call_stack;
grpc_channel_element* channel_elem;
grpc_call_element* call_elem;
grpc_arg arg;
grpc_channel_args chan_args;
int* channel_data;
int* call_data;
grpc_core::ExecCtx exec_ctx;
grpc_slice path = grpc_slice_from_static_string("/service/method");
arg.type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>("test_key");
arg.value.integer = 42;
chan_args.num_args = 1;
chan_args.args = &arg;
channel_stack = static_cast<grpc_channel_stack*>(
gpr_malloc(grpc_channel_stack_size(&filters, 1)));
ASSERT_TRUE(GRPC_LOG_IF_ERROR(
"grpc_channel_stack_init",
grpc_channel_stack_init(1, free_channel, channel_stack, &filters, 1,
&chan_args, "test", channel_stack)));
grpc_core::ChannelArgs().Set("test_key", 42),
"test", channel_stack)));
EXPECT_EQ(channel_stack->count, 1);
channel_elem = grpc_channel_stack_element(channel_stack, 0);
channel_data = static_cast<int*>(channel_elem->channel_data);

@ -539,8 +539,8 @@ static void BM_IsolatedFilter(benchmark::State& state) {
"channel_stack_init",
grpc_channel_stack_init(1, FilterDestroy, channel_stack,
filters.empty() ? nullptr : &filters[0],
filters.size(), channel_args.ToC().get(),
"CHANNEL", channel_stack)));
filters.size(), channel_args, "CHANNEL",
channel_stack)));
grpc_core::ExecCtx::Get()->Flush();
grpc_call_stack* call_stack =
static_cast<grpc_call_stack*>(gpr_zalloc(channel_stack->call_stack_size));

Loading…
Cancel
Save