c++-ize channel stack builder (#28660)

* c++-ize channel stack builder

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* fixes

* comment

* move functions out of line

* Automated change: Fix sanity tests

* review feedback

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/28672/head
Craig Tiller 3 years ago committed by GitHub
parent e523825c75
commit e4107caf26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/ext/filters/client_channel/client_channel_plugin.cc
  2. 8
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  3. 9
      src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
  4. 17
      src/core/ext/filters/client_channel/subchannel.cc
  5. 11
      src/core/ext/filters/client_idle/client_idle_filter.cc
  6. 8
      src/core/ext/filters/deadline/deadline_filter.cc
  7. 9
      src/core/ext/filters/http/client_authority_filter.cc
  8. 23
      src/core/ext/filters/http/http_filters_plugin.cc
  9. 11
      src/core/ext/filters/max_age/max_age_filter.cc
  10. 27
      src/core/ext/filters/message_size/message_size_filter.cc
  11. 31
      src/core/ext/xds/xds_channel_stack_modifier.cc
  12. 7
      src/core/ext/xds/xds_channel_stack_modifier.h
  13. 286
      src/core/lib/channel/channel_stack_builder.cc
  14. 237
      src/core/lib/channel/channel_stack_builder.h
  15. 12
      src/core/lib/channel/connected_channel.cc
  16. 2
      src/core/lib/channel/connected_channel.h
  17. 12
      src/core/lib/surface/builtins.cc
  18. 38
      src/core/lib/surface/channel.cc
  19. 2
      src/core/lib/surface/channel.h
  20. 2
      src/core/lib/surface/channel_init.cc
  21. 8
      src/core/lib/surface/channel_init.h
  22. 28
      src/core/lib/surface/init_secure.cc
  23. 9
      src/cpp/common/channel_filter.cc
  24. 34
      test/core/channel/channel_stack_builder_test.cc
  25. 18
      test/core/channel/minimal_stack_is_minimal_test.cc
  26. 7
      test/core/end2end/tests/filter_causes_close.cc
  27. 14
      test/core/end2end/tests/filter_context.cc
  28. 14
      test/core/end2end/tests/filter_init_fails.cc
  29. 14
      test/core/end2end/tests/filter_latency.cc
  30. 14
      test/core/end2end/tests/filter_status_code.cc
  31. 9
      test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc
  32. 9
      test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
  33. 12
      test/core/end2end/tests/retry_send_op_fails.cc
  34. 63
      test/core/xds/xds_channel_stack_modifier_test.cc
  35. 13
      test/cpp/microbenchmarks/bm_call_create.cc

@ -64,9 +64,9 @@ void BuildClientChannelConfiguration(CoreConfiguration::Builder* builder) {
RegisterHttpConnectHandshaker(builder); RegisterHttpConnectHandshaker(builder);
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](grpc_channel_stack_builder* builder) { [](ChannelStackBuilder* builder) {
return grpc_channel_stack_builder_append_filter( builder->AppendFilter(&ClientChannel::kFilterVtable, nullptr);
builder, &ClientChannel::kFilterVtable, nullptr, nullptr); return true;
}); });
} }

@ -1843,9 +1843,8 @@ namespace grpc_core {
void RegisterGrpcLbLoadReportingFilter(CoreConfiguration::Builder* builder) { void RegisterGrpcLbLoadReportingFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](grpc_channel_stack_builder* builder) { [](ChannelStackBuilder* builder) {
const grpc_channel_args* args = const grpc_channel_args* args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
const grpc_arg* channel_arg = const grpc_arg* channel_arg =
grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME); grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING && if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
@ -1855,8 +1854,7 @@ void RegisterGrpcLbLoadReportingFilter(CoreConfiguration::Builder* builder) {
// this filter at the very top of the subchannel stack, since that // this filter at the very top of the subchannel stack, since that
// will minimize the number of metadata elements that the filter // will minimize the number of metadata elements that the filter
// needs to iterate through to find the ClientStats object. // needs to iterate through to find the ClientStats object.
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&grpc_client_load_reporting_filter, nullptr);
builder, &grpc_client_load_reporting_filter, nullptr, nullptr);
} }
return true; return true;
}); });

@ -141,16 +141,15 @@ void RegisterServiceConfigChannelArgFilter(
CoreConfiguration::Builder* builder) { CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](grpc_channel_stack_builder* builder) { [](ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = const grpc_channel_args* channel_args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
if (grpc_channel_args_want_minimal_stack(channel_args) || if (grpc_channel_args_want_minimal_stack(channel_args) ||
grpc_channel_args_find_string(channel_args, grpc_channel_args_find_string(channel_args,
GRPC_ARG_SERVICE_CONFIG) == nullptr) { GRPC_ARG_SERVICE_CONFIG) == nullptr) {
return true; return true;
} }
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&ServiceConfigChannelArgFilter, nullptr);
builder, &ServiceConfigChannelArgFilter, nullptr, nullptr); return true;
}); });
} }

@ -964,21 +964,16 @@ void ConnectionDestroy(void* arg, grpc_error_handle /*error*/) {
bool Subchannel::PublishTransportLocked() { bool Subchannel::PublishTransportLocked() {
// Construct channel stack. // Construct channel stack.
grpc_channel_stack_builder* builder = ChannelStackBuilder builder("subchannel");
grpc_channel_stack_builder_create("subchannel"); builder.SetChannelArgs(connecting_result_.channel_args)
grpc_channel_stack_builder_set_channel_arguments( .SetTransport(connecting_result_.transport);
builder, connecting_result_.channel_args);
grpc_channel_stack_builder_set_transport(builder,
connecting_result_.transport);
if (!CoreConfiguration::Get().channel_init().CreateStack( if (!CoreConfiguration::Get().channel_init().CreateStack(
builder, GRPC_CLIENT_SUBCHANNEL)) { &builder, GRPC_CLIENT_SUBCHANNEL)) {
grpc_channel_stack_builder_destroy(builder);
return false; return false;
} }
grpc_channel_stack* stk; grpc_channel_stack* stk;
grpc_error_handle error = grpc_channel_stack_builder_finish( grpc_error_handle error = builder.Build(0, 1, ConnectionDestroy, nullptr,
builder, 0, 1, ConnectionDestroy, nullptr, reinterpret_cast<void**>(&stk));
reinterpret_cast<void**>(&stk));
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
grpc_transport_destroy(connecting_result_.transport); grpc_transport_destroy(connecting_result_.transport);
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,

@ -250,16 +250,13 @@ const grpc_channel_filter grpc_client_idle_filter = {
void RegisterClientIdleFilter(CoreConfiguration::Builder* builder) { void RegisterClientIdleFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](grpc_channel_stack_builder* builder) { [](ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = const grpc_channel_args* channel_args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
if (!grpc_channel_args_want_minimal_stack(channel_args) && if (!grpc_channel_args_want_minimal_stack(channel_args) &&
GetClientIdleTimeout(channel_args) != INT_MAX) { GetClientIdleTimeout(channel_args) != INT_MAX) {
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&grpc_client_idle_filter, nullptr);
builder, &grpc_client_idle_filter, nullptr, nullptr);
} else {
return true;
} }
return true;
}); });
} }
} // namespace grpc_core } // namespace grpc_core

@ -378,11 +378,9 @@ void RegisterDeadlineFilter(CoreConfiguration::Builder* builder) {
const grpc_channel_filter* filter) { const grpc_channel_filter* filter) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[filter](grpc_channel_stack_builder* builder) { [filter](ChannelStackBuilder* builder) {
if (grpc_deadline_checking_enabled( if (grpc_deadline_checking_enabled(builder->channel_args())) {
grpc_channel_stack_builder_get_channel_arguments(builder))) { builder->PrependFilter(filter, nullptr);
return grpc_channel_stack_builder_prepend_filter(builder, filter,
nullptr, nullptr);
} }
return true; return true;
}); });

@ -73,9 +73,8 @@ namespace {
const grpc_channel_filter grpc_client_authority_filter = const grpc_channel_filter grpc_client_authority_filter =
MakePromiseBasedFilter<ClientAuthorityFilter>(); MakePromiseBasedFilter<ClientAuthorityFilter>();
bool add_client_authority_filter(grpc_channel_stack_builder* builder) { bool add_client_authority_filter(ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = const grpc_channel_args* channel_args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
const grpc_arg* disable_client_authority_filter_arg = grpc_channel_args_find( const grpc_arg* disable_client_authority_filter_arg = grpc_channel_args_find(
channel_args, GRPC_ARG_DISABLE_CLIENT_AUTHORITY_FILTER); channel_args, GRPC_ARG_DISABLE_CLIENT_AUTHORITY_FILTER);
if (disable_client_authority_filter_arg != nullptr) { if (disable_client_authority_filter_arg != nullptr) {
@ -85,8 +84,8 @@ bool add_client_authority_filter(grpc_channel_stack_builder* builder) {
return true; return true;
} }
} }
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&grpc_client_authority_filter, nullptr);
builder, &grpc_client_authority_filter, nullptr, nullptr); return true;
} }
} // namespace } // namespace

