Revert "Revert "Transport channel arg (#28802)" (#28818)" (#28820)

This reverts commit 2532cf5321.
reviewable/pr25586/r27
Craig Tiller 3 years ago committed by GitHub
parent 08181286e3
commit 4169f24dcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      src/core/ext/filters/client_channel/dynamic_filters.cc
  2. 9
      src/core/ext/filters/http/client/http_client_filter.cc
  3. 8
      src/core/ext/filters/rbac/rbac_filter.cc
  4. 5
      src/core/lib/channel/channel_stack.cc
  5. 7
      src/core/lib/channel/channel_stack.h
  6. 25
      src/core/lib/channel/channel_stack_builder.cc
  7. 2
      test/core/channel/channel_stack_test.cc
  8. 25
      test/cpp/microbenchmarks/bm_call_create.cc

@ -140,8 +140,7 @@ std::pair<grpc_channel_stack*, grpc_error_handle> CreateChannelStack(
// Initialize stack. // Initialize stack.
grpc_error_handle error = grpc_channel_stack_init( grpc_error_handle error = grpc_channel_stack_init(
/*initial_refs=*/1, DestroyChannelStack, channel_stack, filters.data(), /*initial_refs=*/1, DestroyChannelStack, channel_stack, filters.data(),
filters.size(), args, /*optional_transport=*/nullptr, "DynamicFilters", filters.size(), args, "DynamicFilters", channel_stack);
channel_stack);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "error initializing client internal stack: %s", gpr_log(GPR_ERROR, "error initializing client internal stack: %s",
grpc_error_std_string(error).c_str()); grpc_error_std_string(error).c_str());

@ -32,6 +32,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
@ -489,12 +490,14 @@ static grpc_error_handle http_client_init_channel_elem(
channel_data* chand = static_cast<channel_data*>(elem->channel_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data);
new (chand) channel_data(); new (chand) channel_data();
GPR_ASSERT(!args->is_last); GPR_ASSERT(!args->is_last);
GPR_ASSERT(args->optional_transport != nullptr); auto* transport = grpc_channel_args_find_pointer<grpc_transport>(
args->channel_args, GRPC_ARG_TRANSPORT);
GPR_ASSERT(transport != nullptr);
chand->static_scheme = scheme_from_args(args->channel_args); chand->static_scheme = scheme_from_args(args->channel_args);
chand->max_payload_size_for_get = chand->max_payload_size_for_get =
max_payload_size_from_args(args->channel_args); max_payload_size_from_args(args->channel_args);
chand->user_agent = grpc_core::Slice(user_agent_from_args( chand->user_agent = grpc_core::Slice(
args->channel_args, args->optional_transport->vtable->name)); user_agent_from_args(args->channel_args, transport->vtable->name));
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }

@ -134,15 +134,17 @@ grpc_error_handle RbacFilter::Init(grpc_channel_element* elem,
if (auth_context == nullptr) { if (auth_context == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No auth context found"); return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No auth context found");
} }
if (args->optional_transport == nullptr) { auto* transport = grpc_channel_args_find_pointer<grpc_transport>(
args->channel_args, GRPC_ARG_TRANSPORT);
if (transport == nullptr) {
// This should never happen since the transport is always set on the server // This should never happen since the transport is always set on the server
// side. // side.
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No transport configured"); return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No transport configured");
} }
new (elem->channel_data) RbacFilter( new (elem->channel_data) RbacFilter(
grpc_channel_stack_filter_instance_number(args->channel_stack, elem), grpc_channel_stack_filter_instance_number(args->channel_stack, elem),
EvaluateArgs::PerChannelArgs( EvaluateArgs::PerChannelArgs(auth_context,
auth_context, grpc_transport_get_endpoint(args->optional_transport))); grpc_transport_get_endpoint(transport)));
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }

