From 32b43f016817fd8a1ed74d07ec2740a72017b58a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 23 Sep 2022 11:31:03 -0700 Subject: [PATCH] [channel_args] Make channel_stack_init take new type (#30841) * [channel_args] Make channel_stack_init take new type * fix * fix * fix * Automated change: Fix sanity tests Co-authored-by: ctiller --- .../filters/client_channel/client_channel.cc | 2 +- .../filters/client_channel/dynamic_filters.cc | 63 ++++++------------- .../filters/client_channel/dynamic_filters.h | 15 +++-- src/core/lib/channel/channel_stack.cc | 7 ++- src/core/lib/channel/channel_stack.h | 4 +- .../lib/channel/channel_stack_builder_impl.cc | 4 +- src/core/lib/surface/channel_stack_type.cc | 4 ++ src/core/lib/surface/channel_stack_type.h | 2 + test/core/channel/channel_stack_test.cc | 12 +--- test/cpp/microbenchmarks/bm_call_create.cc | 4 +- 10 files changed, 46 insertions(+), 71 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 3846f86aeda..b68cdeb2a00 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1460,7 +1460,7 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { filters.push_back(&DynamicTerminationFilter::kFilterVtable); } RefCountedPtr dynamic_filters = - DynamicFilters::Create(new_args.ToC().get(), std::move(filters)); + DynamicFilters::Create(new_args, std::move(filters)); GPR_ASSERT(dynamic_filters != nullptr); // Grab data plane lock to update service config. // diff --git a/src/core/ext/filters/client_channel/dynamic_filters.cc b/src/core/ext/filters/client_channel/dynamic_filters.cc index be1d2589f5e..a653f177524 100644 --- a/src/core/ext/filters/client_channel/dynamic_filters.cc +++ b/src/core/ext/filters/client_channel/dynamic_filters.cc @@ -24,15 +24,16 @@ #include #include -#include "absl/status/status.h" +#include "absl/status/statusor.h" -#include #include #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder_impl.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/alloc.h" +#include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/surface/lame_client.h" // Conversion between call and call stack. @@ -63,8 +64,8 @@ DynamicFilters::Call::Call(Args args, grpc_error_handle* error) args.arena, /* arena */ args.call_combiner /* call_combiner */ }; - *error = grpc_call_stack_init(channel_stack_->channel_stack_, 1, Destroy, - this, &call_args); + *error = grpc_call_stack_init(channel_stack_->channel_stack_.get(), 1, + Destroy, this, &call_args); if (GPR_UNLIKELY(!GRPC_ERROR_IS_NONE(*error))) { gpr_log(GPR_ERROR, "error: %s", grpc_error_std_string(*error).c_str()); return; @@ -136,58 +137,30 @@ void DynamicFilters::Call::IncrementRefCount(const DebugLocation& /*location*/, namespace { -void DestroyChannelStack(void* arg, grpc_error_handle /*error*/) { - grpc_channel_stack* channel_stack = static_cast(arg); - grpc_channel_stack_destroy(channel_stack); - gpr_free(channel_stack); -} - -std::pair CreateChannelStack( - const grpc_channel_args* args, - std::vector filters) { - // Allocate memory for channel stack. - const size_t channel_stack_size = - grpc_channel_stack_size(filters.data(), filters.size()); - grpc_channel_stack* channel_stack = - reinterpret_cast(gpr_zalloc(channel_stack_size)); - // Initialize stack. - grpc_error_handle error = grpc_channel_stack_init( - /*initial_refs=*/1, DestroyChannelStack, channel_stack, filters.data(), - filters.size(), args, "DynamicFilters", channel_stack); - if (!GRPC_ERROR_IS_NONE(error)) { - gpr_log(GPR_ERROR, "error initializing client internal stack: %s", - grpc_error_std_string(error).c_str()); - grpc_channel_stack_destroy(channel_stack); - gpr_free(channel_stack); - return {nullptr, error}; +absl::StatusOr> CreateChannelStack( + const ChannelArgs& args, std::vector filters) { + ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC); + builder.SetChannelArgs(args); + for (auto filter : filters) { + builder.AppendFilter(filter); } - return {channel_stack, GRPC_ERROR_NONE}; + return builder.Build(); } } // namespace RefCountedPtr DynamicFilters::Create( - const grpc_channel_args* args, - std::vector filters) { + const ChannelArgs& args, std::vector filters) { // Attempt to create channel stack from requested filters. auto p = CreateChannelStack(args, std::move(filters)); - if (!GRPC_ERROR_IS_NONE(p.second)) { + if (!p.ok()) { // Channel stack creation failed with requested filters. // Create with lame filter instead. - grpc_error_handle error = p.second; - grpc_arg error_arg = MakeLameClientErrorArg(&error); - grpc_channel_args* new_args = - grpc_channel_args_copy_and_add(args, &error_arg, 1); - GRPC_ERROR_UNREF(error); - p = CreateChannelStack(new_args, {&LameClientFilter::kFilter}); - GPR_ASSERT(GRPC_ERROR_IS_NONE(p.second)); - grpc_channel_args_destroy(new_args); + auto error = p.status(); + p = CreateChannelStack(args.Set(MakeLameClientErrorArg(&error)), + {&LameClientFilter::kFilter}); } - return MakeRefCounted(p.first); -} - -DynamicFilters::~DynamicFilters() { - GRPC_CHANNEL_STACK_UNREF(channel_stack_, "~DynamicFilters"); + return MakeRefCounted(std::move(p.value())); } RefCountedPtr DynamicFilters::CreateCall( diff --git a/src/core/ext/filters/client_channel/dynamic_filters.h b/src/core/ext/filters/client_channel/dynamic_filters.h index 3f812a7f015..cd235f74f4c 100644 --- a/src/core/ext/filters/client_channel/dynamic_filters.h +++ b/src/core/ext/filters/client_channel/dynamic_filters.h @@ -19,12 +19,14 @@ #include +#include #include -#include #include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" +#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/gpr/time_precise.h" #include "src/core/lib/gprpp/debug_location.h" @@ -90,18 +92,15 @@ class DynamicFilters : public RefCounted { }; static RefCountedPtr Create( - const grpc_channel_args* args, - std::vector filters); + const ChannelArgs& args, std::vector filters); - explicit DynamicFilters(grpc_channel_stack* channel_stack) - : channel_stack_(channel_stack) {} - - ~DynamicFilters() override; + explicit DynamicFilters(RefCountedPtr channel_stack) + : channel_stack_(std::move(channel_stack)) {} RefCountedPtr CreateCall(Call::Args args, grpc_error_handle* error); private: - grpc_channel_stack* channel_stack_; + RefCountedPtr channel_stack_; }; } // namespace grpc_core diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc index 60f1d51c598..0c933e9168b 100644 --- a/src/core/lib/channel/channel_stack.cc +++ b/src/core/lib/channel/channel_stack.cc @@ -22,10 +22,12 @@ #include +#include #include #include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/alloc.h" grpc_core::TraceFlag grpc_trace_channel(false, "channel"); @@ -104,7 +106,7 @@ grpc_call_element* grpc_call_stack_element(grpc_call_stack* call_stack, grpc_error_handle grpc_channel_stack_init( int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg, const grpc_channel_filter** filters, size_t filter_count, - const grpc_channel_args* channel_args, const char* name, + const grpc_core::ChannelArgs& channel_args, const char* name, grpc_channel_stack* stack) { if (grpc_trace_channel_stack.enabled()) { gpr_log(GPR_INFO, "CHANNEL_STACK: init %s", name); @@ -133,9 +135,10 @@ grpc_error_handle grpc_channel_stack_init( /* init per-filter data */ grpc_error_handle first_error = GRPC_ERROR_NONE; + auto c_channel_args = channel_args.ToC(); for (i = 0; i < filter_count; i++) { args.channel_stack = stack; - args.channel_args = channel_args; + args.channel_args = c_channel_args.get(); args.is_first = i == 0; args.is_last = i == (filter_count - 1); elems[i].filter = filters[i]; diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index b555263b8ba..717035efbe1 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -56,6 +56,7 @@ #include #include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/debug/trace.h" @@ -272,7 +273,8 @@ size_t grpc_channel_stack_size(const grpc_channel_filter** filters, grpc_error_handle grpc_channel_stack_init( int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg, const grpc_channel_filter** filters, size_t filter_count, - const grpc_channel_args* args, const char* name, grpc_channel_stack* stack); + const grpc_core::ChannelArgs& args, const char* name, + grpc_channel_stack* stack); /* Destroy a channel stack */ void grpc_channel_stack_destroy(grpc_channel_stack* stack); diff --git a/src/core/lib/channel/channel_stack_builder_impl.cc b/src/core/lib/channel/channel_stack_builder_impl.cc index 74e0634925e..cffec908e0d 100644 --- a/src/core/lib/channel/channel_stack_builder_impl.cc +++ b/src/core/lib/channel/channel_stack_builder_impl.cc @@ -72,8 +72,8 @@ ChannelStackBuilderImpl::Build() { grpc_channel_stack_destroy(stk); gpr_free(stk); }, - channel_stack, stack->data(), stack->size(), final_args.ToC().get(), - name(), channel_stack); + channel_stack, stack->data(), stack->size(), final_args, name(), + channel_stack); if (!GRPC_ERROR_IS_NONE(error)) { grpc_channel_stack_destroy(channel_stack); diff --git a/src/core/lib/surface/channel_stack_type.cc b/src/core/lib/surface/channel_stack_type.cc index 767ab4b85d9..d140628a169 100644 --- a/src/core/lib/surface/channel_stack_type.cc +++ b/src/core/lib/surface/channel_stack_type.cc @@ -30,6 +30,8 @@ bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type) { return true; case GRPC_CLIENT_DIRECT_CHANNEL: return true; + case GRPC_CLIENT_DYNAMIC: + return true; case GRPC_SERVER_CHANNEL: return false; case GRPC_NUM_CHANNEL_STACK_TYPES: @@ -50,6 +52,8 @@ const char* grpc_channel_stack_type_string(grpc_channel_stack_type type) { return "CLIENT_LAME_CHANNEL"; case GRPC_CLIENT_DIRECT_CHANNEL: return "CLIENT_DIRECT_CHANNEL"; + case GRPC_CLIENT_DYNAMIC: + return "CLIENT_DYNAMIC_CHANNEL"; case GRPC_NUM_CHANNEL_STACK_TYPES: break; } diff --git a/src/core/lib/surface/channel_stack_type.h b/src/core/lib/surface/channel_stack_type.h index 67d1fa86196..433230d67bd 100644 --- a/src/core/lib/surface/channel_stack_type.h +++ b/src/core/lib/surface/channel_stack_type.h @@ -27,6 +27,8 @@ typedef enum { // bottom-half of a client channel: everything that happens post-load // balancing (bound to a specific transport) GRPC_CLIENT_SUBCHANNEL, + // dynamic part of a client channel + GRPC_CLIENT_DYNAMIC, // a permanently broken client channel GRPC_CLIENT_LAME_CHANNEL, // a directly connected client channel (without load-balancing, directly talks diff --git a/test/core/channel/channel_stack_test.cc b/test/core/channel/channel_stack_test.cc index a1847d36a89..e61a1f3ead7 100644 --- a/test/core/channel/channel_stack_test.cc +++ b/test/core/channel/channel_stack_test.cc @@ -97,26 +97,18 @@ TEST(ChannelStackTest, CreateChannelStack) { grpc_call_stack* call_stack; grpc_channel_element* channel_elem; grpc_call_element* call_elem; - grpc_arg arg; - grpc_channel_args chan_args; int* channel_data; int* call_data; grpc_core::ExecCtx exec_ctx; grpc_slice path = grpc_slice_from_static_string("/service/method"); - arg.type = GRPC_ARG_INTEGER; - arg.key = const_cast("test_key"); - arg.value.integer = 42; - - chan_args.num_args = 1; - chan_args.args = &arg; - channel_stack = static_cast( gpr_malloc(grpc_channel_stack_size(&filters, 1))); ASSERT_TRUE(GRPC_LOG_IF_ERROR( "grpc_channel_stack_init", grpc_channel_stack_init(1, free_channel, channel_stack, &filters, 1, - &chan_args, "test", channel_stack))); + grpc_core::ChannelArgs().Set("test_key", 42), + "test", channel_stack))); EXPECT_EQ(channel_stack->count, 1); channel_elem = grpc_channel_stack_element(channel_stack, 0); channel_data = static_cast(channel_elem->channel_data); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 210ba8c25ed..2bb793247a3 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -539,8 +539,8 @@ static void BM_IsolatedFilter(benchmark::State& state) { "channel_stack_init", grpc_channel_stack_init(1, FilterDestroy, channel_stack, filters.empty() ? nullptr : &filters[0], - filters.size(), channel_args.ToC().get(), - "CHANNEL", channel_stack))); + filters.size(), channel_args, "CHANNEL", + channel_stack))); grpc_core::ExecCtx::Get()->Flush(); grpc_call_stack* call_stack = static_cast(gpr_zalloc(channel_stack->call_stack_size));