@ -30,8 +30,8 @@
#include "src/core/lib/transport/transport_impl.h" #include "src/core/lib/transport/transport_impl.h"
static bool is_building_http_like_transport( static bool is_building_http_like_transport(
grpc_channel_stack_builder* builder) { grpc_core::ChannelStackBuilder* builder) {
grpc_transport* t = grpc_channel_stack_builder_get_transport(builder); grpc_transport* t = builder->transport();
return t != nullptr && strstr(t->vtable->name, "http"); return t != nullptr && strstr(t->vtable->name, "http");
} }
@ -44,27 +44,26 @@ void RegisterHttpFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
channel_type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, channel_type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[enable_in_minimal_stack, control_channel_arg, [enable_in_minimal_stack, control_channel_arg,
filter](grpc_channel_stack_builder* builder) { filter](ChannelStackBuilder* builder) {
if (!is_building_http_like_transport(builder)) return true; if (!is_building_http_like_transport(builder)) return true;
const grpc_channel_args* channel_args = const grpc_channel_args* channel_args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
bool enable = grpc_channel_arg_get_bool( bool enable = grpc_channel_arg_get_bool(
grpc_channel_args_find(channel_args, control_channel_arg), grpc_channel_args_find(channel_args, control_channel_arg),
enable_in_minimal_stack || enable_in_minimal_stack ||
!grpc_channel_args_want_minimal_stack(channel_args)); !grpc_channel_args_want_minimal_stack(channel_args));
if (!enable) return true; if (enable) builder->PrependFilter(filter, nullptr);
return grpc_channel_stack_builder_prepend_filter(builder, filter, return true;
nullptr, nullptr);
}); });
}; };
auto required = [builder](grpc_channel_stack_type channel_type, auto required = [builder](grpc_channel_stack_type channel_type,
const grpc_channel_filter* filter) { const grpc_channel_filter* filter) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
channel_type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, channel_type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[filter](grpc_channel_stack_builder* builder) { [filter](ChannelStackBuilder* builder) {
if (!is_building_http_like_transport(builder)) return true; if (is_building_http_like_transport(builder)) {
return grpc_channel_stack_builder_prepend_filter(builder, filter, builder->PrependFilter(filter, nullptr);
nullptr, nullptr); }
return true;
}); });
}; };
optional(GRPC_CLIENT_SUBCHANNEL, false, optional(GRPC_CLIENT_SUBCHANNEL, false,

@ -539,9 +539,8 @@ namespace grpc_core {
void RegisterMaxAgeFilter(CoreConfiguration::Builder* builder) { void RegisterMaxAgeFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](grpc_channel_stack_builder* builder) { [](ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = const grpc_channel_args* channel_args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
bool enable = grpc_channel_arg_get_integer( bool enable = grpc_channel_arg_get_integer(
grpc_channel_args_find( grpc_channel_args_find(
channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS), channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS),
@ -551,11 +550,9 @@ void RegisterMaxAgeFilter(CoreConfiguration::Builder* builder) {
channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS),
MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX; MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX;
if (enable) { if (enable) {
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&grpc_max_age_filter, nullptr);
builder, &grpc_max_age_filter, nullptr, nullptr);
} else {
return true;
} }
return true;
}); });
} }
} // namespace grpc_core } // namespace grpc_core