@ -104,8 +104,8 @@ grpc_call_element* grpc_call_stack_element(grpc_call_stack* call_stack,
grpc_error_handle grpc_channel_stack_init( grpc_error_handle grpc_channel_stack_init(
int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg, int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
const grpc_channel_filter** filters, size_t filter_count, const grpc_channel_filter** filters, size_t filter_count,
const grpc_channel_args* channel_args, grpc_transport* optional_transport, const grpc_channel_args* channel_args, const char* name,
const char* name, grpc_channel_stack* stack) { grpc_channel_stack* stack) {
if (grpc_trace_channel_stack.enabled()) { if (grpc_trace_channel_stack.enabled()) {
gpr_log(GPR_INFO, "CHANNEL_STACK: init %s", name); gpr_log(GPR_INFO, "CHANNEL_STACK: init %s", name);
for (size_t i = 0; i < filter_count; i++) { for (size_t i = 0; i < filter_count; i++) {
@ -134,7 +134,6 @@ grpc_error_handle grpc_channel_stack_init(
for (i = 0; i < filter_count; i++) { for (i = 0; i < filter_count; i++) {
args.channel_stack = stack; args.channel_stack = stack;
args.channel_args = channel_args; args.channel_args = channel_args;
args.optional_transport = optional_transport;
args.is_first = i == 0; args.is_first = i == 0;
args.is_last = i == (filter_count - 1); args.is_last = i == (filter_count - 1);
elems[i].filter = filters[i]; elems[i].filter = filters[i];

@ -68,11 +68,11 @@ typedef struct grpc_call_element grpc_call_element;
typedef struct grpc_channel_stack grpc_channel_stack; typedef struct grpc_channel_stack grpc_channel_stack;
typedef struct grpc_call_stack grpc_call_stack; typedef struct grpc_call_stack grpc_call_stack;
#define GRPC_ARG_TRANSPORT "grpc.internal.transport"
struct grpc_channel_element_args { struct grpc_channel_element_args {
grpc_channel_stack* channel_stack; grpc_channel_stack* channel_stack;
const grpc_channel_args* channel_args; const grpc_channel_args* channel_args;
/** Transport, iff it is known */
grpc_transport* optional_transport;
int is_first; int is_first;
int is_last; int is_last;
}; };
@ -237,8 +237,7 @@ size_t grpc_channel_stack_size(const grpc_channel_filter** filters,
grpc_error_handle grpc_channel_stack_init( grpc_error_handle grpc_channel_stack_init(
int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg, int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
const grpc_channel_filter** filters, size_t filter_count, const grpc_channel_filter** filters, size_t filter_count,
const grpc_channel_args* args, grpc_transport* optional_transport, const grpc_channel_args* args, const char* name, grpc_channel_stack* stack);
const char* name, grpc_channel_stack* stack);
/* Destroy a channel stack */ /* Destroy a channel stack */
void grpc_channel_stack_destroy(grpc_channel_stack* stack); void grpc_channel_stack_destroy(grpc_channel_stack* stack);

@ -25,6 +25,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
@ -80,10 +81,32 @@ grpc_error_handle ChannelStackBuilder::Build(size_t prefix_bytes,
// fetch a pointer to the channel stack // fetch a pointer to the channel stack
grpc_channel_stack* channel_stack = reinterpret_cast<grpc_channel_stack*>( grpc_channel_stack* channel_stack = reinterpret_cast<grpc_channel_stack*>(
static_cast<char*>(*result) + prefix_bytes); static_cast<char*>(*result) + prefix_bytes);
const grpc_channel_args* final_args;
if (transport_ != nullptr) {
static const grpc_arg_pointer_vtable vtable = {
// copy
[](void* p) { return p; },
// destroy
[](void*) {},
// cmp
[](void* a, void* b) { return QsortCompare(a, b); },
};
grpc_arg arg = grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_TRANSPORT), transport_, &vtable);
final_args = grpc_channel_args_copy_and_add(args_, &arg, 1);
} else {
final_args = args_;
}
// and initialize it // and initialize it
grpc_error_handle error = grpc_channel_stack_init( grpc_error_handle error = grpc_channel_stack_init(
initial_refs, destroy, destroy_arg == nullptr ? *result : destroy_arg, initial_refs, destroy, destroy_arg == nullptr ? *result : destroy_arg,
filters.data(), filters.size(), args_, transport_, name_, channel_stack); filters.data(), filters.size(), final_args, name_, channel_stack);
if (final_args != args_) {
grpc_channel_args_destroy(final_args);
}
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
grpc_channel_stack_destroy(channel_stack); grpc_channel_stack_destroy(channel_stack);

@ -112,7 +112,7 @@ static void test_create_channel_stack(void) {
GPR_ASSERT(GRPC_LOG_IF_ERROR( GPR_ASSERT(GRPC_LOG_IF_ERROR(
"grpc_channel_stack_init", "grpc_channel_stack_init",
grpc_channel_stack_init(1, free_channel, channel_stack, &filters, 1, grpc_channel_stack_init(1, free_channel, channel_stack, &filters, 1,
&chan_args, nullptr, "test", channel_stack))); &chan_args, "test", channel_stack)));
GPR_ASSERT(channel_stack->count == 1); GPR_ASSERT(channel_stack->count == 1);
channel_elem = grpc_channel_stack_element(channel_stack, 0); channel_elem = grpc_channel_stack_element(channel_stack, 0);
channel_data = static_cast<int*>(channel_elem->channel_data); channel_data = static_cast<int*>(channel_elem->channel_data);

@ -451,6 +451,19 @@ static const grpc_transport_vtable phony_transport_vtable = {0,
static grpc_transport phony_transport = {&phony_transport_vtable}; static grpc_transport phony_transport = {&phony_transport_vtable};
grpc_arg Arg() {
static const grpc_arg_pointer_vtable vtable = {
// copy
[](void* p) { return p; },
// destroy
[](void*) {},
// cmp
[](void* a, void* b) { return grpc_core::QsortCompare(a, b); },
};
return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_TRANSPORT),
&phony_transport, &vtable);
}
} // namespace phony_transport } // namespace phony_transport
class NoOp { class NoOp {
@ -508,7 +521,10 @@ static void BM_IsolatedFilter(benchmark::State& state) {
&fake_client_channel_factory), &fake_client_channel_factory),
StringArg(GRPC_ARG_SERVER_URI, "localhost"), StringArg(GRPC_ARG_SERVER_URI, "localhost"),
}; };
grpc_channel_args channel_args = {args.size(), &args[0]}; if (fixture.flags & REQUIRES_TRANSPORT) {
args.push_back(phony_transport::Arg());
}
grpc_channel_args channel_args = {args.size(), args.data()};
std::vector<const grpc_channel_filter*> filters; std::vector<const grpc_channel_filter*> filters;
if (fixture.filter != nullptr) { if (fixture.filter != nullptr) {
@ -528,11 +544,8 @@ static void BM_IsolatedFilter(benchmark::State& state) {
"channel_stack_init", "channel_stack_init",
grpc_channel_stack_init(1, FilterDestroy, channel_stack, grpc_channel_stack_init(1, FilterDestroy, channel_stack,
filters.empty() ? nullptr : &filters[0], filters.empty() ? nullptr : &filters[0],
filters.size(), &channel_args, filters.size(), &channel_args, "CHANNEL",
fixture.flags & REQUIRES_TRANSPORT channel_stack)));
? &phony_transport::phony_transport
: nullptr,
"CHANNEL", channel_stack)));
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
grpc_call_stack* call_stack = grpc_call_stack* call_stack =
static_cast<grpc_call_stack*>(gpr_zalloc(channel_stack->call_stack_size)); static_cast<grpc_call_stack*>(gpr_zalloc(channel_stack->call_stack_size));

Loading…
Cancel
Save