@ -344,21 +344,20 @@ const grpc_channel_filter grpc_message_size_filter = {
// Used for GRPC_CLIENT_SUBCHANNEL // Used for GRPC_CLIENT_SUBCHANNEL
static bool maybe_add_message_size_filter_subchannel( static bool maybe_add_message_size_filter_subchannel(
grpc_channel_stack_builder* builder) { grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = const grpc_channel_args* channel_args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
if (grpc_channel_args_want_minimal_stack(channel_args)) { if (grpc_channel_args_want_minimal_stack(channel_args)) {
return true; return true;
} }
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&grpc_message_size_filter, nullptr);
builder, &grpc_message_size_filter, nullptr, nullptr); return true;
} }
// Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the filter // Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the filter
// only if message size limits or service config is specified. // only if message size limits or service config is specified.
static bool maybe_add_message_size_filter(grpc_channel_stack_builder* builder) { static bool maybe_add_message_size_filter(
const grpc_channel_args* channel_args = grpc_core::ChannelStackBuilder* builder) {
grpc_channel_stack_builder_get_channel_arguments(builder); const grpc_channel_args* channel_args = builder->channel_args();
if (grpc_channel_args_want_minimal_stack(channel_args)) { if (grpc_channel_args_want_minimal_stack(channel_args)) {
return true; return true;
} }
@ -371,15 +370,9 @@ static bool maybe_add_message_size_filter(grpc_channel_stack_builder* builder) {
const grpc_arg* a = const grpc_arg* a =
grpc_channel_args_find(channel_args, GRPC_ARG_SERVICE_CONFIG); grpc_channel_args_find(channel_args, GRPC_ARG_SERVICE_CONFIG);
const char* svc_cfg_str = grpc_channel_arg_get_string(a); const char* svc_cfg_str = grpc_channel_arg_get_string(a);
if (svc_cfg_str != nullptr) { if (svc_cfg_str != nullptr) enable = true;
enable = true; if (enable) builder->PrependFilter(&grpc_message_size_filter, nullptr);
} return true;
if (enable) {
return grpc_channel_stack_builder_prepend_filter(
builder, &grpc_message_size_filter, nullptr, nullptr);
} else {
return true;
}
} }
void grpc_message_size_filter_init(void) { void grpc_message_size_filter_init(void) {

@ -49,35 +49,32 @@ const char* kXdsChannelStackModifierChannelArgName =
} // namespace } // namespace
bool XdsChannelStackModifier::ModifyChannelStack( bool XdsChannelStackModifier::ModifyChannelStack(ChannelStackBuilder* builder) {
grpc_channel_stack_builder* builder) {
// Insert the filters after the census filter if present. // Insert the filters after the census filter if present.
grpc_channel_stack_builder_iterator* it = auto it = builder->mutable_stack()->begin();
grpc_channel_stack_builder_create_iterator_at_first(builder); while (it != builder->mutable_stack()->end()) {
while (grpc_channel_stack_builder_move_next(it)) { const char* filter_name_at_it = it->filter->name;
if (grpc_channel_stack_builder_iterator_is_end(it)) break;
const char* filter_name_at_it =
grpc_channel_stack_builder_iterator_filter_name(it);
if (strcmp("census_server", filter_name_at_it) == 0 || if (strcmp("census_server", filter_name_at_it) == 0 ||
strcmp("opencensus_server", filter_name_at_it) == 0) { strcmp("opencensus_server", filter_name_at_it) == 0) {
break; break;
} }
++it;
} }
if (grpc_channel_stack_builder_iterator_is_end(it)) { if (it == builder->mutable_stack()->end()) {
// No census filter found. Reset iterator to the beginning. This will result // No census filter found. Reset iterator to the beginning. This will result
// in prepending the list of xDS HTTP filters to the current stack. Note // in prepending the list of xDS HTTP filters to the current stack. Note
// that this stage is run before the stage that adds the top server filter, // that this stage is run before the stage that adds the top server filter,
// resulting in these filters being finally placed after the `server` // resulting in these filters being finally placed after the `server`
// filter. // filter.
grpc_channel_stack_builder_iterator_destroy(it); it = builder->mutable_stack()->begin();
it = grpc_channel_stack_builder_create_iterator_at_first(builder); } else {
++it;
} }
GPR_ASSERT(grpc_channel_stack_builder_move_next(it));
for (const grpc_channel_filter* filter : filters_) { for (const grpc_channel_filter* filter : filters_) {
GPR_ASSERT(grpc_channel_stack_builder_add_filter_before(it, filter, nullptr, it = builder->mutable_stack()->insert(
nullptr)); it, ChannelStackBuilder::StackEntry{filter, nullptr});
++it;
} }
grpc_channel_stack_builder_iterator_destroy(it);
return true; return true;
} }
@ -98,10 +95,10 @@ XdsChannelStackModifier::GetFromChannelArgs(const grpc_channel_args& args) {
void RegisterXdsChannelStackModifier(CoreConfiguration::Builder* builder) { void RegisterXdsChannelStackModifier(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX, [](grpc_channel_stack_builder* builder) { GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) {
RefCountedPtr<XdsChannelStackModifier> channel_stack_modifier = RefCountedPtr<XdsChannelStackModifier> channel_stack_modifier =
XdsChannelStackModifier::GetFromChannelArgs( XdsChannelStackModifier::GetFromChannelArgs(
*grpc_channel_stack_builder_get_channel_arguments(builder)); *builder->channel_args());
if (channel_stack_modifier != nullptr) { if (channel_stack_modifier != nullptr) {
return channel_stack_modifier->ModifyChannelStack(builder); return channel_stack_modifier->ModifyChannelStack(builder);
} }

@ -30,15 +30,16 @@
namespace grpc_core { namespace grpc_core {
// XdsChannelStackModifier allows for inserting xDS HTTP filters into the // XdsChannelStackModifier allows for inserting xDS HTTP filters into the
// channel stack. It is registered to mutate the `grpc_channel_stack_builder` // channel stack. It is registered to mutate the
// object via ChannelInit::Builder::RegisterStage. // `ChannelStackBuilder` object via
// ChannelInit::Builder::RegisterStage.
class XdsChannelStackModifier : public RefCounted<XdsChannelStackModifier> { class XdsChannelStackModifier : public RefCounted<XdsChannelStackModifier> {
public: public:
explicit XdsChannelStackModifier( explicit XdsChannelStackModifier(
std::vector<const grpc_channel_filter*> filters) std::vector<const grpc_channel_filter*> filters)
: filters_(std::move(filters)) {} : filters_(std::move(filters)) {}
// Returns true on success, false otherwise. // Returns true on success, false otherwise.
bool ModifyChannelStack(grpc_channel_stack_builder* builder); bool ModifyChannelStack(ChannelStackBuilder* builder);
grpc_arg MakeChannelArg() const; grpc_arg MakeChannelArg() const;
static RefCountedPtr<XdsChannelStackModifier> GetFromChannelArgs( static RefCountedPtr<XdsChannelStackModifier> GetFromChannelArgs(
const grpc_channel_args& args); const grpc_channel_args& args);

@ -25,252 +25,55 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
typedef struct filter_node { namespace grpc_core {
struct filter_node* next;
struct filter_node* prev;
const grpc_channel_filter* filter;
grpc_post_filter_create_init_func init;
void* init_arg;
} filter_node;
struct grpc_channel_stack_builder {
// sentinel nodes for filters that have been added
filter_node begin;
filter_node end;
// various set/get-able parameters
grpc_channel_args* args;
grpc_transport* transport;
char* target;
const char* name;
};
struct grpc_channel_stack_builder_iterator {
grpc_channel_stack_builder* builder;
filter_node* node;
};
grpc_channel_stack_builder* grpc_channel_stack_builder_create(
const char* name) {
grpc_channel_stack_builder* b =
grpc_core::Zalloc<grpc_channel_stack_builder>();
b->begin.filter = nullptr;
b->end.filter = nullptr;
b->begin.next = &b->end;
b->begin.prev = &b->end;
b->end.next = &b->begin;
b->end.prev = &b->begin;
b->name = name;
return b;
}
void grpc_channel_stack_builder_set_target(grpc_channel_stack_builder* b,
const char* target) {
gpr_free(b->target);
b->target = gpr_strdup(target);
}
std::string grpc_channel_stack_builder_get_target(
grpc_channel_stack_builder* b) {
return b->target == nullptr ? std::string("unknown") : std::string(b->target);
}
static grpc_channel_stack_builder_iterator* create_iterator_at_filter_node(
grpc_channel_stack_builder* builder, filter_node* node) {
grpc_channel_stack_builder_iterator* it =
static_cast<grpc_channel_stack_builder_iterator*>(
gpr_malloc(sizeof(*it)));
it->builder = builder;
it->node = node;
return it;
}
void grpc_channel_stack_builder_iterator_destroy(
grpc_channel_stack_builder_iterator* it) {
gpr_free(it);
}
grpc_channel_stack_builder_iterator*
grpc_channel_stack_builder_create_iterator_at_first(
grpc_channel_stack_builder* builder) {
return create_iterator_at_filter_node(builder, &builder->begin);
}
grpc_channel_stack_builder_iterator*
grpc_channel_stack_builder_create_iterator_at_last(
grpc_channel_stack_builder* builder) {
return create_iterator_at_filter_node(builder, &builder->end);
}
bool grpc_channel_stack_builder_iterator_is_end(
grpc_channel_stack_builder_iterator* iterator) {
return iterator->node == &iterator->builder->end;
}
const char* grpc_channel_stack_builder_iterator_filter_name(
grpc_channel_stack_builder_iterator* iterator) {
if (iterator->node->filter == nullptr) return nullptr;
return iterator->node->filter->name;
}
bool grpc_channel_stack_builder_move_next(
grpc_channel_stack_builder_iterator* iterator) {
if (iterator->node == &iterator->builder->end) return false;
iterator->node = iterator->node->next;
return true;
}
bool grpc_channel_stack_builder_move_prev(
grpc_channel_stack_builder_iterator* iterator) {
if (iterator->node == &iterator->builder->begin) return false;
iterator->node = iterator->node->prev;
return true;
}
grpc_channel_stack_builder_iterator* grpc_channel_stack_builder_iterator_find(
grpc_channel_stack_builder* builder, const char* filter_name) {
GPR_ASSERT(filter_name != nullptr);
grpc_channel_stack_builder_iterator* it =
grpc_channel_stack_builder_create_iterator_at_first(builder);
while (grpc_channel_stack_builder_move_next(it)) {
if (grpc_channel_stack_builder_iterator_is_end(it)) break;
const char* filter_name_at_it =
grpc_channel_stack_builder_iterator_filter_name(it);
if (strcmp(filter_name, filter_name_at_it) == 0) break;
}
return it;
}
bool grpc_channel_stack_builder_move_prev( ChannelStackBuilder::~ChannelStackBuilder() {
grpc_channel_stack_builder_iterator* iterator); grpc_channel_args_destroy(args_);
void grpc_channel_stack_builder_set_channel_arguments(
grpc_channel_stack_builder* builder, const grpc_channel_args* args) {
if (builder->args != nullptr) {
grpc_channel_args_destroy(builder->args);
}
builder->args = grpc_channel_args_copy(args);
} }
const grpc_channel_args* grpc_channel_stack_builder_get_channel_arguments( ChannelStackBuilder& ChannelStackBuilder::SetTarget(const char* target) {
grpc_channel_stack_builder* builder) { if (target == nullptr) {
return builder->args; target_.clear();
} } else {
target_ = target;
void grpc_channel_stack_builder_set_transport(
grpc_channel_stack_builder* builder, grpc_transport* transport) {
GPR_ASSERT(builder->transport == nullptr);
builder->transport = transport;
}
grpc_transport* grpc_channel_stack_builder_get_transport(
grpc_channel_stack_builder* builder) {
return builder->transport;
}
bool grpc_channel_stack_builder_append_filter(
grpc_channel_stack_builder* builder, const grpc_channel_filter* filter,
grpc_post_filter_create_init_func post_init_func, void* user_data) {
grpc_channel_stack_builder_iterator* it =
grpc_channel_stack_builder_create_iterator_at_last(builder);
bool ok = grpc_channel_stack_builder_add_filter_before(
it, filter, post_init_func, user_data);
grpc_channel_stack_builder_iterator_destroy(it);
return ok;
}
bool grpc_channel_stack_builder_remove_filter(
grpc_channel_stack_builder* builder, const char* filter_name) {
grpc_channel_stack_builder_iterator* it =
grpc_channel_stack_builder_iterator_find(builder, filter_name);
if (grpc_channel_stack_builder_iterator_is_end(it)) {
grpc_channel_stack_builder_iterator_destroy(it);
return false;
} }
it->node->prev->next = it->node->next; return *this;
it->node->next->prev = it->node->prev;
gpr_free(it->node);
grpc_channel_stack_builder_iterator_destroy(it);
return true;
} }
bool grpc_channel_stack_builder_prepend_filter( ChannelStackBuilder& ChannelStackBuilder::SetChannelArgs(
grpc_channel_stack_builder* builder, const grpc_channel_filter* filter, const grpc_channel_args* args) {
grpc_post_filter_create_init_func post_init_func, void* user_data) { grpc_channel_args_destroy(args_);
grpc_channel_stack_builder_iterator* it = args_ = grpc_channel_args_copy(args);
grpc_channel_stack_builder_create_iterator_at_first(builder); return *this;
bool ok = grpc_channel_stack_builder_add_filter_after(
it, filter, post_init_func, user_data);
grpc_channel_stack_builder_iterator_destroy(it);
return ok;
} }
static void add_after(filter_node* before, const grpc_channel_filter* filter, void ChannelStackBuilder::PrependFilter(const grpc_channel_filter* filter,
grpc_post_filter_create_init_func post_init_func, PostInitFunc post_init) {
void* user_data) { stack_.insert(stack_.begin(), {filter, std::move(post_init)});
filter_node* new_node =
static_cast<filter_node*>(gpr_malloc(sizeof(*new_node)));
new_node->next = before->next;
new_node->prev = before;
new_node->next->prev = new_node->prev->next = new_node;
new_node->filter = filter;
new_node->init = post_init_func;
new_node->init_arg = user_data;
} }
bool grpc_channel_stack_builder_add_filter_before( void ChannelStackBuilder::AppendFilter(const grpc_channel_filter* filter,
grpc_channel_stack_builder_iterator* iterator, PostInitFunc post_init) {
const grpc_channel_filter* filter, stack_.push_back({filter, std::move(post_init)});
grpc_post_filter_create_init_func post_init_func, void* user_data) {
if (iterator->node == &iterator->builder->begin) return false;
add_after(iterator->node->prev, filter, post_init_func, user_data);
return true;
} }
bool grpc_channel_stack_builder_add_filter_after( grpc_error_handle ChannelStackBuilder::Build(size_t prefix_bytes,
grpc_channel_stack_builder_iterator* iterator, int initial_refs,
const grpc_channel_filter* filter, grpc_iomgr_cb_func destroy,
grpc_post_filter_create_init_func post_init_func, void* user_data) { void* destroy_arg, void** result) {
if (iterator->node == &iterator->builder->end) return false;
add_after(iterator->node, filter, post_init_func, user_data);
return true;
}
void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder* builder) {
filter_node* p = builder->begin.next;
while (p != &builder->end) {
filter_node* next = p->next;
gpr_free(p);
p = next;
}
if (builder->args != nullptr) {
grpc_channel_args_destroy(builder->args);
}
gpr_free(builder->target);
gpr_free(builder);
}
grpc_error_handle grpc_channel_stack_builder_finish(
grpc_channel_stack_builder* builder, size_t prefix_bytes, int initial_refs,
grpc_iomgr_cb_func destroy, void* destroy_arg, void** result) {
// count the number of filters
size_t num_filters = 0;
for (filter_node* p = builder->begin.next; p != &builder->end; p = p->next) {
num_filters++;
}
// create an array of filters // create an array of filters
const grpc_channel_filter** filters = std::vector<const grpc_channel_filter*> filters;
static_cast<const grpc_channel_filter**>( filters.reserve(stack_.size());
gpr_malloc(sizeof(*filters) * num_filters)); for (const auto& elem : stack_) {
size_t i = 0; filters.push_back(elem.filter);
for (filter_node* p = builder->begin.next; p != &builder->end; p = p->next) {
filters[i++] = p->filter;
} }
// calculate the size of the channel stack // calculate the size of the channel stack
size_t channel_stack_size = grpc_channel_stack_size(filters, num_filters); size_t channel_stack_size =
grpc_channel_stack_size(filters.data(), filters.size());
// allocate memory, with prefix_bytes followed by channel_stack_size // allocate memory, with prefix_bytes followed by channel_stack_size
*result = gpr_zalloc(prefix_bytes + channel_stack_size); *result = gpr_zalloc(prefix_bytes + channel_stack_size);
@ -280,28 +83,25 @@ grpc_error_handle grpc_channel_stack_builder_finish(
// and initialize it // and initialize it
grpc_error_handle error = grpc_channel_stack_init( grpc_error_handle error = grpc_channel_stack_init(
initial_refs, destroy, destroy_arg == nullptr ? *result : destroy_arg, initial_refs, destroy, destroy_arg == nullptr ? *result : destroy_arg,
filters, num_filters, builder->args, builder->transport, builder->name, filters.data(), filters.size(), args_, transport_, name_.c_str(),
channel_stack); channel_stack);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
grpc_channel_stack_destroy(channel_stack); grpc_channel_stack_destroy(channel_stack);
gpr_free(*result); gpr_free(*result);
*result = nullptr; *result = nullptr;
} else { return error;
// run post-initialization functions
i = 0;
for (filter_node* p = builder->begin.next; p != &builder->end;
p = p->next) {
if (p->init != nullptr) {
p->init(channel_stack, grpc_channel_stack_element(channel_stack, i),
p->init_arg);
}
i++;
}
} }
grpc_channel_stack_builder_destroy(builder); // run post-initialization functions
gpr_free(const_cast<grpc_channel_filter**>(filters)); for (size_t i = 0; i < filters.size(); i++) {
if (stack_[i].post_init != nullptr) {
stack_[i].post_init(channel_stack,
grpc_channel_stack_element(channel_stack, i));
}
}
return error; return GRPC_ERROR_NONE;
} }
} // namespace grpc_core

@ -1,20 +1,16 @@
/* // Copyright 2016 gRPC authors.
* //
* Copyright 2016 gRPC authors. // Licensed under the Apache License, Version 2.0 (the "License");
* // you may not use this file except in compliance with the License.
* Licensed under the Apache License, Version 2.0 (the "License"); // You may obtain a copy of the License at
* you may not use this file except in compliance with the License. //
* You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0
* //
* http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software
* // distributed under the License is distributed on an "AS IS" BASIS,
* Unless required by applicable law or agreed to in writing, software // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H #ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H
#define GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H #define GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H
@ -26,130 +22,83 @@
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
/// grpc_channel_stack_builder offers a programmatic interface to selected namespace grpc_core {
/// and order channel filters
typedef struct grpc_channel_stack_builder grpc_channel_stack_builder; // Build a channel stack.
typedef struct grpc_channel_stack_builder_iterator // Allows interested parties to add filters to the stack, and to query an
grpc_channel_stack_builder_iterator; // in-progress build.
// Carries some useful context for the channel stack, such as a target string
/// Create a new channel stack builder. // and a transport.
/// \a name must be statically allocated. class ChannelStackBuilder {
grpc_channel_stack_builder* grpc_channel_stack_builder_create(const char* name); public:
// A function that will be called after the channel stack is successfully
/// Set the target uri // built.
void grpc_channel_stack_builder_set_target(grpc_channel_stack_builder* b, using PostInitFunc = std::function<void(grpc_channel_stack* channel_stack,
const char* target); grpc_channel_element* elem)>;
std::string grpc_channel_stack_builder_get_target( // One filter in the currently building stack.
grpc_channel_stack_builder* b); struct StackEntry {
const grpc_channel_filter* filter;
/// Attach \a transport to the builder (does not take ownership) PostInitFunc post_init;
void grpc_channel_stack_builder_set_transport( };
grpc_channel_stack_builder* builder, grpc_transport* transport);
// Initialize with a name.
/// Fetch attached transport explicit ChannelStackBuilder(std::string name) : name_(std::move(name)) {}
grpc_transport* grpc_channel_stack_builder_get_transport(
grpc_channel_stack_builder* builder); ~ChannelStackBuilder();
/// Set channel arguments: copies args // Set the target string.
void grpc_channel_stack_builder_set_channel_arguments( ChannelStackBuilder& SetTarget(const char* target);
grpc_channel_stack_builder* builder, const grpc_channel_args* args);
// Query the target.
/// Return a borrowed pointer to the channel arguments absl::string_view target() const { return target_; }
const grpc_channel_args* grpc_channel_stack_builder_get_channel_arguments(
grpc_channel_stack_builder* builder); // Set the transport.
ChannelStackBuilder& SetTransport(grpc_transport* transport) {
/// Begin iterating over already defined filters in the builder at the beginning GPR_ASSERT(transport_ == nullptr);
grpc_channel_stack_builder_iterator* transport_ = transport;
grpc_channel_stack_builder_create_iterator_at_first( return *this;
grpc_channel_stack_builder* builder); }
/// Begin iterating over already defined filters in the builder at the end // Query the transport.
grpc_channel_stack_builder_iterator* grpc_transport* transport() const { return transport_; }
grpc_channel_stack_builder_create_iterator_at_last(
grpc_channel_stack_builder* builder); // Set channel args (takes a copy of them).
ChannelStackBuilder& SetChannelArgs(const grpc_channel_args* args);
/// Is an iterator at the first element?
bool grpc_channel_stack_builder_iterator_is_first( // Query the channel args.
grpc_channel_stack_builder_iterator* iterator); const grpc_channel_args* channel_args() const { return args_; }
/// Is an iterator at the end? // Mutable vector of proposed stack entries.
bool grpc_channel_stack_builder_iterator_is_end( std::vector<StackEntry>* mutable_stack() { return &stack_; }
grpc_channel_stack_builder_iterator* iterator);
// Helper to add a filter to the front of the stack.
/// What is the name of the filter at this iterator position? void PrependFilter(const grpc_channel_filter* filter, PostInitFunc post_init);
const char* grpc_channel_stack_builder_iterator_filter_name(
grpc_channel_stack_builder_iterator* iterator); // Helper to add a filter to the end of the stack.
void AppendFilter(const grpc_channel_filter* filter, PostInitFunc post_init);
/// Move an iterator to the next item
bool grpc_channel_stack_builder_move_next( // Build the channel stack.
grpc_channel_stack_builder_iterator* iterator); // After success, *result holds the new channel stack,
// prefix_bytes are allocated before the channel stack,
/// Move an iterator to the previous item // initial_refs, destroy, destroy_arg are as per grpc_channel_stack_init
bool grpc_channel_stack_builder_move_prev( // On failure, *result is nullptr.
grpc_channel_stack_builder_iterator* iterator); grpc_error_handle Build(size_t prefix_bytes, int initial_refs,
grpc_iomgr_cb_func destroy, void* destroy_arg,
/// Return an iterator at \a filter_name, or at the end of the list if not void** result);
/// found.
grpc_channel_stack_builder_iterator* grpc_channel_stack_builder_iterator_find( private:
grpc_channel_stack_builder* builder, const char* filter_name); // The name of the stack
const std::string name_;
typedef void (*grpc_post_filter_create_init_func)( // The target
grpc_channel_stack* channel_stack, grpc_channel_element* elem, void* arg); std::string target_;
// The transport
/// Add \a filter to the stack, after \a iterator. grpc_transport* transport_ = nullptr;
/// Call \a post_init_func(..., \a user_data) once the channel stack is // Channel args
/// created. const grpc_channel_args* args_ = nullptr;
bool grpc_channel_stack_builder_add_filter_after( // The in-progress stack
grpc_channel_stack_builder_iterator* iterator, std::vector<StackEntry> stack_;
const grpc_channel_filter* filter, };
grpc_post_filter_create_init_func post_init_func, } // namespace grpc_core
void* user_data) GRPC_MUST_USE_RESULT;
#endif // GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H
/// Add \a filter to the stack, before \a iterator.
/// Call \a post_init_func(..., \a user_data) once the channel stack is
/// created.
bool grpc_channel_stack_builder_add_filter_before(
grpc_channel_stack_builder_iterator* iterator,
const grpc_channel_filter* filter,
grpc_post_filter_create_init_func post_init_func,
void* user_data) GRPC_MUST_USE_RESULT;
/// Add \a filter to the beginning of the filter list.
/// Call \a post_init_func(..., \a user_data) once the channel stack is
/// created.
bool grpc_channel_stack_builder_prepend_filter(
grpc_channel_stack_builder* builder, const grpc_channel_filter* filter,
grpc_post_filter_create_init_func post_init_func,
void* user_data) GRPC_MUST_USE_RESULT;
/// Add \a filter to the end of the filter list.
/// Call \a post_init_func(..., \a user_data) once the channel stack is
/// created.
bool grpc_channel_stack_builder_append_filter(
grpc_channel_stack_builder* builder, const grpc_channel_filter* filter,
grpc_post_filter_create_init_func post_init_func,
void* user_data) GRPC_MUST_USE_RESULT;
/// Remove any filter whose name is \a filter_name from \a builder. Returns true
/// if \a filter_name was not found.
bool grpc_channel_stack_builder_remove_filter(
grpc_channel_stack_builder* builder, const char* filter_name);
/// Terminate iteration and destroy \a iterator
void grpc_channel_stack_builder_iterator_destroy(
grpc_channel_stack_builder_iterator* iterator);
/// Destroy the builder, return the freshly minted channel stack in \a result.
/// Allocates \a prefix_bytes bytes before the channel stack
/// Returns the base pointer of the allocated block
/// \a initial_refs, \a destroy, \a destroy_arg are as per
/// grpc_channel_stack_init
grpc_error_handle grpc_channel_stack_builder_finish(
grpc_channel_stack_builder* builder, size_t prefix_bytes, int initial_refs,
grpc_iomgr_cb_func destroy, void* destroy_arg, void** result);
/// Destroy the builder without creating a channel stack
void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder* builder);
#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H */

@ -233,11 +233,15 @@ static void bind_transport(grpc_channel_stack* channel_stack,
grpc_transport_stream_size(static_cast<grpc_transport*>(t)); grpc_transport_stream_size(static_cast<grpc_transport*>(t));
} }
bool grpc_add_connected_filter(grpc_channel_stack_builder* builder) { bool grpc_add_connected_filter(grpc_core::ChannelStackBuilder* builder) {
grpc_transport* t = grpc_channel_stack_builder_get_transport(builder); grpc_transport* t = builder->transport();
GPR_ASSERT(t != nullptr); GPR_ASSERT(t != nullptr);
return grpc_channel_stack_builder_append_filter( builder->AppendFilter(
builder, &grpc_connected_filter, bind_transport, t); &grpc_connected_filter,
[t](grpc_channel_stack* stk, grpc_channel_element* elem) {
bind_transport(stk, elem, t);
});
return true;
} }
grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem) { grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem) {

@ -25,7 +25,7 @@
extern const grpc_channel_filter grpc_connected_filter; extern const grpc_channel_filter grpc_connected_filter;
bool grpc_add_connected_filter(grpc_channel_stack_builder* builder); bool grpc_add_connected_filter(grpc_core::ChannelStackBuilder* builder);
/* Debug helper to dig the transport stream out of a call element */ /* Debug helper to dig the transport stream out of a call element */
grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem); grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem);

@ -35,14 +35,14 @@ void RegisterBuiltins(CoreConfiguration::Builder* builder) {
grpc_add_connected_filter); grpc_add_connected_filter);
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_CLIENT_LAME_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, GRPC_CLIENT_LAME_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](grpc_channel_stack_builder* builder) { [](ChannelStackBuilder* builder) {
return grpc_channel_stack_builder_append_filter( builder->AppendFilter(&grpc_lame_filter, nullptr);
builder, &grpc_lame_filter, nullptr, nullptr); return true;
}); });
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX, [](grpc_channel_stack_builder* builder) { GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) {
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&Server::kServerTopFilter, nullptr);
builder, &Server::kServerTopFilter, nullptr, nullptr); return true;
}); });
} }

@ -58,21 +58,20 @@
static void destroy_channel(void* arg, grpc_error_handle error); static void destroy_channel(void* arg, grpc_error_handle error);
grpc_channel* grpc_channel_create_with_builder( grpc_channel* grpc_channel_create_with_builder(
grpc_channel_stack_builder* builder, grpc_core::ChannelStackBuilder* builder,
grpc_channel_stack_type channel_stack_type, grpc_error_handle* error) { grpc_channel_stack_type channel_stack_type, grpc_error_handle* error) {
std::string target = grpc_channel_stack_builder_get_target(builder); std::string target(builder->target());
grpc_channel_args* args = grpc_channel_args_copy( grpc_channel_args* args = grpc_channel_args_copy(builder->channel_args());
grpc_channel_stack_builder_get_channel_arguments(builder));
grpc_channel* channel; grpc_channel* channel;
if (channel_stack_type == GRPC_SERVER_CHANNEL) { if (channel_stack_type == GRPC_SERVER_CHANNEL) {
GRPC_STATS_INC_SERVER_CHANNELS_CREATED(); GRPC_STATS_INC_SERVER_CHANNELS_CREATED();
} else { } else {
GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(); GRPC_STATS_INC_CLIENT_CHANNELS_CREATED();
} }
std::string name = grpc_channel_stack_builder_get_target(builder); std::string name(builder->target());
grpc_error_handle builder_error = grpc_channel_stack_builder_finish( grpc_error_handle builder_error =
builder, sizeof(grpc_channel), 1, destroy_channel, nullptr, builder->Build(sizeof(grpc_channel), 1, destroy_channel, nullptr,
reinterpret_cast<void**>(&channel)); reinterpret_cast<void**>(&channel));
if (builder_error != GRPC_ERROR_NONE) { if (builder_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "channel stack builder failed: %s", gpr_log(GPR_ERROR, "channel stack builder failed: %s",
grpc_error_std_string(builder_error).c_str()); grpc_error_std_string(builder_error).c_str());
@ -187,9 +186,8 @@ int channelz_node_cmp(void* p1, void* p2) {
const grpc_arg_pointer_vtable channelz_node_arg_vtable = { const grpc_arg_pointer_vtable channelz_node_arg_vtable = {
channelz_node_copy, channelz_node_destroy, channelz_node_cmp}; channelz_node_copy, channelz_node_destroy, channelz_node_cmp};
void CreateChannelzNode(grpc_channel_stack_builder* builder) { void CreateChannelzNode(grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* args = const grpc_channel_args* args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
// Check whether channelz is enabled. // Check whether channelz is enabled.
const bool channelz_enabled = grpc_channel_args_find_bool( const bool channelz_enabled = grpc_channel_args_find_bool(
args, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT); args, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT);
@ -201,7 +199,7 @@ void CreateChannelzNode(grpc_channel_stack_builder* builder) {
const bool is_internal_channel = grpc_channel_args_find_bool( const bool is_internal_channel = grpc_channel_args_find_bool(
args, GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, false); args, GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, false);
// Create the channelz node. // Create the channelz node.
std::string target = grpc_channel_stack_builder_get_target(builder); std::string target(builder->target());
grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_node = grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_node =
grpc_core::MakeRefCounted<grpc_core::channelz::ChannelNode>( grpc_core::MakeRefCounted<grpc_core::channelz::ChannelNode>(
target.c_str(), channel_tracer_max_memory, is_internal_channel); target.c_str(), channel_tracer_max_memory, is_internal_channel);
@ -216,7 +214,7 @@ void CreateChannelzNode(grpc_channel_stack_builder* builder) {
const char* args_to_remove[] = {GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL}; const char* args_to_remove[] = {GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL};
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
grpc_channel_stack_builder_set_channel_arguments(builder, new_args); builder->SetChannelArgs(new_args);
grpc_channel_args_destroy(new_args); grpc_channel_args_destroy(new_args);
} }
@ -244,7 +242,7 @@ grpc_channel* grpc_channel_create(const char* target,
// grpc_shutdown() when the channel is actually destroyed, thus // grpc_shutdown() when the channel is actually destroyed, thus
// ensuring that shutdown is deferred until that point. // ensuring that shutdown is deferred until that point.
grpc_init(); grpc_init();
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create( grpc_core::ChannelStackBuilder builder(
grpc_channel_stack_type_string(channel_stack_type)); grpc_channel_stack_type_string(channel_stack_type));
const grpc_core::UniquePtr<char> default_authority = const grpc_core::UniquePtr<char> default_authority =
get_default_authority(input_args); get_default_authority(input_args);
@ -257,23 +255,21 @@ grpc_channel* grpc_channel_create(const char* target,
args = channel_args_mutator(target, args, channel_stack_type); args = channel_args_mutator(target, args, channel_stack_type);
} }
} }
grpc_channel_stack_builder_set_channel_arguments(builder, args); builder.SetChannelArgs(args).SetTarget(target).SetTransport(
optional_transport);
grpc_channel_args_destroy(args); grpc_channel_args_destroy(args);
grpc_channel_stack_builder_set_target(builder, target);
grpc_channel_stack_builder_set_transport(builder, optional_transport);
if (!grpc_core::CoreConfiguration::Get().channel_init().CreateStack( if (!grpc_core::CoreConfiguration::Get().channel_init().CreateStack(
builder, channel_stack_type)) { &builder, channel_stack_type)) {
grpc_channel_stack_builder_destroy(builder);
grpc_shutdown(); // Since we won't call destroy_channel(). grpc_shutdown(); // Since we won't call destroy_channel().
return nullptr; return nullptr;
} }
// We only need to do this for clients here. For servers, this will be // We only need to do this for clients here. For servers, this will be
// done in src/core/lib/surface/server.cc. // done in src/core/lib/surface/server.cc.
if (grpc_channel_stack_type_is_client(channel_stack_type)) { if (grpc_channel_stack_type_is_client(channel_stack_type)) {
CreateChannelzNode(builder); CreateChannelzNode(&builder);
} }
grpc_channel* channel = grpc_channel* channel =
grpc_channel_create_with_builder(builder, channel_stack_type, error); grpc_channel_create_with_builder(&builder, channel_stack_type, error);
if (channel == nullptr) { if (channel == nullptr) {
grpc_shutdown(); // Since we won't call destroy_channel(). grpc_shutdown(); // Since we won't call destroy_channel().
} }

@ -44,7 +44,7 @@ void grpc_channel_destroy_internal(grpc_channel* channel);
/// Creates a grpc_channel with a builder. See the description of /// Creates a grpc_channel with a builder. See the description of
/// \a grpc_channel_create for variable definitions. /// \a grpc_channel_create for variable definitions.
grpc_channel* grpc_channel_create_with_builder( grpc_channel* grpc_channel_create_with_builder(
grpc_channel_stack_builder* builder, grpc_core::ChannelStackBuilder* builder,
grpc_channel_stack_type channel_stack_type, grpc_channel_stack_type channel_stack_type,
grpc_error_handle* error = nullptr); grpc_error_handle* error = nullptr);

@ -45,7 +45,7 @@ ChannelInit ChannelInit::Builder::Build() {
return result; return result;
} }
bool ChannelInit::CreateStack(grpc_channel_stack_builder* builder, bool ChannelInit::CreateStack(ChannelStackBuilder* builder,
grpc_channel_stack_type type) const { grpc_channel_stack_type type) const {
for (const auto& stage : slots_[type]) { for (const auto& stage : slots_[type]) {
if (!stage(builder)) return false; if (!stage(builder)) return false;

@ -33,15 +33,15 @@
/// It also provides a universal entry path to run those mutators to build /// It also provides a universal entry path to run those mutators to build
/// a channel stack for various subsystems. /// a channel stack for various subsystems.
typedef struct grpc_channel_stack_builder grpc_channel_stack_builder;
namespace grpc_core { namespace grpc_core {
class ChannelStackBuilder;
class ChannelInit { class ChannelInit {
public: public:
/// One stage of mutation: call functions against \a builder to influence the /// One stage of mutation: call functions against \a builder to influence the
/// finally constructed channel stack /// finally constructed channel stack
using Stage = std::function<bool(grpc_channel_stack_builder* builder)>; using Stage = std::function<bool(ChannelStackBuilder* builder)>;
class Builder { class Builder {
public: public:
@ -74,7 +74,7 @@ class ChannelInit {
/// Construct a channel stack of some sort: see channel_stack.h for details /// Construct a channel stack of some sort: see channel_stack.h for details
/// \a type is the type of channel stack to create /// \a type is the type of channel stack to create
/// \a builder is the channel stack builder to build into. /// \a builder is the channel stack builder to build into.
bool CreateStack(grpc_channel_stack_builder* builder, bool CreateStack(ChannelStackBuilder* builder,
grpc_channel_stack_type type) const; grpc_channel_stack_type type) const;
private: private:

@ -38,14 +38,13 @@
void grpc_security_pre_init(void) {} void grpc_security_pre_init(void) {}
static bool maybe_prepend_client_auth_filter( static bool maybe_prepend_client_auth_filter(
grpc_channel_stack_builder* builder) { grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* args = const grpc_channel_args* args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
if (args) { if (args) {
for (size_t i = 0; i < args->num_args; i++) { for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(GRPC_ARG_SECURITY_CONNECTOR, args->args[i].key)) { if (0 == strcmp(GRPC_ARG_SECURITY_CONNECTOR, args->args[i].key)) {
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&grpc_client_auth_filter, nullptr);
builder, &grpc_client_auth_filter, nullptr, nullptr); break;
} }
} }
} }
@ -53,14 +52,13 @@ static bool maybe_prepend_client_auth_filter(
} }
static bool maybe_prepend_server_auth_filter( static bool maybe_prepend_server_auth_filter(
grpc_channel_stack_builder* builder) { grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* args = const grpc_channel_args* args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
if (args) { if (args) {
for (size_t i = 0; i < args->num_args; i++) { for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(GRPC_SERVER_CREDENTIALS_ARG, args->args[i].key)) { if (0 == strcmp(GRPC_SERVER_CREDENTIALS_ARG, args->args[i].key)) {
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&grpc_server_auth_filter, nullptr);
builder, &grpc_server_auth_filter, nullptr, nullptr); break;
} }
} }
} }
@ -68,16 +66,14 @@ static bool maybe_prepend_server_auth_filter(
} }
static bool maybe_prepend_sdk_server_authz_filter( static bool maybe_prepend_sdk_server_authz_filter(
grpc_channel_stack_builder* builder) { grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* args = const grpc_channel_args* args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
const auto* provider = const auto* provider =
grpc_channel_args_find_pointer<grpc_authorization_policy_provider>( grpc_channel_args_find_pointer<grpc_authorization_policy_provider>(
args, GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER); args, GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER);
if (provider != nullptr) { if (provider != nullptr) {
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&grpc_core::SdkServerAuthzFilter::kFilterVtable,
builder, &grpc_core::SdkServerAuthzFilter::kFilterVtable, nullptr, nullptr);
nullptr);
} }
return true; return true;
} }

@ -71,14 +71,13 @@ void RegisterChannelFilter(
std::function<bool(const grpc_channel_args&)> include_filter, std::function<bool(const grpc_channel_args&)> include_filter,
const grpc_channel_filter* filter) { const grpc_channel_filter* filter) {
auto maybe_add_filter = [include_filter, auto maybe_add_filter = [include_filter,
filter](grpc_channel_stack_builder* builder) { filter](grpc_core::ChannelStackBuilder* builder) {
if (include_filter != nullptr) { if (include_filter != nullptr) {
const grpc_channel_args* args = const grpc_channel_args* args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
if (!include_filter(*args)) return true; if (!include_filter(*args)) return true;
} }
return grpc_channel_stack_builder_prepend_filter(builder, filter, nullptr, builder->PrependFilter(filter, nullptr);
nullptr); return true;
}; };
grpc_core::CoreConfiguration::RegisterBuilder( grpc_core::CoreConfiguration::RegisterBuilder(
[stack_type, priority, [stack_type, priority,

@ -54,13 +54,6 @@ void CallDestroyFunc(grpc_call_element* /*elem*/,
bool g_replacement_fn_called = false; bool g_replacement_fn_called = false;
bool g_original_fn_called = false; bool g_original_fn_called = false;
void SetArgOnceFn(grpc_channel_stack* /*channel_stack*/,
grpc_channel_element* /*elem*/, void* arg) {
bool* called = static_cast<bool*>(arg);
// Make sure this function is only called once per arg.
GPR_ASSERT(*called == false);
*called = true;
}
TEST(ChannelStackBuilderTest, ReplaceFilter) { TEST(ChannelStackBuilderTest, ReplaceFilter) {
grpc_channel* channel = grpc_channel* channel =
@ -101,18 +94,29 @@ const grpc_channel_filter original_filter = {
grpc_channel_next_get_info, grpc_channel_next_get_info,
"filter_name"}; "filter_name"};
bool AddReplacementFilter(grpc_channel_stack_builder* builder) { bool AddReplacementFilter(ChannelStackBuilder* builder) {
// Get rid of any other version of the filter, as determined by having the // Get rid of any other version of the filter, as determined by having the
// same name. // same name.
GPR_ASSERT(grpc_channel_stack_builder_remove_filter(builder, auto* stk = builder->mutable_stack();
replacement_filter.name)); stk->erase(std::remove_if(stk->begin(), stk->end(),
return grpc_channel_stack_builder_prepend_filter( [](const ChannelStackBuilder::StackEntry& entry) {
builder, &replacement_filter, SetArgOnceFn, &g_replacement_fn_called); return strcmp(entry.filter->name,
"filter_name") == 0;
}),
stk->end());
builder->PrependFilter(&replacement_filter,
[](grpc_channel_stack*, grpc_channel_element*) {
g_replacement_fn_called = true;
});
return true;
} }
bool AddOriginalFilter(grpc_channel_stack_builder* builder) { bool AddOriginalFilter(ChannelStackBuilder* builder) {
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&original_filter,
builder, &original_filter, SetArgOnceFn, &g_original_fn_called); [](grpc_channel_stack*, grpc_channel_element*) {
g_original_fn_called = true;
});
return true;
} }
} // namespace } // namespace

@ -126,22 +126,20 @@ static int check_stack(const char* file, int line, const char* transport_name,
grpc_channel_args* init_args, grpc_channel_args* init_args,
unsigned channel_stack_type, ...) { unsigned channel_stack_type, ...) {
// create phony channel stack // create phony channel stack
grpc_channel_stack_builder* builder = grpc_core::ChannelStackBuilder builder("test");
grpc_channel_stack_builder_create("test");
grpc_transport_vtable fake_transport_vtable; grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
fake_transport_vtable.name = transport_name; fake_transport_vtable.name = transport_name;
grpc_transport fake_transport = {&fake_transport_vtable}; grpc_transport fake_transport = {&fake_transport_vtable};
grpc_channel_stack_builder_set_target(builder, "foo.test.google.fr");
grpc_channel_args* channel_args = grpc_channel_args_copy(init_args); grpc_channel_args* channel_args = grpc_channel_args_copy(init_args);
builder.SetTarget("foo.test.google.fr").SetChannelArgs(channel_args);
if (transport_name != nullptr) { if (transport_name != nullptr) {
grpc_channel_stack_builder_set_transport(builder, &fake_transport); builder.SetTransport(&fake_transport);
} }
{ {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_channel_stack_builder_set_channel_arguments(builder, channel_args);
GPR_ASSERT(grpc_core::CoreConfiguration::Get().channel_init().CreateStack( GPR_ASSERT(grpc_core::CoreConfiguration::Get().channel_init().CreateStack(
builder, (grpc_channel_stack_type)channel_stack_type)); &builder, (grpc_channel_stack_type)channel_stack_type));
} }
// build up our expectation list // build up our expectation list
@ -158,15 +156,12 @@ static int check_stack(const char* file, int line, const char* transport_name,
// build up our "got" list // build up our "got" list
parts.clear(); parts.clear();
grpc_channel_stack_builder_iterator* it = for (const auto& entry : *builder.mutable_stack()) {
grpc_channel_stack_builder_create_iterator_at_first(builder); const char* name = entry.filter->name;
while (grpc_channel_stack_builder_move_next(it)) {
const char* name = grpc_channel_stack_builder_iterator_filter_name(it);
if (name == nullptr) continue; if (name == nullptr) continue;
parts.push_back(name); parts.push_back(name);
} }
std::string got = absl::StrJoin(parts, ", "); std::string got = absl::StrJoin(parts, ", ");
grpc_channel_stack_builder_iterator_destroy(it);
// figure out result, log if there's an error // figure out result, log if there's an error
int result = 0; int result = 0;
@ -206,7 +201,6 @@ static int check_stack(const char* file, int line, const char* transport_name,
{ {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_channel_stack_builder_destroy(builder);
grpc_channel_args_destroy(channel_args); grpc_channel_args_destroy(channel_args);
} }

@ -256,9 +256,10 @@ void filter_causes_close(grpc_end2end_test_config config) {
[](grpc_core::CoreConfiguration::Builder* builder) { [](grpc_core::CoreConfiguration::Builder* builder) {
grpc_core::BuildCoreConfiguration(builder); grpc_core::BuildCoreConfiguration(builder);
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, 0, [](grpc_channel_stack_builder* builder) { GRPC_SERVER_CHANNEL, 0,
return grpc_channel_stack_builder_prepend_filter( [](grpc_core::ChannelStackBuilder* builder) {
builder, &test_filter, nullptr, nullptr); builder->PrependFilter(&test_filter, nullptr);
return true;
}); });
}, },
[config] { test_request(config); }); [config] { test_request(config); });

@ -281,20 +281,16 @@ void filter_context(grpc_end2end_test_config config) {
for (auto type : {GRPC_CLIENT_CHANNEL, GRPC_CLIENT_SUBCHANNEL, for (auto type : {GRPC_CLIENT_CHANNEL, GRPC_CLIENT_SUBCHANNEL,
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_SERVER_CHANNEL}) { GRPC_CLIENT_DIRECT_CHANNEL, GRPC_SERVER_CHANNEL}) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
type, INT_MAX, [](grpc_channel_stack_builder* builder) { type, INT_MAX, [](grpc_core::ChannelStackBuilder* builder) {
// Want to add the filter as close to the end as possible, to // Want to add the filter as close to the end as possible, to
// make sure that all of the filters work well together. // make sure that all of the filters work well together.
// However, we can't add it at the very end, because the // However, we can't add it at the very end, because the
// connected channel filter must be the last one. So we add it // connected channel filter must be the last one. So we add it
// right before the last one. // right before the last one.
grpc_channel_stack_builder_iterator* it = auto it = builder->mutable_stack()->end();
grpc_channel_stack_builder_create_iterator_at_last(builder); --it;
GPR_ASSERT(grpc_channel_stack_builder_move_prev(it)); builder->mutable_stack()->insert(it, {&test_filter, nullptr});
const bool retval = return true;
grpc_channel_stack_builder_add_filter_before(
it, &test_filter, nullptr, nullptr);
grpc_channel_stack_builder_iterator_destroy(it);
return retval;
}); });
} }
}, },

@ -490,21 +490,17 @@ void filter_init_fails(grpc_end2end_test_config config) {
auto register_stage = [builder](grpc_channel_stack_type type, auto register_stage = [builder](grpc_channel_stack_type type,
bool* enable) { bool* enable) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
type, INT_MAX, [enable](grpc_channel_stack_builder* builder) { type, INT_MAX, [enable](grpc_core::ChannelStackBuilder* builder) {
if (!*enable) return true; if (!*enable) return true;
// Want to add the filter as close to the end as possible, // Want to add the filter as close to the end as possible,
// to make sure that all of the filters work well together. // to make sure that all of the filters work well together.
// However, we can't add it at the very end, because either the // However, we can't add it at the very end, because either the
// client_channel filter or connected_channel filter must be the // client_channel filter or connected_channel filter must be the
// last one. So we add it right before the last one. // last one. So we add it right before the last one.
grpc_channel_stack_builder_iterator* it = auto it = builder->mutable_stack()->end();
grpc_channel_stack_builder_create_iterator_at_last(builder); --it;
GPR_ASSERT(grpc_channel_stack_builder_move_prev(it)); builder->mutable_stack()->insert(it, {&test_filter, nullptr});
const bool retval = return true;
grpc_channel_stack_builder_add_filter_before(
it, &test_filter, nullptr, nullptr);
grpc_channel_stack_builder_iterator_destroy(it);
return retval;
}); });
}; };
register_stage(GRPC_SERVER_CHANNEL, &g_enable_server_channel_filter); register_stage(GRPC_SERVER_CHANNEL, &g_enable_server_channel_filter);

@ -313,20 +313,16 @@ void filter_latency(grpc_end2end_test_config config) {
auto register_stage = [builder](grpc_channel_stack_type type, auto register_stage = [builder](grpc_channel_stack_type type,
const grpc_channel_filter* filter) { const grpc_channel_filter* filter) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
type, INT_MAX, [filter](grpc_channel_stack_builder* builder) { type, INT_MAX, [filter](grpc_core::ChannelStackBuilder* builder) {
// Want to add the filter as close to the end as possible, to // Want to add the filter as close to the end as possible, to
// make sure that all of the filters work well together. // make sure that all of the filters work well together.
// However, we can't add it at the very end, because the // However, we can't add it at the very end, because the
// connected channel filter must be the last one. So we add it // connected channel filter must be the last one. So we add it
// right before the last one. // right before the last one.
grpc_channel_stack_builder_iterator* it = auto it = builder->mutable_stack()->end();
grpc_channel_stack_builder_create_iterator_at_last(builder); --it;
GPR_ASSERT(grpc_channel_stack_builder_move_prev(it)); builder->mutable_stack()->insert(it, {filter, nullptr});
const bool retval = return true;
grpc_channel_stack_builder_add_filter_before(
it, filter, nullptr, nullptr);
grpc_channel_stack_builder_iterator_destroy(it);
return retval;
}); });
}; };
register_stage(GRPC_CLIENT_CHANNEL, &test_client_filter); register_stage(GRPC_CLIENT_CHANNEL, &test_client_filter);

@ -360,20 +360,16 @@ void filter_status_code(grpc_end2end_test_config config) {
auto register_stage = [builder](grpc_channel_stack_type type, auto register_stage = [builder](grpc_channel_stack_type type,
const grpc_channel_filter* filter) { const grpc_channel_filter* filter) {
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
type, INT_MAX, [filter](grpc_channel_stack_builder* builder) { type, INT_MAX, [filter](grpc_core::ChannelStackBuilder* builder) {
// Want to add the filter as close to the end as possible, to // Want to add the filter as close to the end as possible, to
// make sure that all of the filters work well together. // make sure that all of the filters work well together.
// However, we can't add it at the very end, because the // However, we can't add it at the very end, because the
// connected_channel/client_channel filter must be the last one. // connected_channel/client_channel filter must be the last one.
// So we add it right before the last one. // So we add it right before the last one.
grpc_channel_stack_builder_iterator* it = auto it = builder->mutable_stack()->end();
grpc_channel_stack_builder_create_iterator_at_last(builder); --it;
GPR_ASSERT(grpc_channel_stack_builder_move_prev(it)); builder->mutable_stack()->insert(it, {filter, nullptr});
const bool retval = return true;
grpc_channel_stack_builder_add_filter_before(
it, filter, nullptr, nullptr);
grpc_channel_stack_builder_iterator_destroy(it);
return retval;
}); });
}; };
register_stage(GRPC_CLIENT_CHANNEL, &test_client_filter); register_stage(GRPC_CLIENT_CHANNEL, &test_client_filter);

@ -306,16 +306,15 @@ grpc_channel_filter FailSendOpsFilter::kFilterVtable = {
"FailSendOpsFilter", "FailSendOpsFilter",
}; };
bool MaybeAddFilter(grpc_channel_stack_builder* builder) { bool MaybeAddFilter(grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries). // Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = const grpc_channel_args* args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, true)) { if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, true)) {
return true; return true;
} }
// Install filter. // Install filter.
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&FailSendOpsFilter::kFilterVtable, nullptr);
builder, &FailSendOpsFilter::kFilterVtable, nullptr, nullptr); return true;
} }
} // namespace } // namespace

@ -338,16 +338,15 @@ grpc_channel_filter InjectStatusFilter::kFilterVtable = {
"InjectStatusFilter", "InjectStatusFilter",
}; };
bool AddFilter(grpc_channel_stack_builder* builder) { bool AddFilter(grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries). // Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = const grpc_channel_args* args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, true)) { if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, true)) {
return true; return true;
} }
// Install filter. // Install filter.
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&InjectStatusFilter::kFilterVtable, nullptr);
builder, &InjectStatusFilter::kFilterVtable, nullptr, nullptr); return true;
} }
} // namespace } // namespace

@ -365,18 +365,18 @@ void retry_send_op_fails(grpc_end2end_test_config config) {
[](grpc_core::CoreConfiguration::Builder* builder) { [](grpc_core::CoreConfiguration::Builder* builder) {
grpc_core::BuildCoreConfiguration(builder); grpc_core::BuildCoreConfiguration(builder);
builder->channel_init()->RegisterStage( builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, 0, [](grpc_channel_stack_builder* builder) { GRPC_CLIENT_SUBCHANNEL, 0,
[](grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries). // Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = const grpc_channel_args* args = builder->channel_args();
grpc_channel_stack_builder_get_channel_arguments(builder);
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
true)) { true)) {
return true; return true;
} }
// Install filter. // Install filter.
return grpc_channel_stack_builder_prepend_filter( builder->PrependFilter(&FailFirstSendOpFilter::kFilterVtable,
builder, &FailFirstSendOpFilter::kFilterVtable, nullptr, nullptr);
nullptr); return true;
}); });
}, },
[config] { test_retry_send_op_fails(config); }); [config] { test_retry_send_op_fails(config); });

@ -77,33 +77,27 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) {
auto channel_stack_modifier = MakeRefCounted<XdsChannelStackModifier>( auto channel_stack_modifier = MakeRefCounted<XdsChannelStackModifier>(
std::vector<const grpc_channel_filter*>{&test_filter_1, &test_filter_2}); std::vector<const grpc_channel_filter*>{&test_filter_1, &test_filter_2});
grpc_arg arg = channel_stack_modifier->MakeChannelArg(); grpc_arg arg = channel_stack_modifier->MakeChannelArg();
// Create a phony grpc_channel_stack_builder object // Create a phony ChannelStackBuilder object
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
grpc_channel_stack_builder* builder = ChannelStackBuilder builder("test");
grpc_channel_stack_builder_create("test"); builder.SetChannelArgs(args);
grpc_channel_stack_builder_set_channel_arguments(builder, args);
grpc_channel_args_destroy(args); grpc_channel_args_destroy(args);
grpc_transport_vtable fake_transport_vtable; grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
fake_transport_vtable.name = "fake"; fake_transport_vtable.name = "fake";
grpc_transport fake_transport = {&fake_transport_vtable}; grpc_transport fake_transport = {&fake_transport_vtable};
grpc_channel_stack_builder_set_transport(builder, &fake_transport); builder.SetTransport(&fake_transport);
// Construct channel stack and verify that the test filters were successfully // Construct channel stack and verify that the test filters were successfully
// added // added
ASSERT_TRUE(CoreConfiguration::Get().channel_init().CreateStack( ASSERT_TRUE(CoreConfiguration::Get().channel_init().CreateStack(
builder, GRPC_SERVER_CHANNEL)); &builder, GRPC_SERVER_CHANNEL));
grpc_channel_stack_builder_iterator* it = std::vector<std::string> filters;
grpc_channel_stack_builder_create_iterator_at_first(builder); for (const auto& entry : *builder.mutable_stack()) {
ASSERT_TRUE(grpc_channel_stack_builder_move_next(it)); filters.push_back(entry.filter->name);
ASSERT_STREQ(grpc_channel_stack_builder_iterator_filter_name(it), "server"); }
ASSERT_TRUE(grpc_channel_stack_builder_move_next(it)); filters.resize(3);
ASSERT_STREQ(grpc_channel_stack_builder_iterator_filter_name(it), EXPECT_EQ(filters,
kTestFilter1); std::vector<std::string>({"server", kTestFilter1, kTestFilter2}));
ASSERT_TRUE(grpc_channel_stack_builder_move_next(it));
ASSERT_STREQ(grpc_channel_stack_builder_iterator_filter_name(it),
kTestFilter2);
grpc_channel_stack_builder_iterator_destroy(it);
grpc_channel_stack_builder_destroy(builder);
grpc_shutdown(); grpc_shutdown();
} }
@ -122,36 +116,27 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertionAfterCensus) {
auto channel_stack_modifier = MakeRefCounted<XdsChannelStackModifier>( auto channel_stack_modifier = MakeRefCounted<XdsChannelStackModifier>(
std::vector<const grpc_channel_filter*>{&test_filter_1, &test_filter_2}); std::vector<const grpc_channel_filter*>{&test_filter_1, &test_filter_2});
grpc_arg arg = channel_stack_modifier->MakeChannelArg(); grpc_arg arg = channel_stack_modifier->MakeChannelArg();
// Create a phony grpc_channel_stack_builder object // Create a phony ChannelStackBuilder object
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1); grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
grpc_channel_stack_builder* builder = ChannelStackBuilder builder("test");
grpc_channel_stack_builder_create("test"); builder.SetChannelArgs(args);
grpc_channel_stack_builder_set_channel_arguments(builder, args);
grpc_channel_args_destroy(args); grpc_channel_args_destroy(args);
grpc_transport_vtable fake_transport_vtable; grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
fake_transport_vtable.name = "fake"; fake_transport_vtable.name = "fake";
grpc_transport fake_transport = {&fake_transport_vtable}; grpc_transport fake_transport = {&fake_transport_vtable};
grpc_channel_stack_builder_set_transport(builder, &fake_transport); builder.SetTransport(&fake_transport);
// Construct channel stack and verify that the test filters were successfully // Construct channel stack and verify that the test filters were successfully
// added after the census filter // added after the census filter
ASSERT_TRUE(CoreConfiguration::Get().channel_init().CreateStack( ASSERT_TRUE(CoreConfiguration::Get().channel_init().CreateStack(
builder, GRPC_SERVER_CHANNEL)); &builder, GRPC_SERVER_CHANNEL));
grpc_channel_stack_builder_iterator* it = std::vector<std::string> filters;
grpc_channel_stack_builder_create_iterator_at_first(builder); for (const auto& entry : *builder.mutable_stack()) {
ASSERT_TRUE(grpc_channel_stack_builder_move_next(it)); filters.push_back(entry.filter->name);
ASSERT_STREQ(grpc_channel_stack_builder_iterator_filter_name(it), "server"); }
ASSERT_TRUE(grpc_channel_stack_builder_move_next(it)); filters.resize(4);
ASSERT_STREQ(grpc_channel_stack_builder_iterator_filter_name(it), EXPECT_EQ(filters, std::vector<std::string>({"server", "opencensus_server",
"opencensus_server"); kTestFilter1, kTestFilter2}));
ASSERT_TRUE(grpc_channel_stack_builder_move_next(it));
ASSERT_STREQ(grpc_channel_stack_builder_iterator_filter_name(it),
kTestFilter1);
ASSERT_TRUE(grpc_channel_stack_builder_move_next(it));
ASSERT_STREQ(grpc_channel_stack_builder_iterator_filter_name(it),
kTestFilter2);
grpc_channel_stack_builder_iterator_destroy(it);
grpc_channel_stack_builder_destroy(builder);
grpc_shutdown(); grpc_shutdown();
} }

@ -704,16 +704,13 @@ class IsolatedCallFixture : public TrackCounters {
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get() const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning() .channel_args_preconditioning()
.PreconditionChannelArgs(nullptr); .PreconditionChannelArgs(nullptr);
grpc_channel_stack_builder* builder = grpc_core::ChannelStackBuilder builder("phony");
grpc_channel_stack_builder_create("phony"); builder.SetTarget("phony_target");
grpc_channel_stack_builder_set_target(builder, "phony_target"); builder.SetChannelArgs(args);
grpc_channel_stack_builder_set_channel_arguments(builder, args); builder.AppendFilter(&isolated_call_filter::isolated_call_filter, nullptr);
GPR_ASSERT(grpc_channel_stack_builder_append_filter(
builder, &isolated_call_filter::isolated_call_filter, nullptr,
nullptr));
{ {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
channel_ = grpc_channel_create_with_builder(builder, GRPC_CLIENT_CHANNEL, channel_ = grpc_channel_create_with_builder(&builder, GRPC_CLIENT_CHANNEL,
nullptr); nullptr);
} }
cq_ = grpc_completion_queue_create_for_next(nullptr); cq_ = grpc_completion_queue_create_for_next(nullptr);

Loading…
Cancel
Save