Convert grpc_channel to C++ (#29266)

* begin

* tests

* fix

* http

* Filter fuzzer

* progress

* basics

* progress

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* Revert "Revert "HTTP Client Filter --> promises (#29031)" (#29181)"

This reverts commit 6ee276f672.

* stuff

* debug

* minimal reproduction

* progress

* progress

* create call

* progress

* recv trailing metadata

* wakeups

* corpus

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* debug

* fix state machine for c#

* Revert "minimal reproduction"

This reverts commit 4d02d2e730.

* Revert "debug"

This reverts commit 7960842f48.

* Revert "debug"

This reverts commit a6f224e4a1.

* no-logging

* initial-metadata

* Revert "Revert "debug""

This reverts commit 951844e857.

* Better int conversion

* debug

* Fix for Cronet

* Revert "debug"

This reverts commit 4d641c4281.

* Revert "Better int conversion"

This reverts commit 4001b957cb.

* Revert "Revert "Revert "debug"""

This reverts commit d135c61043.

* progress

* progress

* Automated change: Fix sanity tests

* fix, c++ize

* handle transport, use objects

* enable more stuffs

* remove placeholder

* contexts

* fix

* fix

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* x

* exceptional toast

* include idle filters, time

* fix

* namespace

* fixes

* final info

* progress

* cleanup

* progress

* progress

* progress

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* progress

* Set int

* Set int

* Automated change: Fix sanity tests

* fix

* fix

* fix

* fixes

* fixes

* fixes

* fixes

* fix

* fix race

* fix race

* mac fix

* review feedback

* getgetget

* fix ios

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
Co-authored-by: Jan Tattermusch <jtattermusch@google.com>
pull/29468/head
Craig Tiller 3 years ago committed by GitHub
parent 61c69db2bf
commit 8bb45aa3a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      BUILD
  2. 15
      src/core/ext/filters/channel_idle/channel_idle_filter.cc
  3. 44
      src/core/ext/filters/client_channel/channel_connectivity.cc
  4. 4
      src/core/ext/filters/client_channel/client_channel.cc
  5. 2
      src/core/ext/filters/client_channel/client_channel.h
  6. 4
      src/core/ext/filters/client_channel/client_channel_factory.cc
  7. 4
      src/core/ext/filters/client_channel/client_channel_factory.h
  8. 13
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  9. 6
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  10. 7
      src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
  11. 27
      src/core/ext/filters/client_channel/subchannel.cc
  12. 4
      src/core/ext/filters/deadline/deadline_filter.cc
  13. 13
      src/core/ext/filters/http/client_authority_filter.cc
  14. 9
      src/core/ext/filters/http/http_filters_plugin.cc
  15. 3
      src/core/ext/filters/http/message_compress/message_decompress_filter.cc
  16. 8
      src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
  17. 41
      src/core/ext/filters/message_size/message_size_filter.cc
  18. 4
      src/core/ext/filters/message_size/message_size_filter.h
  19. 64
      src/core/ext/transport/binder/client/channel_create_impl.cc
  20. 94
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  21. 21
      src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc
  22. 13
      src/core/ext/transport/inproc/inproc_transport.cc
  23. 5
      src/core/ext/xds/xds_channel_stack_modifier.cc
  24. 6
      src/core/ext/xds/xds_client.cc
  25. 3
      src/core/ext/xds/xds_server_config_fetcher.cc
  26. 59
      src/core/lib/channel/channel_args.cc
  27. 30
      src/core/lib/channel/channel_args.h
  28. 4
      src/core/lib/channel/channel_args_preconditioning.cc
  29. 3
      src/core/lib/channel/channel_args_preconditioning.h
  30. 5
      src/core/lib/channel/channel_stack.cc
  31. 5
      src/core/lib/channel/channel_stack.h
  32. 10
      src/core/lib/channel/channel_stack_builder.cc
  33. 14
      src/core/lib/channel/channel_stack_builder.h
  34. 50
      src/core/lib/channel/channel_stack_builder_impl.cc
  35. 4
      src/core/lib/channel/channel_stack_builder_impl.h
  36. 4
      src/core/lib/channel/channelz.h
  37. 3
      src/core/lib/http/httpcli.cc
  38. 4
      src/core/lib/http/httpcli_security_connector.cc
  39. 3
      src/core/lib/iomgr/endpoint_pair_posix.cc
  40. 3
      src/core/lib/resolver/server_address.cc
  41. 3
      src/core/lib/resolver/server_address.h
  42. 3
      src/core/lib/security/credentials/composite/composite_credentials.h
  43. 11
      src/core/lib/security/credentials/credentials.h
  44. 15
      src/core/lib/security/credentials/google_default/google_default_credentials.cc
  45. 2
      src/core/lib/security/credentials/google_default/google_default_credentials.h
  46. 30
      src/core/lib/surface/call.cc
  47. 3
      src/core/lib/surface/call.h
  48. 396
      src/core/lib/surface/channel.cc
  49. 124
      src/core/lib/surface/channel.h
  50. 27
      src/core/lib/surface/init.cc
  51. 24
      src/core/lib/surface/lame_client.cc
  52. 87
      src/core/lib/surface/server.cc
  53. 16
      src/core/lib/surface/server.h
  54. 8
      src/cpp/client/channel_test_peer.cc
  55. 8
      src/cpp/common/channel_filter.cc
  56. 3
      test/core/bad_client/bad_client.cc
  57. 7
      test/core/bad_connection/close_fd_test.cc
  58. 28
      test/core/channel/minimal_stack_is_minimal_test.cc
  59. 34
      test/core/end2end/fixtures/h2_sockpair+trace.cc
  60. 33
      test/core/end2end/fixtures/h2_sockpair.cc
  61. 33
      test/core/end2end/fixtures/h2_sockpair_1byte.cc
  62. 6
      test/core/end2end/fixtures/http_proxy_fixture.cc
  63. 30
      test/core/end2end/fuzzers/client_fuzzer.cc
  64. 3
      test/core/end2end/fuzzers/server_fuzzer.cc
  65. 6
      test/core/end2end/tests/binary_metadata.cc
  66. 5
      test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc
  67. 6
      test/core/end2end/tests/retry_recv_message_replay.cc
  68. 5
      test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
  69. 6
      test/core/end2end/tests/retry_send_op_fails.cc
  70. 6
      test/core/end2end/tests/retry_transparent_goaway.cc
  71. 6
      test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
  72. 14
      test/core/iomgr/ios/CFStreamTests/CFStreamClientTests.mm
  73. 7
      test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
  74. 3
      test/core/iomgr/tcp_client_posix_test.cc
  75. 15
      test/core/iomgr/tcp_server_posix_test.cc
  76. 3
      test/core/surface/concurrent_connectivity_test.cc
  77. 6
      test/core/surface/secure_channel_create_test.cc
  78. 15
      test/core/transport/binder/end2end/fuzzers/client_fuzzer.cc
  79. 3
      test/core/transport/binder/end2end/fuzzers/server_fuzzer.cc
  80. 20
      test/core/transport/binder/end2end/testing_channel_create.cc
  81. 6
      test/core/transport/chttp2/context_list_test.cc
  82. 3
      test/core/transport/chttp2/settings_timeout_test.cc
  83. 3
      test/core/util/test_tcp_server.cc
  84. 4
      test/core/xds/xds_channel_stack_modifier_test.cc
  85. 11
      test/cpp/microbenchmarks/bm_call_create.cc
  86. 3
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  87. 9
      test/cpp/microbenchmarks/fullstack_fixtures.h
  88. 8
      test/cpp/performance/writes_per_rpc_test.cc

@ -2256,6 +2256,9 @@ grpc_cc_library(
hdrs = [
"src/core/lib/channel/channel_stack_builder.h",
],
external_deps = [
"absl/status:statusor",
],
language = "c++",
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [

@ -273,10 +273,9 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = builder->channel_args();
if (!grpc_channel_args_want_minimal_stack(channel_args) &&
GetClientIdleTimeout(ChannelArgs::FromC(channel_args)) !=
Duration::Infinity()) {
auto channel_args = builder->channel_args();
if (!channel_args.WantMinimalStack() &&
GetClientIdleTimeout(channel_args) != Duration::Infinity()) {
builder->PrependFilter(&grpc_client_idle_filter, nullptr);
}
return true;
@ -284,11 +283,9 @@ void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = builder->channel_args();
if (!grpc_channel_args_want_minimal_stack(channel_args) &&
MaxAgeFilter::Config::FromChannelArgs(
ChannelArgs::FromC(channel_args))
.enable()) {
auto channel_args = builder->channel_args();
if (!channel_args.WantMinimalStack() &&
MaxAgeFilter::Config::FromChannelArgs(channel_args).enable()) {
builder->PrependFilter(
&grpc_max_age_filter,
[](grpc_channel_stack*, grpc_channel_element* elem) {

@ -25,28 +25,33 @@
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/lame_client.h"
namespace grpc_core {
namespace {
bool IsLameChannel(grpc_channel* channel) {
bool IsLameChannel(Channel* channel) {
grpc_channel_element* elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
grpc_channel_stack_last_element(channel->channel_stack());
return elem->filter == &grpc_lame_filter;
}
} // namespace
} // namespace grpc_core
grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel* channel, int try_to_connect) {
grpc_channel* c_channel, int try_to_connect) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE(
"grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
(channel, try_to_connect));
(c_channel, try_to_connect));
grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel);
// Forward through to the underlying client channel.
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel);
if (GPR_UNLIKELY(client_channel == nullptr)) {
if (IsLameChannel(channel)) return GRPC_CHANNEL_TRANSIENT_FAILURE;
if (grpc_core::IsLameChannel(channel)) {
return GRPC_CHANNEL_TRANSIENT_FAILURE;
}
gpr_log(GPR_ERROR,
"grpc_channel_check_connectivity_state called on something that is "
"not a client channel");
@ -55,11 +60,12 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
return client_channel->CheckConnectivityState(try_to_connect);
}
int grpc_channel_num_external_connectivity_watchers(grpc_channel* channel) {
int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) {
grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel);
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel);
if (client_channel == nullptr) {
if (!IsLameChannel(channel)) {
if (!grpc_core::IsLameChannel(channel)) {
gpr_log(GPR_ERROR,
"grpc_channel_num_external_connectivity_watchers called on "
"something that is not a client channel");
@ -70,7 +76,8 @@ int grpc_channel_num_external_connectivity_watchers(grpc_channel* channel) {
}
int grpc_channel_support_connectivity_watcher(grpc_channel* channel) {
return grpc_core::ClientChannel::GetFromChannel(channel) != nullptr;
return grpc_core::ClientChannel::GetFromChannel(
grpc_core::Channel::FromC(channel)) != nullptr;
}
namespace grpc_core {
@ -78,22 +85,25 @@ namespace {
class StateWatcher : public DualRefCounted<StateWatcher> {
public:
StateWatcher(grpc_channel* channel, grpc_completion_queue* cq, void* tag,
StateWatcher(grpc_channel* c_channel, grpc_completion_queue* cq, void* tag,
grpc_connectivity_state last_observed_state,
gpr_timespec deadline)
: channel_(channel), cq_(cq), tag_(tag), state_(last_observed_state) {
: channel_(Channel::FromC(c_channel)->Ref()),
cq_(cq),
tag_(tag),
state_(last_observed_state) {
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr);
GRPC_CLOSURE_INIT(&on_timeout_, TimeoutComplete, this, nullptr);
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel);
ClientChannel* client_channel =
ClientChannel::GetFromChannel(channel_.get());
if (client_channel == nullptr) {
// If the target URI used to create the channel was invalid, channel
// stack initialization failed, and that caused us to create a lame
// channel. In that case, connectivity state will never change (it
// will always be TRANSIENT_FAILURE), so we don't actually start a
// watch, but we are hiding that fact from the application.
if (IsLameChannel(channel)) {
if (IsLameChannel(channel_.get())) {
// Ref from object creation is held by timer callback.
StartTimer(Timestamp::FromTimespecRoundUp(deadline));
return;
@ -114,10 +124,6 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
&on_complete_, watcher_timer_init_state->closure());
}
~StateWatcher() override {
GRPC_CHANNEL_INTERNAL_UNREF(channel_, "watch_channel_connectivity");
}
private:
// A fire-and-forget object used to delay starting the timer until the
// ClientChannel actually starts the watch.
@ -160,7 +166,7 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
self->timer_fired_ = error == GRPC_ERROR_NONE;
// If this is a client channel (not a lame channel), cancel the watch.
ClientChannel* client_channel =
ClientChannel::GetFromChannel(self->channel_);
ClientChannel::GetFromChannel(self->channel_.get());
if (client_channel != nullptr) {
client_channel->CancelExternalConnectivityWatcher(&self->on_complete_);
}
@ -184,7 +190,7 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
self->WeakUnref();
}
grpc_channel* channel_;
RefCountedPtr<Channel> channel_;
grpc_completion_queue* cq_;
void* tag_;

@ -999,9 +999,9 @@ class ClientChannel::ClientChannelControlHelper
// ClientChannel implementation
//
ClientChannel* ClientChannel::GetFromChannel(grpc_channel* channel) {
ClientChannel* ClientChannel::GetFromChannel(Channel* channel) {
grpc_channel_element* elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
grpc_channel_stack_last_element(channel->channel_stack());
if (elem->filter != &kFilterVtable) return nullptr;
return static_cast<ClientChannel*>(elem->channel_data);
}

@ -91,7 +91,7 @@ class ClientChannel {
// Returns the ClientChannel object from channel, or null if channel
// is not a client channel.
static ClientChannel* GetFromChannel(grpc_channel* channel);
static ClientChannel* GetFromChannel(Channel* channel);
grpc_connectivity_state CheckConnectivityState(bool try_to_connect);

@ -39,6 +39,10 @@ const grpc_arg_pointer_vtable factory_arg_vtable = {
} // namespace
absl::string_view ClientChannelFactory::ChannelArgName() {
return GRPC_ARG_CLIENT_CHANNEL_FACTORY;
}
grpc_arg ClientChannelFactory::CreateChannelArg(ClientChannelFactory* factory) {
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL_FACTORY), factory,

@ -27,12 +27,16 @@ namespace grpc_core {
class ClientChannelFactory {
public:
struct RawPointerChannelArgTag {};
virtual ~ClientChannelFactory() = default;
// Creates a subchannel with the specified args.
virtual RefCountedPtr<Subchannel> CreateSubchannel(
const grpc_resolved_address& address, const grpc_channel_args* args) = 0;
static absl::string_view ChannelArgName();
// Returns a channel arg containing the specified factory.
static grpc_arg CreateChannelArg(ClientChannelFactory* factory);

@ -1484,7 +1484,8 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE before the timer fires, we go into
// fallback mode even if the fallback timeout has not elapsed.
ClientChannel* client_channel = ClientChannel::GetFromChannel(lb_channel_);
ClientChannel* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(lb_channel_));
GPR_ASSERT(client_channel != nullptr);
// Ref held by callback.
watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
@ -1545,7 +1546,8 @@ void GrpcLb::UpdateBalancerChannelLocked(const grpc_channel_args& args) {
}
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
ClientChannel* client_channel = ClientChannel::GetFromChannel(lb_channel_);
ClientChannel* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(lb_channel_));
GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_);
}
@ -1870,11 +1872,8 @@ void RegisterGrpcLbLoadReportingFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
const grpc_channel_args* args = builder->channel_args();
const grpc_arg* channel_arg =
grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
strcmp(channel_arg->value.string, "grpclb") == 0) {
if (builder->channel_args().GetString(GRPC_ARG_LB_POLICY_NAME) ==
"grpclb") {
// TODO(roth): When we get around to re-attempting
// https://github.com/grpc/grpc/pull/16214, we should try to keep
// this filter at the very top of the subchannel stack, since that

@ -1568,7 +1568,8 @@ RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
parent_channelz_node_ = parent_channelz_node->Ref();
}
// Start connectivity watch.
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
ClientChannel* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr);
watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
client_channel->AddConnectivityWatcher(
@ -1593,7 +1594,8 @@ void RlsLb::RlsChannel::Orphan() {
}
// Stop connectivity watch.
if (watcher_ != nullptr) {
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
ClientChannel* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_);
watcher_ = nullptr;

@ -143,10 +143,9 @@ void RegisterServiceConfigChannelArgFilter(
builder->channel_init()->RegisterStage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = builder->channel_args();
if (grpc_channel_args_want_minimal_stack(channel_args) ||
grpc_channel_args_find_string(channel_args,
GRPC_ARG_SERVICE_CONFIG) == nullptr) {
auto channel_args = builder->channel_args();
if (channel_args.WantMinimalStack() ||
!channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value()) {
return true;
}
builder->PrependFilter(&ServiceConfigChannelArgFilter, nullptr);

@ -979,28 +979,17 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) {
c.reset(DEBUG_LOCATION, "connecting");
}
namespace {
void ConnectionDestroy(void* arg, grpc_error_handle /*error*/) {
grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg);
grpc_channel_stack_destroy(stk);
gpr_free(stk);
}
} // namespace
bool Subchannel::PublishTransportLocked() {
// Construct channel stack.
ChannelStackBuilderImpl builder("subchannel", GRPC_CLIENT_SUBCHANNEL);
builder.SetChannelArgs(connecting_result_.channel_args)
builder.SetChannelArgs(ChannelArgs::FromC(connecting_result_.channel_args))
.SetTransport(connecting_result_.transport);
if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
return false;
}
grpc_channel_stack* stk;
grpc_error_handle error = builder.Build(0, 1, ConnectionDestroy, nullptr,
reinterpret_cast<void**>(&stk));
if (error != GRPC_ERROR_NONE) {
absl::StatusOr<RefCountedPtr<grpc_channel_stack>> stk = builder.Build();
if (!stk.ok()) {
auto error = absl_status_to_grpc_error(stk.status());
grpc_transport_destroy(connecting_result_.transport);
gpr_log(GPR_ERROR,
"subchannel %p %s: error initializing subchannel stack: %s", this,
@ -1011,14 +1000,10 @@ bool Subchannel::PublishTransportLocked() {
RefCountedPtr<channelz::SocketNode> socket =
std::move(connecting_result_.socket_node);
connecting_result_.Reset();
if (disconnected_) {
grpc_channel_stack_destroy(stk);
gpr_free(stk);
return false;
}
if (disconnected_) return false;
// Publish.
connected_subchannel_.reset(
new ConnectedSubchannel(stk, args_, channelz_node_));
new ConnectedSubchannel(stk->release(), args_, channelz_node_));
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
gpr_log(GPR_INFO, "subchannel %p %s: new connected subchannel at %p", this,
key_.ToString().c_str(), connected_subchannel_.get());

@ -380,7 +380,9 @@ void RegisterDeadlineFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[filter](ChannelStackBuilder* builder) {
if (grpc_deadline_checking_enabled(builder->channel_args())) {
auto args = builder->channel_args();
if (args.GetBool(GRPC_ARG_ENABLE_DEADLINE_CHECKS)
.value_or(!args.WantMinimalStack())) {
builder->PrependFilter(filter, nullptr);
}
return true;

@ -70,15 +70,10 @@ const grpc_channel_filter ClientAuthorityFilter::kFilter =
namespace {
bool add_client_authority_filter(ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = builder->channel_args();
const grpc_arg* disable_client_authority_filter_arg = grpc_channel_args_find(
channel_args, GRPC_ARG_DISABLE_CLIENT_AUTHORITY_FILTER);
if (disable_client_authority_filter_arg != nullptr) {
const bool is_client_authority_filter_disabled =
grpc_channel_arg_get_bool(disable_client_authority_filter_arg, false);
if (is_client_authority_filter_disabled) {
return true;
}
if (builder->channel_args()
.GetBool(GRPC_ARG_DISABLE_CLIENT_AUTHORITY_FILTER)
.value_or(false)) {
return true;
}
builder->PrependFilter(&ClientAuthorityFilter::kFilter, nullptr);
return true;

@ -46,11 +46,10 @@ void RegisterHttpFilters(CoreConfiguration::Builder* builder) {
[enable_in_minimal_stack, control_channel_arg,
filter](ChannelStackBuilder* builder) {
if (!is_building_http_like_transport(builder)) return true;
const grpc_channel_args* channel_args = builder->channel_args();
bool enable = grpc_channel_arg_get_bool(
grpc_channel_args_find(channel_args, control_channel_arg),
enable_in_minimal_stack ||
!grpc_channel_args_want_minimal_stack(channel_args));
auto args = builder->channel_args();
const bool enable = args.GetBool(control_channel_arg)
.value_or(enable_in_minimal_stack ||
!args.WantMinimalStack());
if (enable) builder->PrependFilter(filter, nullptr);
return true;
});

@ -45,7 +45,8 @@ namespace {
class ChannelData {
public:
explicit ChannelData(const grpc_channel_element_args* args)
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(args->channel_args)),
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(
ChannelArgs::FromC(args->channel_args))),
message_size_service_config_parser_index_(
MessageSizeParser::ParserIndex()) {}

@ -237,9 +237,8 @@ ArenaPromise<ServerMetadataHandle> ServerLoadReportingFilter::MakeCallPromise(
}
namespace {
bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
return grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
bool MaybeAddServerLoadReportingFilter(const ChannelArgs& args) {
return args.GetBool(GRPC_ARG_ENABLE_LOAD_REPORTING).value_or(false);
}
const grpc_channel_filter kFilter =
@ -264,8 +263,7 @@ struct ServerLoadReportingFilterStaticRegistrar {
grpc::load_reporter::MeasureOtherCallMetric();
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* cs_builder) {
if (MaybeAddServerLoadReportingFilter(
*cs_builder->channel_args())) {
if (MaybeAddServerLoadReportingFilter(cs_builder->channel_args())) {
cs_builder->PrependFilter(&kFilter, nullptr);
}
return true;

@ -120,18 +120,16 @@ size_t MessageSizeParser::ParserIndex() {
parser_name());
}
int GetMaxRecvSizeFromChannelArgs(const grpc_channel_args* args) {
if (grpc_channel_args_want_minimal_stack(args)) return -1;
return grpc_channel_args_find_integer(
args, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
{GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, -1, INT_MAX});
int GetMaxRecvSizeFromChannelArgs(const ChannelArgs& args) {
if (args.WantMinimalStack()) return -1;
return std::max(-1, args.GetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)
.value_or(GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH));
}
int GetMaxSendSizeFromChannelArgs(const grpc_channel_args* args) {
if (grpc_channel_args_want_minimal_stack(args)) return -1;
return grpc_channel_args_find_integer(
args, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH,
{GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, -1, INT_MAX});
int GetMaxSendSizeFromChannelArgs(const ChannelArgs& args) {
if (args.WantMinimalStack()) return -1;
return std::max(-1, args.GetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH)
.value_or(GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH));
}
} // namespace grpc_core
@ -308,7 +306,7 @@ static void message_size_destroy_call_elem(
}
grpc_core::MessageSizeParsedConfig::message_size_limits get_message_size_limits(
const grpc_channel_args* channel_args) {
const grpc_core::ChannelArgs& channel_args) {
grpc_core::MessageSizeParsedConfig::message_size_limits lim;
lim.max_send_size = grpc_core::GetMaxSendSizeFromChannelArgs(channel_args);
lim.max_recv_size = grpc_core::GetMaxRecvSizeFromChannelArgs(channel_args);
@ -321,7 +319,8 @@ static grpc_error_handle message_size_init_channel_elem(
GPR_ASSERT(!args->is_last);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
new (chand) channel_data();
chand->limits = get_message_size_limits(args->channel_args);
chand->limits = get_message_size_limits(
grpc_core::ChannelArgs::FromC(args->channel_args));
return GRPC_ERROR_NONE;
}
@ -348,8 +347,7 @@ const grpc_channel_filter grpc_message_size_filter = {
// Used for GRPC_CLIENT_SUBCHANNEL
static bool maybe_add_message_size_filter_subchannel(
grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = builder->channel_args();
if (grpc_channel_args_want_minimal_stack(channel_args)) {
if (builder->channel_args().WantMinimalStack()) {
return true;
}
builder->PrependFilter(&grpc_message_size_filter, nullptr);
@ -360,20 +358,15 @@ static bool maybe_add_message_size_filter_subchannel(
// only if message size limits or service config is specified.
static bool maybe_add_message_size_filter(
grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = builder->channel_args();
if (grpc_channel_args_want_minimal_stack(channel_args)) {
auto channel_args = builder->channel_args();
if (channel_args.WantMinimalStack()) {
return true;
}
bool enable = false;
grpc_core::MessageSizeParsedConfig::message_size_limits lim =
get_message_size_limits(channel_args);
if (lim.max_send_size != -1 || lim.max_recv_size != -1) {
enable = true;
}
const grpc_arg* a =
grpc_channel_args_find(channel_args, GRPC_ARG_SERVICE_CONFIG);
const char* svc_cfg_str = grpc_channel_arg_get_string(a);
if (svc_cfg_str != nullptr) enable = true;
const bool enable =
lim.max_send_size != -1 || lim.max_recv_size != -1 ||
channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
if (enable) builder->PrependFilter(&grpc_message_size_filter, nullptr);
return true;
}

@ -65,8 +65,8 @@ class MessageSizeParser : public ServiceConfigParser::Parser {
static absl::string_view parser_name() { return "message_size"; }
};
int GetMaxRecvSizeFromChannelArgs(const grpc_channel_args* args);
int GetMaxSendSizeFromChannelArgs(const grpc_channel_args* args);
int GetMaxRecvSizeFromChannelArgs(const ChannelArgs& args);
int GetMaxSendSizeFromChannelArgs(const ChannelArgs& args);
} // namespace grpc_core

@ -51,23 +51,17 @@ grpc_channel* CreateDirectBinderChannelImplForTesting(
std::move(endpoint_binder), security_policy);
GPR_ASSERT(transport != nullptr);
grpc_arg default_authority_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
const_cast<char*>("binder.authority"));
args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
grpc_channel_args* final_args =
grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
grpc_error_handle error = GRPC_ERROR_NONE;
grpc_channel* channel = grpc_channel_create_internal(
"binder_target_placeholder", final_args, GRPC_CLIENT_DIRECT_CHANNEL,
transport, &error);
auto channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args)
.Set(GRPC_ARG_DEFAULT_AUTHORITY, "binder.authority");
auto channel =
grpc_core::Channel::Create("binder_target_placeholder", channel_args,
GRPC_CLIENT_DIRECT_CHANNEL, transport);
// TODO(mingcl): Handle error properly
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(channel.ok());
grpc_channel_args_destroy(args);
grpc_channel_args_destroy(final_args);
return channel;
return channel->release()->c_ptr();
}
grpc_channel* CreateClientBinderChannelImpl(const grpc_channel_args* args) {
@ -75,39 +69,23 @@ grpc_channel* CreateClientBinderChannelImpl(const grpc_channel_args* args) {
gpr_once_init(&g_factory_once, FactoryInit);
args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
auto channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args)
.SetObject(g_factory);
// Set channel factory argument
grpc_arg channel_factory_arg =
grpc_core::ClientChannelFactory::CreateChannelArg(g_factory);
const char* arg_to_remove = channel_factory_arg.key;
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, &arg_to_remove, 1, &channel_factory_arg, 1);
auto channel =
grpc_core::Channel::Create("binder_channel_target_placeholder",
channel_args, GRPC_CLIENT_CHANNEL, nullptr);
grpc_error_handle error = GRPC_ERROR_NONE;
grpc_channel* channel = grpc_channel_create_internal(
"binder_channel_target_placeholder", new_args, GRPC_CLIENT_CHANNEL,
nullptr, &error);
// Clean up.
grpc_channel_args_destroy(new_args);
grpc_channel_args_destroy(args);
if (channel == nullptr) {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
channel = grpc_lame_client_channel_create(
"binder_channel_target_placeholder", status,
if (!channel.ok()) {
return grpc_lame_client_channel_create(
"binder_channel_target_placeholder",
static_cast<grpc_status_code>(channel.status().code()),
"Failed to create binder channel");
}
return channel;
return channel->release()->c_ptr();
}
} // namespace internal

@ -46,6 +46,7 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"
@ -306,7 +307,8 @@ class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
if (channel_credentials == nullptr) {
gpr_log(GPR_ERROR,
"Can't create subchannel: channel credentials missing for secure "
"channel.");
"channel. Got args: %s",
grpc_channel_args_string(args).c_str());
return nullptr;
}
// Make sure security connector does not already exist in args.
@ -344,29 +346,19 @@ class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
}
};
grpc_channel* CreateChannel(const char* target, const grpc_channel_args* args,
grpc_error_handle* error) {
absl::StatusOr<RefCountedPtr<Channel>> CreateChannel(const char* target,
ChannelArgs args) {
if (target == nullptr) {
gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
if (error != nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel target is NULL");
}
return nullptr;
return absl::InvalidArgumentError("channel target is NULL");
}
// Add channel arg containing the server URI.
std::string canonical_target =
CoreConfiguration::Get().resolver_registry().AddDefaultPrefixIfNeeded(
target);
grpc_arg arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVER_URI),
const_cast<char*>(canonical_target.c_str()));
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
grpc_channel* channel = grpc_channel_create_internal(
target, new_args, GRPC_CLIENT_CHANNEL, nullptr, error);
grpc_channel_args_destroy(new_args);
return channel;
return Channel::Create(target,
args.Set(GRPC_ARG_SERVER_URI, canonical_target),
GRPC_CLIENT_CHANNEL, nullptr);
}
} // namespace
@ -389,33 +381,30 @@ void FactoryInit() {
// - perform handshakes
grpc_channel* grpc_channel_create(const char* target,
grpc_channel_credentials* creds,
const grpc_channel_args* args) {
const grpc_channel_args* c_args) {
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_secure_channel_create(target=%s, creds=%p, args=%p)", 3,
(target, (void*)creds, (void*)args));
args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
(target, (void*)creds, (void*)c_args));
grpc_channel* channel = nullptr;
grpc_error_handle error = GRPC_ERROR_NONE;
if (creds != nullptr) {
// Add channel args containing the client channel factory and channel
// credentials.
gpr_once_init(&g_factory_once, FactoryInit);
grpc_arg channel_factory_arg =
grpc_core::ClientChannelFactory::CreateChannelArg(g_factory);
grpc_arg args_to_add[] = {channel_factory_arg,
grpc_channel_credentials_to_arg(creds)};
const char* arg_to_remove = channel_factory_arg.key;
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, &arg_to_remove, 1, args_to_add, GPR_ARRAY_SIZE(args_to_add));
new_args = creds->update_arguments(new_args);
grpc_core::ChannelArgs args =
creds->update_arguments(grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(c_args)
.SetObject(creds->Ref())
.SetObject(g_factory));
// Create channel.
channel = grpc_core::CreateChannel(target, new_args, &error);
// Clean up.
grpc_channel_args_destroy(new_args);
auto r = grpc_core::CreateChannel(target, args);
if (r.ok()) {
channel = r->release()->c_ptr();
} else {
error = absl_status_to_grpc_error(r.status());
}
}
grpc_channel_args_destroy(args);
if (channel == nullptr) {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
@ -444,14 +433,13 @@ grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
target, GRPC_STATUS_INTERNAL,
"Failed to create client channel due to invalid creds");
}
grpc_arg default_authority_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
const_cast<char*>("test.authority"));
args = grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
const grpc_channel_args* final_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
grpc_channel_args_destroy(args);
const grpc_channel_args* final_args =
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args)
.SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority")
.SetObject(creds->Ref())
.ToC();
int flags = fcntl(fd, F_GETFL, 0);
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
@ -460,26 +448,20 @@ grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);
GPR_ASSERT(transport);
grpc_error_handle error = GRPC_ERROR_NONE;
grpc_channel* channel = grpc_channel_create_internal(
target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport, &error);
auto channel = grpc_core::Channel::Create(
target, grpc_core::ChannelArgs::FromC(final_args),
GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_channel_args_destroy(final_args);
if (channel != nullptr) {
if (channel.ok()) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_core::ExecCtx::Get()->Flush();
return channel->release()->c_ptr();
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
channel = grpc_lame_client_channel_create(
target, status, "Failed to create client channel");
return grpc_lame_client_channel_create(
target, static_cast<grpc_status_code>(channel.status().code()),
"Failed to create client channel");
}
return channel;
}
#else // !GPR_SUPPORT_CHANNELS_FROM_FD

@ -48,24 +48,19 @@ GRPCAPI grpc_channel* grpc_cronet_secure_channel_create(
target);
// Disable client authority filter when using Cronet
grpc_arg disable_client_authority_filter_arg;
disable_client_authority_filter_arg.key =
const_cast<char*>(GRPC_ARG_DISABLE_CLIENT_AUTHORITY_FILTER);
disable_client_authority_filter_arg.type = GRPC_ARG_INTEGER;
disable_client_authority_filter_arg.value.integer = 1;
args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
args, &disable_client_authority_filter_arg, 1);
.PreconditionChannelArgs(args)
.Set(GRPC_ARG_DISABLE_CLIENT_AUTHORITY_FILTER, 1)
.ToC();
grpc_transport* ct =
grpc_create_cronet_transport(engine, target, new_args, reserved);
grpc_create_cronet_transport(engine, target, args, reserved);
grpc_core::ExecCtx exec_ctx;
grpc_channel* channel = grpc_channel_create_internal(
target, new_args, GRPC_CLIENT_DIRECT_CHANNEL, ct, nullptr);
grpc_channel_args_destroy(new_args);
auto channel =
grpc_core::Channel::Create(target, grpc_core::ChannelArgs::FromC(args),
GRPC_CLIENT_DIRECT_CHANNEL, ct);
grpc_channel_args_destroy(args);
return channel;
return channel.ok() ? channel->release()->c_ptr() : nullptr;
}

@ -1264,7 +1264,8 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
args = grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
const grpc_channel_args* client_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
.PreconditionChannelArgs(args)
.ToC();
grpc_channel_args_destroy(args);
grpc_transport* server_transport;
grpc_transport* client_transport;
@ -1276,10 +1277,10 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
server_transport, nullptr, server_args, nullptr);
grpc_channel* channel = nullptr;
if (error == GRPC_ERROR_NONE) {
channel = grpc_channel_create_internal("inproc", client_args,
GRPC_CLIENT_DIRECT_CHANNEL,
client_transport, &error);
if (error != GRPC_ERROR_NONE) {
auto new_channel = grpc_core::Channel::Create(
"inproc", grpc_core::ChannelArgs::FromC(client_args),
GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
if (!new_channel.ok()) {
GPR_ASSERT(!channel);
gpr_log(GPR_ERROR, "Failed to create client channel: %s",
grpc_error_std_string(error).c_str());
@ -1294,6 +1295,8 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
grpc_transport_destroy(server_transport);
channel = grpc_lame_client_channel_create(
nullptr, status, "Failed to create client channel");
} else {
channel = new_channel->release()->c_ptr();
}
} else {
GPR_ASSERT(!channel);

@ -96,9 +96,10 @@ XdsChannelStackModifier::GetFromChannelArgs(const grpc_channel_args& args) {
void RegisterXdsChannelStackModifier(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, INT_MAX, [](ChannelStackBuilder* builder) {
const grpc_channel_args* channel_args = builder->channel_args().ToC();
RefCountedPtr<XdsChannelStackModifier> channel_stack_modifier =
XdsChannelStackModifier::GetFromChannelArgs(
*builder->channel_args());
XdsChannelStackModifier::GetFromChannelArgs(*channel_args);
grpc_channel_args_destroy(channel_args);
if (channel_stack_modifier != nullptr) {
return channel_stack_modifier->ModifyChannelStack(builder);
}

@ -566,7 +566,8 @@ void XdsClient::ChannelState::StartConnectivityWatchLocked() {
absl::UnavailableError("xds client has a lame channel"));
return;
}
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
ClientChannel* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr);
watcher_ = new StateWatcher(WeakRef(DEBUG_LOCATION, "ChannelState+watch"));
client_channel->AddConnectivityWatcher(
@ -578,7 +579,8 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
if (IsLameChannel(channel_)) {
return;
}
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
ClientChannel* client_channel =
ClientChannel::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_);
}

@ -1287,7 +1287,8 @@ grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create(
grpc_core::ExecCtx exec_ctx;
args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
.PreconditionChannelArgs(args)
.ToC();
GRPC_API_TRACE(
"grpc_server_config_fetcher_xds_create(notifier={on_serving_status_"
"update=%p, user_data=%p}, args=%p)",

@ -160,6 +160,43 @@ void* ChannelArgs::GetVoidPointer(absl::string_view name) const {
return absl::get<Pointer>(*v).c_pointer();
}
absl::optional<bool> ChannelArgs::GetBool(absl::string_view name) const {
auto* v = Get(name);
if (v == nullptr) return absl::nullopt;
auto* i = absl::get_if<int>(v);
if (i == nullptr) {
gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
std::string(name).c_str());
return absl::nullopt;
}
switch (*i) {
case 0:
return false;
case 1:
return true;
default:
gpr_log(GPR_ERROR, "%s treated as bool but set to %d (assuming true)",
std::string(name).c_str(), *i);
return true;
}
}
std::string ChannelArgs::ToString() const {
std::vector<std::string> arg_strings;
args_.ForEach([&arg_strings](const std::string& key, const Value& value) {
std::string value_str;
if (auto* i = absl::get_if<int>(&value)) {
value_str = std::to_string(*i);
} else if (auto* s = absl::get_if<std::string>(&value)) {
value_str = *s;
} else if (auto* p = absl::get_if<Pointer>(&value)) {
value_str = absl::StrFormat("%p", p->c_pointer());
}
arg_strings.push_back(absl::StrCat(key, "=", value_str));
});
return absl::StrCat("{", absl::StrJoin(arg_strings, ", "), "}");
}
} // namespace grpc_core
static grpc_arg copy_arg(const grpc_arg* src) {
@ -461,27 +498,7 @@ grpc_arg grpc_channel_arg_pointer_create(
}
std::string grpc_channel_args_string(const grpc_channel_args* args) {
if (args == nullptr) return "";
std::vector<std::string> arg_strings;
for (size_t i = 0; i < args->num_args; ++i) {
const grpc_arg& arg = args->args[i];
std::string arg_string;
switch (arg.type) {
case GRPC_ARG_INTEGER:
arg_string = absl::StrFormat("%s=%d", arg.key, arg.value.integer);
break;
case GRPC_ARG_STRING:
arg_string = absl::StrFormat("%s=%s", arg.key, arg.value.string);
break;
case GRPC_ARG_POINTER:
arg_string = absl::StrFormat("%s=%p", arg.key, arg.value.pointer.p);
break;
default:
arg_string = "arg with unknown type";
}
arg_strings.push_back(arg_string);
}
return absl::StrJoin(arg_strings, ", ");
return grpc_core::ChannelArgs::FromC(args).ToString();
}
namespace grpc_core {

@ -81,6 +81,11 @@ struct ChannelArgTypeTraits<
};
};
// If a type declares some member 'struct RawPointerChannelArgTag {}' then
// we automatically generate a vtable for it that does not do any ownership
// management and compares the type by pointer identity.
// This is intended to be relatively ugly because *most types should worry about
// ownership*.
template <typename T>
struct ChannelArgTypeTraits<T,
absl::void_t<typename T::RawPointerChannelArgTag>> {
@ -191,6 +196,11 @@ class ChannelArgs {
ChannelArgTypeTraits<
absl::remove_cvref_t<decltype(*store_value)>>::VTable()));
}
template <typename T>
GRPC_MUST_USE_RESULT ChannelArgs SetIfUnset(absl::string_view name, T value) {
if (Contains(name)) return *this;
return Set(name, std::move(value));
}
GRPC_MUST_USE_RESULT ChannelArgs Remove(absl::string_view name) const;
bool Contains(absl::string_view name) const { return Get(name) != nullptr; }
@ -203,6 +213,7 @@ class ChannelArgs {
}
absl::optional<Duration> GetDurationFromIntMillis(
absl::string_view name) const;
absl::optional<bool> GetBool(absl::string_view name) const;
// Object based get/set.
// Deal with the common case that we set a pointer to an object under
@ -221,12 +232,26 @@ class ChannelArgs {
T* GetObject() {
return GetPointer<T>(T::ChannelArgName());
}
template <typename T>
RefCountedPtr<T> GetObjectRef() {
auto* p = GetObject<T>();
if (p == nullptr) return nullptr;
return p->Ref();
}
bool operator<(const ChannelArgs& other) const { return args_ < other.args_; }
bool operator==(const ChannelArgs& other) const {
return args_ == other.args_;
}
// Helpers for commonly accessed things
bool WantMinimalStack() const {
return GetBool(GRPC_ARG_MINIMAL_STACK).value_or(false);
}
std::string ToString() const;
private:
explicit ChannelArgs(AVL<std::string, Value> args) : args_(std::move(args)) {}
@ -333,8 +358,9 @@ ChannelArgs ChannelArgsBuiltinPrecondition(const grpc_channel_args* src);
} // namespace grpc_core
// Takes ownership of the old_args
typedef grpc_channel_args* (*grpc_channel_args_client_channel_creation_mutator)(
const char* target, grpc_channel_args* old_args,
typedef grpc_core::ChannelArgs (
*grpc_channel_args_client_channel_creation_mutator)(
const char* target, grpc_core::ChannelArgs old_args,
grpc_channel_stack_type type);
// Should be called only once globaly before grpc is init'ed.

@ -30,13 +30,13 @@ ChannelArgsPreconditioning ChannelArgsPreconditioning::Builder::Build() {
return preconditioning;
}
const grpc_channel_args* ChannelArgsPreconditioning::PreconditionChannelArgs(
ChannelArgs ChannelArgsPreconditioning::PreconditionChannelArgs(
const grpc_channel_args* args) const {
ChannelArgs channel_args = ChannelArgsBuiltinPrecondition(args);
for (auto& stage : stages_) {
channel_args = stage(std::move(channel_args));
}
return channel_args.ToC();
return channel_args;
}
} // namespace grpc_core

@ -49,8 +49,7 @@ class ChannelArgsPreconditioning {
// Take channel args and precondition them.
// Does not take ownership of the channel args passed in.
// Returns a new channel args object that is owned by the caller.
const grpc_channel_args* PreconditionChannelArgs(
const grpc_channel_args* args) const;
ChannelArgs PreconditionChannelArgs(const grpc_channel_args* args) const;
private:
std::vector<Stage> stages_;

@ -113,6 +113,8 @@ grpc_error_handle grpc_channel_stack_init(
}
}
stack->on_destroy.Init([]() {});
size_t call_size =
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
@ -169,6 +171,9 @@ void grpc_channel_stack_destroy(grpc_channel_stack* stack) {
for (i = 0; i < count; i++) {
channel_elems[i].filter->destroy_channel_elem(&channel_elems[i]);
}
(*stack->on_destroy)();
stack->on_destroy.Destroy();
}
grpc_error_handle grpc_call_stack_init(

@ -199,6 +199,11 @@ struct grpc_channel_stack {
/* Memory required for a call stack (computed at channel stack
initialization) */
size_t call_stack_size;
// TODO(ctiller): remove this mechanism... it's a hack to allow
// Channel to be separated from grpc_channel_stack's allocation. As the
// promise conversion continues, we'll reconsider what grpc_channel_stack
// should look like and this can go.
grpc_core::ManualConstructor<std::function<void()>> on_destroy;
// Minimal infrastructure to act like a RefCounted thing without converting
// everything.

@ -31,9 +31,7 @@
namespace grpc_core {
ChannelStackBuilder::~ChannelStackBuilder() {
grpc_channel_args_destroy(args_);
}
ChannelStackBuilder::~ChannelStackBuilder() = default;
ChannelStackBuilder& ChannelStackBuilder::SetTarget(const char* target) {
if (target == nullptr) {
@ -44,10 +42,8 @@ ChannelStackBuilder& ChannelStackBuilder::SetTarget(const char* target) {
return *this;
}
ChannelStackBuilder& ChannelStackBuilder::SetChannelArgs(
const grpc_channel_args* args) {
grpc_channel_args_destroy(args_);
args_ = grpc_channel_args_copy(args);
ChannelStackBuilder& ChannelStackBuilder::SetChannelArgs(ChannelArgs args) {
args_ = std::move(args);
return *this;
}

@ -22,6 +22,7 @@
#include <functional>
#include <vector>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/channel/channel_args.h"
@ -77,10 +78,10 @@ class ChannelStackBuilder {
grpc_transport* transport() const { return transport_; }
// Set channel args (takes a copy of them).
ChannelStackBuilder& SetChannelArgs(const grpc_channel_args* args);
ChannelStackBuilder& SetChannelArgs(ChannelArgs args);
// Query the channel args.
const grpc_channel_args* channel_args() const { return args_; }
const ChannelArgs& channel_args() const { return args_; }
// Mutable vector of proposed stack entries.
std::vector<StackEntry>* mutable_stack() { return &stack_; }
@ -97,11 +98,9 @@ class ChannelStackBuilder {
// Build the channel stack.
// After success, *result holds the new channel stack,
// prefix_bytes are allocated before the channel stack,
// initial_refs, destroy, destroy_arg are as per grpc_channel_stack_init
// destroy is as per grpc_channel_stack_init
// On failure, *result is nullptr.
virtual grpc_error_handle Build(size_t prefix_bytes, int initial_refs,
grpc_iomgr_cb_func destroy, void* destroy_arg,
void** result) = 0;
virtual absl::StatusOr<RefCountedPtr<grpc_channel_stack>> Build() = 0;
protected:
~ChannelStackBuilder();
@ -118,10 +117,11 @@ class ChannelStackBuilder {
// The transport
grpc_transport* transport_ = nullptr;
// Channel args
const grpc_channel_args* args_ = nullptr;
ChannelArgs args_;
// The in-progress stack
std::vector<StackEntry> stack_;
};
} // namespace grpc_core
#endif // GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H

@ -28,14 +28,12 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/transport/error_utils.h"
namespace grpc_core {
grpc_error_handle ChannelStackBuilderImpl::Build(size_t prefix_bytes,
int initial_refs,
grpc_iomgr_cb_func destroy,
void* destroy_arg,
void** result) {
absl::StatusOr<RefCountedPtr<grpc_channel_stack>>
ChannelStackBuilderImpl::Build() {
auto* stack = mutable_stack();
// create an array of filters
@ -49,13 +47,11 @@ grpc_error_handle ChannelStackBuilderImpl::Build(size_t prefix_bytes,
size_t channel_stack_size =
grpc_channel_stack_size(filters.data(), filters.size());
// allocate memory, with prefix_bytes followed by channel_stack_size
*result = gpr_zalloc(prefix_bytes + channel_stack_size);
// fetch a pointer to the channel stack
grpc_channel_stack* channel_stack = reinterpret_cast<grpc_channel_stack*>(
static_cast<char*>(*result) + prefix_bytes);
// allocate memory
auto* channel_stack =
static_cast<grpc_channel_stack*>(gpr_zalloc(channel_stack_size));
const grpc_channel_args* final_args;
ChannelArgs final_args = channel_args();
if (transport() != nullptr) {
static const grpc_arg_pointer_vtable vtable = {
// copy
@ -65,27 +61,29 @@ grpc_error_handle ChannelStackBuilderImpl::Build(size_t prefix_bytes,
// cmp
[](void* a, void* b) { return QsortCompare(a, b); },
};
grpc_arg arg = grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_TRANSPORT), transport(), &vtable);
final_args = grpc_channel_args_copy_and_add(channel_args(), &arg, 1);
} else {
final_args = channel_args();
final_args = final_args.Set(GRPC_ARG_TRANSPORT,
ChannelArgs::Pointer(transport(), &vtable));
}
// and initialize it
const grpc_channel_args* c_args = final_args.ToC();
grpc_error_handle error = grpc_channel_stack_init(
initial_refs, destroy, destroy_arg == nullptr ? *result : destroy_arg,
filters.data(), filters.size(), final_args, name(), channel_stack);
if (final_args != channel_args()) {
grpc_channel_args_destroy(final_args);
}
1,
[](void* p, grpc_error_handle) {
auto* stk = static_cast<grpc_channel_stack*>(p);
grpc_channel_stack_destroy(stk);
gpr_free(stk);
},
channel_stack, filters.data(), filters.size(), c_args, name(),
channel_stack);
grpc_channel_args_destroy(c_args);
if (error != GRPC_ERROR_NONE) {
grpc_channel_stack_destroy(channel_stack);
gpr_free(*result);
*result = nullptr;
return error;
gpr_free(channel_stack);
auto status = grpc_error_to_absl_status(error);
GRPC_ERROR_UNREF(error);
return status;
}
// run post-initialization functions
@ -96,7 +94,7 @@ grpc_error_handle ChannelStackBuilderImpl::Build(size_t prefix_bytes,
}
}
return GRPC_ERROR_NONE;
return RefCountedPtr<grpc_channel_stack>(channel_stack);
}
} // namespace grpc_core

@ -39,9 +39,7 @@ class ChannelStackBuilderImpl final : public ChannelStackBuilder {
// prefix_bytes are allocated before the channel stack,
// initial_refs, destroy, destroy_arg are as per grpc_channel_stack_init
// On failure, *result is nullptr.
grpc_error_handle Build(size_t prefix_bytes, int initial_refs,
grpc_iomgr_cb_func destroy, void* destroy_arg,
void** result) override;
absl::StatusOr<RefCountedPtr<grpc_channel_stack>> Build() override;
};
} // namespace grpc_core

@ -176,6 +176,10 @@ class ChannelNode : public BaseNode {
ChannelNode(std::string target, size_t channel_tracer_max_nodes,
bool is_internal_channel);
static absl::string_view ChannelArgName() {
return GRPC_ARG_CHANNELZ_CHANNEL_NODE;
}
// Returns the string description of the given connectivity state.
static const char* GetChannelConnectivityStateChangeString(
grpc_connectivity_state state);

@ -156,7 +156,8 @@ HttpRequest::HttpRequest(
deadline_(deadline),
channel_args_(CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(channel_args)),
.PreconditionChannelArgs(channel_args)
.ToC()),
channel_creds_(std::move(channel_creds)),
on_done_(on_done),
resource_quota_(ResourceQuotaFromChannelArgs(channel_args_)),

@ -178,10 +178,6 @@ class HttpRequestSSLCredentials : public grpc_channel_credentials {
return Ref();
}
grpc_channel_args* update_arguments(grpc_channel_args* args) override {
return args;
}
const char* type() const override { return "HttpRequestSSL"; }
private:

@ -62,7 +62,8 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
std::string final_name = absl::StrCat(name, ":client");
const grpc_channel_args* new_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
.PreconditionChannelArgs(args)
.ToC();
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name.c_str(), false),
new_args, "socketpair-server");
final_name = absl::StrCat(name, ":server");

@ -154,8 +154,7 @@ std::string ServerAddress::ToString() const {
addr_str.ok() ? addr_str.value() : addr_str.status().ToString(),
};
if (args_ != nullptr) {
parts.emplace_back(
absl::StrCat("args={", grpc_channel_args_string(args_), "}"));
parts.emplace_back(absl::StrCat("args=", grpc_channel_args_string(args_)));
}
if (!attributes_.empty()) {
std::vector<std::string> attrs;

@ -96,6 +96,9 @@ class ServerAddress {
ServerAddress WithAttribute(const char* key,
std::unique_ptr<AttributeInterface> value) const;
// TODO(ctiller): Prior to making this a public API we should ensure that the
// channel args are not part of the generated string, lest we make that debug
// format load-bearing via Hyrum's law.
std::string ToString() const;
private:

@ -51,7 +51,8 @@ class grpc_composite_channel_credentials : public grpc_channel_credentials {
const char* target, const grpc_channel_args* args,
grpc_channel_args** new_args) override;
grpc_channel_args* update_arguments(grpc_channel_args* args) override {
grpc_core::ChannelArgs update_arguments(
grpc_core::ChannelArgs args) override {
return inner_creds_->update_arguments(args);
}

@ -92,6 +92,15 @@ void grpc_override_well_known_credentials_path_getter(
struct grpc_channel_credentials
: grpc_core::RefCounted<grpc_channel_credentials> {
public:
static absl::string_view ChannelArgName() {
return GRPC_ARG_CHANNEL_CREDENTIALS;
}
static int ChannelArgsCompare(const grpc_channel_credentials* args1,
const grpc_channel_credentials* args2) {
return args1->cmp(args2);
}
// Creates a security connector for the channel. May also create new channel
// args for the channel to be used in place of the passed in const args if
// returned non NULL. In that case the caller is responsible for destroying
@ -115,7 +124,7 @@ struct grpc_channel_credentials
// By default, leave channel args as is. The callee takes ownership
// of the passed-in channel args, and the caller takes ownership
// of the returned channel args.
virtual grpc_channel_args* update_arguments(grpc_channel_args* args) {
virtual grpc_core::ChannelArgs update_arguments(grpc_core::ChannelArgs args) {
return args;
}

@ -125,17 +125,10 @@ grpc_google_default_channel_credentials::create_security_connector(
return sc;
}
grpc_channel_args* grpc_google_default_channel_credentials::update_arguments(
grpc_channel_args* args) {
grpc_channel_args* updated = args;
if (grpc_channel_args_find(args, GRPC_ARG_DNS_ENABLE_SRV_QUERIES) ==
nullptr) {
grpc_arg new_srv_arg = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_DNS_ENABLE_SRV_QUERIES), true);
updated = grpc_channel_args_copy_and_add(args, &new_srv_arg, 1);
grpc_channel_args_destroy(args);
}
return updated;
grpc_core::ChannelArgs
grpc_google_default_channel_credentials::update_arguments(
grpc_core::ChannelArgs args) {
return args.SetIfUnset(GRPC_ARG_DNS_ENABLE_SRV_QUERIES, true);
}
const char* grpc_google_default_channel_credentials::type() const {

@ -56,7 +56,7 @@ class grpc_google_default_channel_credentials
const char* target, const grpc_channel_args* args,
grpc_channel_args** new_args) override;
grpc_channel_args* update_arguments(grpc_channel_args* args) override;
grpc_core::ChannelArgs update_arguments(grpc_core::ChannelArgs args) override;
const char* type() const override;

@ -297,7 +297,7 @@ class FilterStackCall final : public Call {
FilterStackCall(Arena* arena, const grpc_call_create_args& args)
: Call(arena, args.server_transport_data == nullptr, args.send_deadline),
cq_(args.cq),
channel_(args.channel),
channel_(args.channel->Ref()),
stream_op_payload_(context_) {}
static void ReleaseCall(void* call, grpc_error_handle);
@ -328,7 +328,7 @@ class FilterStackCall final : public Call {
CallCombiner call_combiner_;
grpc_completion_queue* cq_;
grpc_polling_entity pollent_;
grpc_channel* channel_;
RefCountedPtr<Channel> channel_;
gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
/** has grpc_call_unref been called */
@ -499,7 +499,7 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
grpc_call** out_call) {
GPR_TIMER_SCOPE("grpc_call_create", 0);
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
Channel* channel = args->channel.get();
auto add_init_error = [](grpc_error_handle* composite,
grpc_error_handle new_err) {
@ -513,16 +513,15 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
Arena* arena;
FilterStackCall* call;
grpc_error_handle error = GRPC_ERROR_NONE;
grpc_channel_stack* channel_stack =
grpc_channel_get_channel_stack(args->channel);
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
grpc_channel_stack* channel_stack = channel->channel_stack();
size_t initial_size = channel->CallSizeEstimate();
GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
size_t call_alloc_size =
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)) +
channel_stack->call_stack_size;
std::pair<Arena*, void*> arena_with_call = Arena::CreateWithAlloc(
initial_size, call_alloc_size, &*args->channel->allocator);
initial_size, call_alloc_size, channel->allocator());
arena = arena_with_call.first;
call = new (arena_with_call.second) FilterStackCall(arena, *args);
GPR_DEBUG_ASSERT(FromC(call->c_ptr()) == call);
@ -586,8 +585,7 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
}
if (call->is_client()) {
channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(call->channel_);
channelz::ChannelNode* channelz_channel = channel->channelz_node();
if (channelz_channel != nullptr) {
channelz_channel->RecordCallStarted();
}
@ -619,11 +617,10 @@ void FilterStackCall::SetCompletionQueue(grpc_completion_queue* cq) {
void FilterStackCall::ReleaseCall(void* call, grpc_error_handle /*error*/) {
auto* c = static_cast<FilterStackCall*>(call);
grpc_channel* channel = c->channel_;
RefCountedPtr<Channel> channel = std::move(c->channel_);
Arena* arena = c->arena();
c->~FilterStackCall();
grpc_channel_update_call_size_estimate(channel, arena->Destroy());
GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
channel->UpdateCallSizeEstimate(arena->Destroy());
}
void FilterStackCall::DestroyCall(void* call, grpc_error_handle /*error*/) {
@ -702,7 +699,7 @@ void FilterStackCall::ExternalUnref() {
char* FilterStackCall::GetPeer() {
char* peer_string = reinterpret_cast<char*>(gpr_atm_acq_load(&peer_string_));
if (peer_string != nullptr) return gpr_strdup(peer_string);
peer_string = grpc_channel_get_target(channel_);
peer_string = grpc_channel_get_target(channel_->c_ptr());
if (peer_string != nullptr) return peer_string;
return gpr_strdup("unknown");
}
@ -793,8 +790,7 @@ void FilterStackCall::SetFinalStatus(grpc_error_handle error) {
grpc_slice_from_cpp_string(std::move(status_details));
status_error_.set(error);
GRPC_ERROR_UNREF(error);
channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(channel_);
channelz::ChannelNode* channelz_channel = channel_->channelz_node();
if (channelz_channel != nullptr) {
if (*final_op_.client.status != GRPC_STATUS_OK) {
channelz_channel->RecordCallFailed();
@ -1250,7 +1246,7 @@ void FilterStackCall::BatchControl::ValidateFilteredMetadata() {
FilterStackCall* call = call_;
const grpc_compression_options compression_options =
grpc_channel_compression_options(call->channel_);
call->channel_->compression_options();
const grpc_compression_algorithm compression_algorithm =
call->incoming_compression_algorithm_;
if (GPR_UNLIKELY(!CompressionAlgorithmSet::FromUint32(
@ -1432,7 +1428,7 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
level_set = true;
} else {
const grpc_compression_options copts =
grpc_channel_compression_options(channel_);
channel_->compression_options();
if (copts.default_level.is_set) {
level_set = true;
effective_compression_level = copts.default_level.level;

@ -28,13 +28,14 @@
#include "src/core/lib/channel/context.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/server.h"
typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success,
void* user_data);
typedef struct grpc_call_create_args {
grpc_channel* channel;
grpc_core::RefCountedPtr<grpc_core::Channel> channel;
grpc_core::Server* server;
grpc_call* parent;

@ -25,6 +25,8 @@
#include <stdlib.h>
#include <string.h>
#include <atomic>
#include <grpc/compression.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -47,207 +49,162 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/error_utils.h"
/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS.
* Avoids needing to take a metadata context lock for sending status
* if the status code is <= NUM_CACHED_STATUS_ELEMS.
* Sized to allow the most commonly used codes to fit in
* (OK, Cancelled, Unknown). */
#define NUM_CACHED_STATUS_ELEMS 3
namespace grpc_core {
static void destroy_channel(void* arg, grpc_error_handle error);
Channel::Channel(bool is_client, std::string target, ChannelArgs channel_args,
grpc_compression_options compression_options,
RefCountedPtr<grpc_channel_stack> channel_stack)
: is_client_(is_client),
compression_options_(compression_options),
call_size_estimate_(channel_stack->call_stack_size +
grpc_call_get_initial_size_estimate()),
channelz_node_(channel_args.GetObjectRef<channelz::ChannelNode>()),
allocator_(channel_args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryOwner(target)),
target_(std::move(target)),
channel_stack_(std::move(channel_stack)) {
// We need to make sure that grpc_shutdown() does not shut things down
// until after the channel is destroyed. However, the channel may not
// actually be destroyed by the time grpc_channel_destroy() returns,
// since there may be other existing refs to the channel. If those
// refs are held by things that are visible to the wrapped language
// (such as outstanding calls on the channel), then the wrapped
// language can be responsible for making sure that grpc_shutdown()
// does not run until after those refs are released. However, the
// channel may also have refs to itself held internally for various
// things that need to be cleaned up at channel destruction (e.g.,
// LB policies, subchannels, etc), and because these refs are not
// visible to the wrapped language, it cannot be responsible for
// deferring grpc_shutdown() until after they are released. To
// accommodate that, we call grpc_init() here and then call
// grpc_shutdown() when the channel is actually destroyed, thus
// ensuring that shutdown is deferred until that point.
grpc_init();
auto channelz_node = channelz_node_;
*channel_stack_->on_destroy = [channelz_node]() {
if (channelz_node != nullptr) {
channelz_node->AddTraceEvent(
channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel destroyed"));
}
grpc_shutdown();
};
}
grpc_channel* grpc_channel_create_with_builder(
grpc_core::ChannelStackBuilder* builder,
grpc_channel_stack_type channel_stack_type, grpc_error_handle* error) {
std::string target(builder->target());
grpc_channel_args* args = grpc_channel_args_copy(builder->channel_args());
grpc_channel* channel;
if (channel_stack_type == GRPC_SERVER_CHANNEL) {
absl::StatusOr<RefCountedPtr<Channel>> Channel::CreateWithBuilder(
ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();
if (builder->channel_stack_type() == GRPC_SERVER_CHANNEL) {
GRPC_STATS_INC_SERVER_CHANNELS_CREATED();
} else {
GRPC_STATS_INC_CLIENT_CHANNELS_CREATED();
}
std::string name(builder->target());
grpc_error_handle builder_error =
builder->Build(sizeof(grpc_channel), 1, destroy_channel, nullptr,
reinterpret_cast<void**>(&channel));
if (builder_error != GRPC_ERROR_NONE) {
absl::StatusOr<RefCountedPtr<grpc_channel_stack>> r = builder->Build();
if (!r.ok()) {
auto status = r.status();
gpr_log(GPR_ERROR, "channel stack builder failed: %s",
grpc_error_std_string(builder_error).c_str());
GPR_ASSERT(channel == nullptr);
if (error != nullptr) {
*error = builder_error;
} else {
GRPC_ERROR_UNREF(builder_error);
}
grpc_channel_args_destroy(args);
return nullptr;
status.ToString().c_str());
return status;
}
channel->target.Init(std::move(target));
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
channel->registration_table.Init();
channel->allocator.Init(grpc_core::ResourceQuotaFromChannelArgs(args)
->memory_quota()
->CreateMemoryOwner(name));
gpr_atm_no_barrier_store(
&channel->call_size_estimate,
(gpr_atm)CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size +
grpc_call_get_initial_size_estimate());
grpc_compression_options_init(&channel->compression_options);
for (size_t i = 0; i < args->num_args; i++) {
if (0 ==
strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) {
channel->compression_options.default_level.is_set = true;
channel->compression_options.default_level.level =
static_cast<grpc_compression_level>(grpc_channel_arg_get_integer(
&args->args[i],
{GRPC_COMPRESS_LEVEL_NONE, GRPC_COMPRESS_LEVEL_NONE,
GRPC_COMPRESS_LEVEL_COUNT - 1}));
} else if (0 == strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
channel->compression_options.default_algorithm.is_set = true;
channel->compression_options.default_algorithm.algorithm =
static_cast<grpc_compression_algorithm>(grpc_channel_arg_get_integer(
&args->args[i], {GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE,
GRPC_COMPRESS_ALGORITHMS_COUNT - 1}));
} else if (0 ==
strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
channel->compression_options.enabled_algorithms_bitset =
static_cast<uint32_t>(args->args[i].value.integer) |
0x1; /* always support no compression */
} else if (0 == strcmp(args->args[i].key, GRPC_ARG_CHANNELZ_CHANNEL_NODE)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
GPR_ASSERT(args->args[i].value.pointer.p != nullptr);
channel->channelz_node = static_cast<grpc_core::channelz::ChannelNode*>(
args->args[i].value.pointer.p)
->Ref();
} else {
gpr_log(GPR_DEBUG,
GRPC_ARG_CHANNELZ_CHANNEL_NODE " should be a pointer");
}
}
}
grpc_channel_args_destroy(args);
return channel;
}
static grpc_core::UniquePtr<char> get_default_authority(
const grpc_channel_args* input_args) {
bool has_default_authority = false;
char* ssl_override = nullptr;
grpc_core::UniquePtr<char> default_authority;
const size_t num_args = input_args != nullptr ? input_args->num_args : 0;
for (size_t i = 0; i < num_args; ++i) {
if (0 == strcmp(input_args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) {
has_default_authority = true;
} else if (0 == strcmp(input_args->args[i].key,
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) {
ssl_override = grpc_channel_arg_get_string(&input_args->args[i]);
}
grpc_compression_options compression_options;
grpc_compression_options_init(&compression_options);
auto default_level =
channel_args.GetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL);
if (default_level.has_value()) {
compression_options.default_level.is_set = true;
compression_options.default_level.level = Clamp(
static_cast<grpc_compression_level>(*default_level),
GRPC_COMPRESS_LEVEL_NONE,
static_cast<grpc_compression_level>(GRPC_COMPRESS_LEVEL_COUNT - 1));
}
if (!has_default_authority && ssl_override != nullptr) {
default_authority.reset(gpr_strdup(ssl_override));
auto default_algorithm =
channel_args.GetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM);
if (default_algorithm.has_value()) {
compression_options.default_algorithm.is_set = true;
compression_options.default_algorithm.algorithm =
Clamp(static_cast<grpc_compression_algorithm>(*default_algorithm),
GRPC_COMPRESS_NONE,
static_cast<grpc_compression_algorithm>(
GRPC_COMPRESS_ALGORITHMS_COUNT - 1));
}
return default_authority;
}
static grpc_channel_args* build_channel_args(
const grpc_channel_args* input_args, char* default_authority) {
grpc_arg new_args[1];
size_t num_new_args = 0;
if (default_authority != nullptr) {
new_args[num_new_args++] = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY), default_authority);
auto enabled_algorithms_bitset =
channel_args.GetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET);
if (enabled_algorithms_bitset.has_value()) {
compression_options.enabled_algorithms_bitset =
*enabled_algorithms_bitset | 1 /* always support no compression */;
}
return grpc_channel_args_copy_and_add(input_args, new_args, num_new_args);
return RefCountedPtr<Channel>(new Channel(
grpc_channel_stack_type_is_client(builder->channel_stack_type()),
std::string(builder->target()), std::move(channel_args),
compression_options, std::move(*r)));
}
namespace {
void* channelz_node_copy(void* p) {
grpc_core::channelz::ChannelNode* node =
static_cast<grpc_core::channelz::ChannelNode*>(p);
channelz::ChannelNode* node = static_cast<channelz::ChannelNode*>(p);
node->Ref().release();
return p;
}
void channelz_node_destroy(void* p) {
grpc_core::channelz::ChannelNode* node =
static_cast<grpc_core::channelz::ChannelNode*>(p);
channelz::ChannelNode* node = static_cast<channelz::ChannelNode*>(p);
node->Unref();
}
int channelz_node_cmp(void* p1, void* p2) {
return grpc_core::QsortCompare(p1, p2);
}
int channelz_node_cmp(void* p1, void* p2) { return QsortCompare(p1, p2); }
const grpc_arg_pointer_vtable channelz_node_arg_vtable = {
channelz_node_copy, channelz_node_destroy, channelz_node_cmp};
void CreateChannelzNode(grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* args = builder->channel_args();
void CreateChannelzNode(ChannelStackBuilder* builder) {
auto args = builder->channel_args();
// Check whether channelz is enabled.
const bool channelz_enabled = grpc_channel_args_find_bool(
args, GRPC_ARG_ENABLE_CHANNELZ, GRPC_ENABLE_CHANNELZ_DEFAULT);
const bool channelz_enabled = args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT);
if (!channelz_enabled) return;
// Get parameters needed to create the channelz node.
const size_t channel_tracer_max_memory = grpc_channel_args_find_integer(
args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE,
{GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
const bool is_internal_channel = grpc_channel_args_find_bool(
args, GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL, false);
const size_t channel_tracer_max_memory = std::max(
0, args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
.value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT));
const bool is_internal_channel =
args.GetBool(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL).value_or(false);
// Create the channelz node.
std::string target(builder->target());
grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_node =
grpc_core::MakeRefCounted<grpc_core::channelz::ChannelNode>(
RefCountedPtr<channelz::ChannelNode> channelz_node =
MakeRefCounted<channelz::ChannelNode>(
target.c_str(), channel_tracer_max_memory, is_internal_channel);
channelz_node->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
// Add channelz node to channel args.
// We remove the is_internal_channel arg, since we no longer need it.
grpc_arg new_arg = grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_NODE), channelz_node.get(),
&channelz_node_arg_vtable);
const char* args_to_remove[] = {GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL};
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
builder->SetChannelArgs(new_args);
grpc_channel_args_destroy(new_args);
builder->SetChannelArgs(
args.Remove(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL)
.Set(GRPC_ARG_CHANNELZ_CHANNEL_NODE,
ChannelArgs::Pointer(channelz_node.release(),
&channelz_node_arg_vtable)));
}
} // namespace
grpc_channel* grpc_channel_create_internal(
const char* target, const grpc_channel_args* input_args,
absl::StatusOr<RefCountedPtr<Channel>> Channel::Create(
const char* target, ChannelArgs args,
grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport, grpc_error_handle* error) {
// We need to make sure that grpc_shutdown() does not shut things down
// until after the channel is destroyed. However, the channel may not
// actually be destroyed by the time grpc_channel_destroy() returns,
// since there may be other existing refs to the channel. If those
// refs are held by things that are visible to the wrapped language
// (such as outstanding calls on the channel), then the wrapped
// language can be responsible for making sure that grpc_shutdown()
// does not run until after those refs are released. However, the
// channel may also have refs to itself held internally for various
// things that need to be cleaned up at channel destruction (e.g.,
// LB policies, subchannels, etc), and because these refs are not
// visible to the wrapped language, it cannot be responsible for
// deferring grpc_shutdown() until after they are released. To
// accommodate that, we call grpc_init() here and then call
// grpc_shutdown() when the channel is actually destroyed, thus
// ensuring that shutdown is deferred until that point.
grpc_init();
grpc_core::ChannelStackBuilderImpl builder(
grpc_transport* optional_transport) {
ChannelStackBuilderImpl builder(
grpc_channel_stack_type_string(channel_stack_type), channel_stack_type);
const grpc_core::UniquePtr<char> default_authority =
get_default_authority(input_args);
grpc_channel_args* args =
build_channel_args(input_args, default_authority.get());
if (!args.GetString(GRPC_ARG_DEFAULT_AUTHORITY).has_value()) {
auto ssl_override = args.GetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG);
if (ssl_override.has_value()) {
args = args.Set(GRPC_ARG_DEFAULT_AUTHORITY,
std::string(ssl_override.value()));
}
}
if (grpc_channel_stack_type_is_client(channel_stack_type)) {
auto channel_args_mutator =
grpc_channel_args_get_client_channel_creation_mutator();
@ -255,12 +212,10 @@ grpc_channel* grpc_channel_create_internal(
args = channel_args_mutator(target, args, channel_stack_type);
}
}
builder.SetChannelArgs(args).SetTarget(target).SetTransport(
optional_transport);
grpc_channel_args_destroy(args);
if (!grpc_core::CoreConfiguration::Get().channel_init().CreateStack(
&builder)) {
grpc_shutdown(); // Since we won't call destroy_channel().
builder.SetChannelArgs(std::move(args))
.SetTarget(target)
.SetTransport(optional_transport);
if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
return nullptr;
}
// We only need to do this for clients here. For servers, this will be
@ -268,60 +223,43 @@ grpc_channel* grpc_channel_create_internal(
if (grpc_channel_stack_type_is_client(channel_stack_type)) {
CreateChannelzNode(&builder);
}
grpc_channel* channel =
grpc_channel_create_with_builder(&builder, channel_stack_type, error);
if (channel == nullptr) {
grpc_shutdown(); // Since we won't call destroy_channel().
}
return channel;
}
size_t grpc_channel_get_call_size_estimate(grpc_channel* channel) {
#define ROUND_UP_SIZE 256
/* We round up our current estimate to the NEXT value of ROUND_UP_SIZE.
This ensures:
1. a consistent size allocation when our estimate is drifting slowly
(which is common) - which tends to help most allocators reuse memory
2. a small amount of allowed growth over the estimate without hitting
the arena size doubling case, reducing overall memory usage */
return (static_cast<size_t>(
gpr_atm_no_barrier_load(&channel->call_size_estimate)) +
2 * ROUND_UP_SIZE) &
~static_cast<size_t>(ROUND_UP_SIZE - 1);
return CreateWithBuilder(&builder);
}
void grpc_channel_update_call_size_estimate(grpc_channel* channel,
size_t size) {
size_t cur = static_cast<size_t>(
gpr_atm_no_barrier_load(&channel->call_size_estimate));
void Channel::UpdateCallSizeEstimate(size_t size) {
size_t cur = call_size_estimate_.load(std::memory_order_relaxed);
if (cur < size) {
/* size grew: update estimate */
gpr_atm_no_barrier_cas(&channel->call_size_estimate,
static_cast<gpr_atm>(cur),
static_cast<gpr_atm>(size));
/* if we lose: never mind, something else will likely update soon enough */
// size grew: update estimate
call_size_estimate_.compare_exchange_weak(
cur, size, std::memory_order_relaxed, std::memory_order_relaxed);
// if we lose: never mind, something else will likely update soon enough
} else if (cur == size) {
/* no change: holding pattern */
// no change: holding pattern
} else if (cur > 0) {
/* size shrank: decrease estimate */
gpr_atm_no_barrier_cas(
&channel->call_size_estimate, static_cast<gpr_atm>(cur),
static_cast<gpr_atm>(std::min(cur - 1, (255 * cur + size) / 256)));
/* if we lose: never mind, something else will likely update soon enough */
// size shrank: decrease estimate
call_size_estimate_.compare_exchange_weak(
cur, std::min(cur - 1, (255 * cur + size) / 256),
std::memory_order_relaxed, std::memory_order_relaxed);
// if we lose: never mind, something else will likely update soon enough
}
}
} // namespace grpc_core
char* grpc_channel_get_target(grpc_channel* channel) {
GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel));
return gpr_strdup(channel->target->c_str());
auto target = grpc_core::Channel::FromC(channel)->target();
char* buffer = static_cast<char*>(gpr_zalloc(target.size() + 1));
memcpy(buffer, target.data(), target.size());
return buffer;
}
void grpc_channel_get_info(grpc_channel* channel,
const grpc_channel_info* channel_info) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_channel_element* elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
grpc_channel_element* elem = grpc_channel_stack_element(
grpc_core::Channel::FromC(channel)->channel_stack(), 0);
elem->filter->get_channel_info(elem, channel_info);
}
@ -332,21 +270,22 @@ void grpc_channel_reset_connect_backoff(grpc_channel* channel) {
(channel));
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->reset_connect_backoff = true;
grpc_channel_element* elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
grpc_channel_element* elem = grpc_channel_stack_element(
grpc_core::Channel::FromC(channel)->channel_stack(), 0);
elem->filter->start_transport_op(elem, op);
}
static grpc_call* grpc_channel_create_call_internal(
grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
grpc_channel* c_channel, grpc_call* parent_call, uint32_t propagation_mask,
grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
grpc_core::Slice path, absl::optional<grpc_core::Slice> authority,
grpc_core::Timestamp deadline) {
GPR_ASSERT(channel->is_client);
auto channel = grpc_core::Channel::FromC(c_channel)->Ref();
GPR_ASSERT(channel->is_client());
GPR_ASSERT(!(cq != nullptr && pollset_set_alternative != nullptr));
grpc_call_create_args args;
args.channel = channel;
args.channel = std::move(channel);
args.server = nullptr;
args.parent = parent_call;
args.propagation_mask = propagation_mask;
@ -424,20 +363,27 @@ void* grpc_channel_register_call(grpc_channel* channel, const char* method,
GPR_ASSERT(!reserved);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
return grpc_core::Channel::FromC(channel)->RegisterCall(method, host);
}
namespace grpc_core {
grpc_core::MutexLock lock(&channel->registration_table->mu);
channel->registration_table->method_registration_attempts++;
RegisteredCall* Channel::RegisterCall(const char* method, const char* host) {
MutexLock lock(&registration_table_.mu);
registration_table_.method_registration_attempts++;
auto key = std::make_pair(std::string(host != nullptr ? host : ""),
std::string(method != nullptr ? method : ""));
auto rc_posn = channel->registration_table->map.find(key);
if (rc_posn != channel->registration_table->map.end()) {
auto rc_posn = registration_table_.map.find(key);
if (rc_posn != registration_table_.map.end()) {
return &rc_posn->second;
}
auto insertion_result = channel->registration_table->map.insert(
{std::move(key), grpc_core::RegisteredCall(method, host)});
auto insertion_result = registration_table_.map.insert(
{std::move(key), RegisteredCall(method, host)});
return &insertion_result.first->second;
}
} // namespace grpc_core
grpc_call* grpc_channel_create_registered_call(
grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
grpc_completion_queue* completion_queue, void* registered_call_handle,
@ -469,32 +415,16 @@ grpc_call* grpc_channel_create_registered_call(
return call;
}
static void destroy_channel(void* arg, grpc_error_handle /*error*/) {
grpc_channel* channel = static_cast<grpc_channel*>(arg);
if (channel->channelz_node != nullptr) {
channel->channelz_node->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel destroyed"));
channel->channelz_node.reset();
}
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
channel->registration_table.Destroy();
channel->allocator.Destroy();
channel->target.Destroy();
gpr_free(channel);
// See comment in grpc_channel_create_internal() for why we do this.
grpc_shutdown();
}
void grpc_channel_destroy_internal(grpc_channel* channel) {
void grpc_channel_destroy_internal(grpc_channel* c_channel) {
grpc_core::RefCountedPtr<grpc_core::Channel> channel(
grpc_core::Channel::FromC(c_channel));
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel));
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (c_channel));
op->disconnect_with_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Destroyed");
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
elem = grpc_channel_stack_element(channel->channel_stack(), 0);
elem->filter->start_transport_op(elem, op);
GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel");
}
void grpc_channel_destroy(grpc_channel* channel) {

@ -26,27 +26,15 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/cpp_impl_of.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/surface/channel_stack_type.h"
/// Creates a grpc_channel.
grpc_channel* grpc_channel_create_internal(
const char* target, const grpc_channel_args* args,
grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport, grpc_error_handle* error);
/** The same as grpc_channel_destroy, but doesn't create an ExecCtx, and so
* is safe to use from within core. */
void grpc_channel_destroy_internal(grpc_channel* channel);
/// Creates a grpc_channel with a builder. See the description of
/// \a grpc_channel_create for variable definitions.
grpc_channel* grpc_channel_create_with_builder(
grpc_core::ChannelStackBuilder* builder,
grpc_channel_stack_type channel_stack_type,
grpc_error_handle* error = nullptr);
/** Create a call given a grpc_channel, in order to call \a method.
Progress is tied to activity on \a pollset_set. The returned call object is
meant to be used with \a grpc_call_start_batch_and_execute, which relies on
@ -92,68 +80,94 @@ struct CallRegistrationTable {
int method_registration_attempts ABSL_GUARDED_BY(mu) = 0;
};
} // namespace grpc_core
struct grpc_channel {
int is_client;
grpc_compression_options compression_options;
gpr_atm call_size_estimate;
// TODO(vjpai): Once the grpc_channel is allocated via new rather than malloc,
// expand the members of the CallRegistrationTable directly into
// the grpc_channel. For now it is kept separate so that all the
// manual constructing can be done with a single call rather than
// a separate manual construction for each field.
grpc_core::ManualConstructor<grpc_core::CallRegistrationTable>
registration_table;
grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_node;
grpc_core::ManualConstructor<grpc_core::MemoryAllocator> allocator;
grpc_core::ManualConstructor<std::string> target;
class Channel : public RefCounted<Channel>,
public CppImplOf<Channel, grpc_channel> {
public:
static absl::StatusOr<RefCountedPtr<Channel>> Create(
const char* target, ChannelArgs args,
grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport);
static absl::StatusOr<RefCountedPtr<Channel>> CreateWithBuilder(
ChannelStackBuilder* builder);
grpc_channel_stack* channel_stack() const { return channel_stack_.get(); }
grpc_compression_options compression_options() const {
return compression_options_;
}
channelz::ChannelNode* channelz_node() const { return channelz_node_.get(); }
size_t CallSizeEstimate() {
// We round up our current estimate to the NEXT value of kRoundUpSize.
// This ensures:
// 1. a consistent size allocation when our estimate is drifting slowly
// (which is common) - which tends to help most allocators reuse memory
// 2. a small amount of allowed growth over the estimate without hitting
// the arena size doubling case, reducing overall memory usage
static constexpr size_t kRoundUpSize = 256;
return (call_size_estimate_.load(std::memory_order_relaxed) +
2 * kRoundUpSize) &
~(kRoundUpSize - 1);
}
void UpdateCallSizeEstimate(size_t size);
absl::string_view target() const { return target_; }
MemoryAllocator* allocator() { return &allocator_; }
bool is_client() const { return is_client_; }
RegisteredCall* RegisterCall(const char* method, const char* host);
int TestOnlyRegisteredCalls() {
MutexLock lock(&registration_table_.mu);
return registration_table_.map.size();
}
int TestOnlyRegistrationAttempts() {
MutexLock lock(&registration_table_.mu);
return registration_table_.method_registration_attempts;
}
private:
Channel(bool is_client, std::string target, ChannelArgs channel_args,
grpc_compression_options compression_options,
RefCountedPtr<grpc_channel_stack> channel_stack);
const bool is_client_;
const grpc_compression_options compression_options_;
std::atomic<size_t> call_size_estimate_;
CallRegistrationTable registration_table_;
RefCountedPtr<channelz::ChannelNode> channelz_node_;
MemoryAllocator allocator_;
std::string target_;
const RefCountedPtr<grpc_channel_stack> channel_stack_;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack*)((c) + 1))
} // namespace grpc_core
inline grpc_compression_options grpc_channel_compression_options(
const grpc_channel* channel) {
return channel->compression_options;
return grpc_core::Channel::FromC(channel)->compression_options();
}
inline grpc_channel_stack* grpc_channel_get_channel_stack(
grpc_channel* channel) {
return CHANNEL_STACK_FROM_CHANNEL(channel);
return grpc_core::Channel::FromC(channel)->channel_stack();
}
inline grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
grpc_channel* channel) {
return channel->channelz_node.get();
return grpc_core::Channel::FromC(channel)->channelz_node();
}
#ifndef NDEBUG
inline void grpc_channel_internal_ref(grpc_channel* channel,
const char* reason) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CHANNEL(channel), reason);
grpc_core::Channel::FromC(channel)->Ref(DEBUG_LOCATION, reason).release();
}
inline void grpc_channel_internal_unref(grpc_channel* channel,
const char* reason) {
GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CHANNEL(channel), reason);
}
#define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \
grpc_channel_internal_ref(channel, reason)
#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \
grpc_channel_internal_unref(channel, reason)
#else
inline void grpc_channel_internal_ref(grpc_channel* channel) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CHANNEL(channel), "unused");
}
inline void grpc_channel_internal_unref(grpc_channel* channel) {
GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CHANNEL(channel), "unused");
grpc_core::Channel::FromC(channel)->Unref(DEBUG_LOCATION, reason);
}
#define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \
grpc_channel_internal_ref(channel)
#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \
grpc_channel_internal_unref(channel)
#endif
// Return the channel's compression options.
grpc_compression_options grpc_channel_compression_options(

@ -82,39 +82,24 @@ static bool g_shutting_down ABSL_GUARDED_BY(g_init_mu) = false;
static bool maybe_prepend_client_auth_filter(
grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* args = builder->channel_args();
if (args) {
for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(GRPC_ARG_SECURITY_CONNECTOR, args->args[i].key)) {
builder->PrependFilter(&grpc_core::ClientAuthFilter::kFilter, nullptr);
break;
}
}
if (builder->channel_args().Contains(GRPC_ARG_SECURITY_CONNECTOR)) {
builder->PrependFilter(&grpc_core::ClientAuthFilter::kFilter, nullptr);
}
return true;
}
static bool maybe_prepend_server_auth_filter(
grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* args = builder->channel_args();
if (args) {
for (size_t i = 0; i < args->num_args; i++) {
if (0 == strcmp(GRPC_SERVER_CREDENTIALS_ARG, args->args[i].key)) {
builder->PrependFilter(&grpc_server_auth_filter, nullptr);
break;
}
}
if (builder->channel_args().Contains(GRPC_SERVER_CREDENTIALS_ARG)) {
builder->PrependFilter(&grpc_server_auth_filter, nullptr);
}
return true;
}
static bool maybe_prepend_grpc_server_authz_filter(
grpc_core::ChannelStackBuilder* builder) {
const grpc_channel_args* args = builder->channel_args();
const auto* provider =
grpc_channel_args_find_pointer<grpc_authorization_policy_provider>(
args, GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER);
if (provider != nullptr) {
if (builder->channel_args().GetPointer<grpc_authorization_policy_provider>(
GRPC_ARG_AUTHORIZATION_POLICY_PROVIDER) != nullptr) {
builder->PrependFilter(&grpc_core::GrpcServerAuthzFilter::kFilterVtable,
nullptr);
}

@ -184,16 +184,16 @@ grpc_channel* grpc_lame_client_channel_create(const char* target,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
GRPC_ERROR_INT_GRPC_STATUS, error_code),
GRPC_ERROR_STR_GRPC_MESSAGE, error_message);
grpc_arg error_arg = grpc_core::MakeLameClientErrorArg(&error);
grpc_channel_args* args0 =
grpc_channel_args_copy_and_add(nullptr, &error_arg, 1);
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args0);
grpc_channel_args_destroy(args0);
grpc_channel* channel = grpc_channel_create_internal(
target, args, GRPC_CLIENT_LAME_CHANNEL, nullptr, nullptr);
grpc_channel_args_destroy(args);
GRPC_ERROR_UNREF(error);
return channel;
grpc_core::ChannelArgs args =
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.Set(GRPC_ARG_LAME_FILTER_ERROR,
grpc_core::ChannelArgs::Pointer(
new grpc_error_handle(error),
&grpc_core::kLameFilterErrorArgVtable));
auto channel = grpc_core::Channel::Create(target, std::move(args),
GRPC_CLIENT_LAME_CHANNEL, nullptr);
GPR_ASSERT(channel.ok());
return channel->release()->c_ptr();
}

@ -53,6 +53,7 @@
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/init.h"
#include "src/core/lib/transport/error_utils.h"
namespace grpc_core {
@ -439,16 +440,16 @@ class ChannelBroadcaster {
// when the actual setup and shutdown broadcast take place.
// Copies over the channels from the locked server.
void FillChannelsLocked(std::vector<grpc_channel*> channels) {
void FillChannelsLocked(std::vector<RefCountedPtr<Channel>> channels) {
GPR_DEBUG_ASSERT(channels_.empty());
channels_ = std::move(channels);
}
// Broadcasts a shutdown on each channel.
void BroadcastShutdown(bool send_goaway, grpc_error_handle force_disconnect) {
for (grpc_channel* channel : channels_) {
SendShutdown(channel, send_goaway, GRPC_ERROR_REF(force_disconnect));
GRPC_CHANNEL_INTERNAL_UNREF(channel, "broadcast");
for (const RefCountedPtr<Channel>& channel : channels_) {
SendShutdown(channel->c_ptr(), send_goaway,
GRPC_ERROR_REF(force_disconnect));
}
channels_.clear(); // just for safety against double broadcast
GRPC_ERROR_UNREF(force_disconnect);
@ -486,7 +487,7 @@ class ChannelBroadcaster {
elem->filter->start_transport_op(elem, op);
}
std::vector<grpc_channel*> channels_;
std::vector<RefCountedPtr<Channel>> channels_;
};
} // namespace
@ -512,14 +513,13 @@ const grpc_channel_filter Server::kServerTopFilter = {
namespace {
RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
const grpc_channel_args* args) {
RefCountedPtr<channelz::ServerNode> CreateChannelzNode(ChannelArgs args) {
RefCountedPtr<channelz::ServerNode> channelz_node;
if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
GRPC_ENABLE_CHANNELZ_DEFAULT)) {
size_t channel_tracer_max_memory = grpc_channel_args_find_integer(
args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE,
{GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
size_t channel_tracer_max_memory = std::max(
0, args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
.value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT));
channelz_node =
MakeRefCounted<channelz::ServerNode>(channel_tracer_max_memory);
channelz_node->AddTraceEvent(
@ -531,9 +531,8 @@ RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
} // namespace
Server::Server(const grpc_channel_args* args)
: channel_args_(grpc_channel_args_copy(args)),
channelz_node_(CreateChannelzNode(args)) {}
Server::Server(ChannelArgs args)
: channel_args_(args.ToC()), channelz_node_(CreateChannelzNode(args)) {}
Server::~Server() {
grpc_channel_args_destroy(channel_args_);
@ -601,15 +600,13 @@ grpc_error_handle Server::SetupTransport(
const grpc_channel_args* args,
const RefCountedPtr<channelz::SocketNode>& socket_node) {
// Create channel.
grpc_error_handle error = GRPC_ERROR_NONE;
grpc_channel* channel = grpc_channel_create_internal(
nullptr, args, GRPC_SERVER_CHANNEL, transport, &error);
if (channel == nullptr) {
return error;
absl::StatusOr<RefCountedPtr<Channel>> channel = Channel::Create(
nullptr, ChannelArgs::FromC(args), GRPC_SERVER_CHANNEL, transport);
if (!channel.ok()) {
return absl_status_to_grpc_error(channel.status());
}
ChannelData* chand = static_cast<ChannelData*>(
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
->channel_data);
grpc_channel_stack_element((*channel)->channel_stack(), 0)->channel_data);
// Set up CQs.
size_t cq_idx;
for (cq_idx = 0; cq_idx < cqs_.size(); cq_idx++) {
@ -626,7 +623,8 @@ grpc_error_handle Server::SetupTransport(
channelz_node_->AddChildSocket(socket_node);
}
// Initialize chand.
chand->InitTransport(Ref(), channel, cq_idx, transport, channelz_socket_uuid);
chand->InitTransport(Ref(), std::move(*channel), cq_idx, transport,
channelz_socket_uuid);
return GRPC_ERROR_NONE;
}
@ -751,12 +749,11 @@ void Server::KillPendingWorkLocked(grpc_error_handle error) {
GRPC_ERROR_UNREF(error);
}
std::vector<grpc_channel*> Server::GetChannelsLocked() const {
std::vector<grpc_channel*> channels;
std::vector<RefCountedPtr<Channel>> Server::GetChannelsLocked() const {
std::vector<RefCountedPtr<Channel>> channels;
channels.reserve(channels_.size());
for (const ChannelData* chand : channels_) {
channels.push_back(chand->channel());
GRPC_CHANNEL_INTERNAL_REF(chand->channel(), "broadcast");
channels.push_back(chand->channel()->Ref());
}
return channels;
}
@ -956,13 +953,8 @@ grpc_call_error Server::RequestRegisteredCall(
class Server::ChannelData::ConnectivityWatcher
: public AsyncConnectivityStateWatcherInterface {
public:
explicit ConnectivityWatcher(ChannelData* chand) : chand_(chand) {
GRPC_CHANNEL_INTERNAL_REF(chand_->channel_, "connectivity");
}
~ConnectivityWatcher() override {
GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel_, "connectivity");
}
explicit ConnectivityWatcher(ChannelData* chand)
: chand_(chand), channel_(chand_->channel_->Ref()) {}
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state,
@ -974,7 +966,8 @@ class Server::ChannelData::ConnectivityWatcher
chand_->Destroy();
}
ChannelData* chand_;
ChannelData* const chand_;
const RefCountedPtr<Channel> channel_;
};
//
@ -999,7 +992,8 @@ Server::ChannelData::~ChannelData() {
}
void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
grpc_channel* channel, size_t cq_idx,
RefCountedPtr<Channel> channel,
size_t cq_idx,
grpc_transport* transport,
intptr_t channelz_socket_uuid) {
server_ = std::move(server);
@ -1119,8 +1113,10 @@ void Server::ChannelData::FinishDestroy(void* arg,
grpc_error_handle /*error*/) {
auto* chand = static_cast<Server::ChannelData*>(arg);
Server* server = chand->server_.get();
GRPC_CHANNEL_INTERNAL_UNREF(chand->channel_, "server");
auto* channel_stack = chand->channel_->channel_stack();
chand->channel_.reset();
server->Unref();
GRPC_CHANNEL_STACK_UNREF(channel_stack, "Server::ChannelData::Destroy");
}
void Server::ChannelData::Destroy() {
@ -1130,6 +1126,9 @@ void Server::ChannelData::Destroy() {
list_position_.reset();
server_->Ref().release();
server_->MaybeFinishShutdown();
// Unreffed by FinishDestroy
GRPC_CHANNEL_STACK_REF(channel_->channel_stack(),
"Server::ChannelData::Destroy");
GRPC_CLOSURE_INIT(&finish_destroy_channel_closure_, FinishDestroy, this,
grpc_schedule_on_exec_ctx);
if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) {
@ -1138,9 +1137,8 @@ void Server::ChannelData::Destroy() {
grpc_transport_op* op =
grpc_make_transport_op(&finish_destroy_channel_closure_);
op->set_accept_stream = true;
grpc_channel_next_op(
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel_), 0),
op);
grpc_channel_next_op(grpc_channel_stack_element(channel_->channel_stack(), 0),
op);
}
grpc_error_handle Server::ChannelData::InitChannelElement(
@ -1436,11 +1434,10 @@ void Server::CallData::StartTransportStreamOpBatch(
grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
const grpc_channel_args* new_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
grpc_core::Server* server = new grpc_core::Server(new_args);
grpc_channel_args_destroy(new_args);
grpc_core::Server* server =
new grpc_core::Server(grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args));
return server->c_ptr();
}

@ -37,6 +37,7 @@
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/transport/transport.h"
@ -96,7 +97,7 @@ class Server : public InternallyRefCounted<Server>,
virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
};
explicit Server(const grpc_channel_args* args);
explicit Server(ChannelArgs args);
~Server() override;
void Orphan() ABSL_LOCKS_EXCLUDED(mu_global_) override;
@ -191,12 +192,13 @@ class Server : public InternallyRefCounted<Server>,
ChannelData() = default;
~ChannelData();
void InitTransport(RefCountedPtr<Server> server, grpc_channel* channel,
size_t cq_idx, grpc_transport* transport,
void InitTransport(RefCountedPtr<Server> server,
RefCountedPtr<Channel> channel, size_t cq_idx,
grpc_transport* transport,
intptr_t channelz_socket_uuid);
RefCountedPtr<Server> server() const { return server_; }
grpc_channel* channel() const { return channel_; }
Channel* channel() const { return channel_.get(); }
size_t cq_idx() const { return cq_idx_; }
ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host,
@ -218,7 +220,7 @@ class Server : public InternallyRefCounted<Server>,
static void FinishDestroy(void* arg, grpc_error_handle error);
RefCountedPtr<Server> server_;
grpc_channel* channel_;
RefCountedPtr<Channel> channel_;
// The index into Server::cqs_ of the CQ used as a starting point for
// where to publish new incoming calls.
size_t cq_idx_;
@ -363,7 +365,7 @@ class Server : public InternallyRefCounted<Server>,
size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
std::vector<grpc_channel*> GetChannelsLocked() const;
std::vector<RefCountedPtr<Channel>> GetChannelsLocked() const;
// Take a shutdown ref for a request (increment by 2) and return if shutdown
// has not been called.
@ -410,7 +412,7 @@ class Server : public InternallyRefCounted<Server>,
return shutdown_refs_.load(std::memory_order_acquire) == 0;
}
grpc_channel_args* const channel_args_;
const grpc_channel_args* const channel_args_;
RefCountedPtr<channelz::ServerNode> channelz_node_;
std::unique_ptr<grpc_server_config_fetcher> config_fetcher_;

@ -26,13 +26,13 @@ namespace grpc {
namespace testing {
int ChannelTestPeer::registered_calls() const {
grpc_core::MutexLock lock(&channel_->c_channel_->registration_table->mu);
return static_cast<int>(channel_->c_channel_->registration_table->map.size());
return grpc_core::Channel::FromC(channel_->c_channel_)
->TestOnlyRegisteredCalls();
}
int ChannelTestPeer::registration_attempts() const {
grpc_core::MutexLock lock(&channel_->c_channel_->registration_table->mu);
return channel_->c_channel_->registration_table->method_registration_attempts;
return grpc_core::Channel::FromC(channel_->c_channel_)
->TestOnlyRegistrationAttempts();
}
} // namespace testing

@ -73,8 +73,12 @@ void RegisterChannelFilter(
auto maybe_add_filter = [include_filter,
filter](grpc_core::ChannelStackBuilder* builder) {
if (include_filter != nullptr) {
const grpc_channel_args* args = builder->channel_args();
if (!include_filter(*args)) return true;
const grpc_channel_args* args = builder->channel_args().ToC();
if (!include_filter(*args)) {
grpc_channel_args_destroy(args);
return true;
}
grpc_channel_args_destroy(args);
}
builder->PrependFilter(filter, nullptr);
return true;

@ -214,7 +214,8 @@ void grpc_run_bad_client_test(
grpc_server_start(a.server);
const grpc_channel_args* channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
transport = grpc_create_chttp2_transport(channel_args, sfd.server, false);
grpc_channel_args_destroy(channel_args);
server_setup_transport(&a, transport);

@ -91,9 +91,10 @@ static void client_setup_transport(grpc_transport* transport) {
grpc_channel_args_copy_and_add(nullptr, &authority_arg, 1);
/* TODO (pjaikumar): use GRPC_CLIENT_CHANNEL instead of
* GRPC_CLIENT_DIRECT_CHANNEL */
g_ctx.client = grpc_channel_create_internal("socketpair-target", args,
GRPC_CLIENT_DIRECT_CHANNEL,
transport, nullptr);
g_ctx.client = (*grpc_core::Channel::Create(
"socketpair-target", grpc_core::ChannelArgs::FromC(args),
GRPC_CLIENT_DIRECT_CHANNEL, transport))
->c_ptr();
grpc_channel_args_destroy(args);
}

@ -130,7 +130,8 @@ static int check_stack(const char* file, int line, const char* transport_name,
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
fake_transport_vtable.name = transport_name;
grpc_transport fake_transport = {&fake_transport_vtable};
grpc_channel_args* channel_args = grpc_channel_args_copy(init_args);
grpc_core::ChannelArgs channel_args =
grpc_core::ChannelArgs::FromC(init_args);
builder.SetTarget("foo.test.google.fr").SetChannelArgs(channel_args);
if (transport_name != nullptr) {
builder.SetTransport(&fake_transport);
@ -165,25 +166,7 @@ static int check_stack(const char* file, int line, const char* transport_name,
// figure out result, log if there's an error
int result = 0;
if (got != expect) {
parts.clear();
for (size_t i = 0; i < channel_args->num_args; i++) {
std::string value;
switch (channel_args->args[i].type) {
case GRPC_ARG_INTEGER: {
value = absl::StrCat(channel_args->args[i].value.integer);
break;
}
case GRPC_ARG_STRING:
value = channel_args->args[i].value.string;
break;
case GRPC_ARG_POINTER: {
value = absl::StrFormat("%p", channel_args->args[i].value.pointer.p);
break;
}
}
parts.push_back(absl::StrCat(channel_args->args[i].key, "=", value));
}
std::string args_str = absl::StrCat("{", absl::StrJoin(parts, ", "), "}");
std::string args_str = channel_args.ToString();
gpr_log(file, line, GPR_LOG_SEVERITY_ERROR,
"**************************************************");
@ -198,10 +181,5 @@ static int check_stack(const char* file, int line, const char* transport_name,
result = 1;
}
{
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(channel_args);
}
return result;
}

@ -74,26 +74,18 @@ typedef struct {
static void client_setup_transport(void* ts, grpc_transport* transport) {
sp_client_setup* cs = static_cast<sp_client_setup*>(ts);
grpc_arg authority_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
const_cast<char*>("test-authority"));
const grpc_channel_args* args =
grpc_channel_args_copy_and_add(cs->client_args, &authority_arg, 1);
grpc_error_handle error = GRPC_ERROR_NONE;
cs->f->client = grpc_channel_create_internal(
"socketpair-target", args, GRPC_CLIENT_DIRECT_CHANNEL, transport, &error);
grpc_channel_args_destroy(args);
if (cs->f->client != nullptr) {
auto args = grpc_core::ChannelArgs::FromC(cs->client_args)
.Set(GRPC_ARG_DEFAULT_AUTHORITY, "test-authority");
auto channel = grpc_core::Channel::Create(
"socketpair-target", args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
if (channel.ok()) {
cs->f->client = channel->release()->c_ptr();
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
cs->f->client =
grpc_lame_client_channel_create(nullptr, status, "lame channel");
cs->f->client = grpc_lame_client_channel_create(
nullptr, static_cast<grpc_status_code>(channel.status().code()),
"lame channel");
grpc_transport_destroy(transport);
}
}
@ -121,7 +113,8 @@ static void chttp2_init_client_socketpair(
cs.f = f;
client_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(client_args);
.PreconditionChannelArgs(client_args)
.ToC();
transport =
grpc_create_chttp2_transport(client_args, fixture_data->ep.client, true);
grpc_channel_args_destroy(client_args);
@ -140,7 +133,8 @@ static void chttp2_init_server_socketpair(
grpc_server_start(f->server);
server_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(server_args);
.PreconditionChannelArgs(server_args)
.ToC();
transport =
grpc_create_chttp2_transport(server_args, fixture_data->ep.server, false);
grpc_channel_args_destroy(server_args);

@ -70,26 +70,17 @@ typedef struct {
static void client_setup_transport(void* ts, grpc_transport* transport) {
sp_client_setup* cs = static_cast<sp_client_setup*>(ts);
grpc_arg authority_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
const_cast<char*>("test-authority"));
const grpc_channel_args* args =
grpc_channel_args_copy_and_add(cs->client_args, &authority_arg, 1);
grpc_error_handle error = GRPC_ERROR_NONE;
cs->f->client = grpc_channel_create_internal(
"socketpair-target", args, GRPC_CLIENT_DIRECT_CHANNEL, transport, &error);
grpc_channel_args_destroy(args);
if (cs->f->client != nullptr) {
auto args = grpc_core::ChannelArgs::FromC(cs->client_args)
.Set(GRPC_ARG_DEFAULT_AUTHORITY, "test-authority");
auto channel = grpc_core::Channel::Create(
"socketpair-target", args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
if (channel.ok()) {
cs->f->client = channel->release()->c_ptr();
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
cs->f->client =
grpc_lame_client_channel_create(nullptr, status, "lame channel");
cs->f->client = grpc_lame_client_channel_create(
nullptr, static_cast<grpc_status_code>(channel.status().code()),
"lame channel");
grpc_transport_destroy(transport);
}
}
@ -115,7 +106,8 @@ static void chttp2_init_client_socketpair(
sp_client_setup cs;
client_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(client_args);
.PreconditionChannelArgs(client_args)
.ToC();
cs.client_args = client_args;
cs.f = f;
transport =
@ -136,7 +128,8 @@ static void chttp2_init_server_socketpair(
grpc_server_start(f->server);
server_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(server_args);
.PreconditionChannelArgs(server_args)
.ToC();
transport =
grpc_create_chttp2_transport(server_args, fixture_data->ep.server, false);
grpc_channel_args_destroy(server_args);

@ -70,26 +70,17 @@ typedef struct {
static void client_setup_transport(void* ts, grpc_transport* transport) {
sp_client_setup* cs = static_cast<sp_client_setup*>(ts);
grpc_arg authority_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
const_cast<char*>("test-authority"));
const grpc_channel_args* args =
grpc_channel_args_copy_and_add(cs->client_args, &authority_arg, 1);
grpc_error_handle error = GRPC_ERROR_NONE;
cs->f->client = grpc_channel_create_internal(
"socketpair-target", args, GRPC_CLIENT_DIRECT_CHANNEL, transport, &error);
grpc_channel_args_destroy(args);
if (cs->f->client != nullptr) {
auto args = grpc_core::ChannelArgs::FromC(cs->client_args)
.Set(GRPC_ARG_DEFAULT_AUTHORITY, "test-authority");
auto channel = grpc_core::Channel::Create(
"socketpair-target", args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
if (channel.ok()) {
cs->f->client = channel->release()->c_ptr();
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
cs->f->client =
grpc_lame_client_channel_create(nullptr, status, "lame channel");
cs->f->client = grpc_lame_client_channel_create(
nullptr, static_cast<grpc_status_code>(channel.status().code()),
"lame channel");
grpc_transport_destroy(transport);
}
}
@ -126,7 +117,8 @@ static void chttp2_init_client_socketpair(
sp_client_setup cs;
client_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(client_args);
.PreconditionChannelArgs(client_args)
.ToC();
cs.client_args = client_args;
cs.f = f;
transport =
@ -147,7 +139,8 @@ static void chttp2_init_server_socketpair(
grpc_server_start(f->server);
server_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(server_args);
.PreconditionChannelArgs(server_args)
.ToC();
transport =
grpc_create_chttp2_transport(server_args, fixture_data->ep.server, false);
grpc_channel_args_destroy(server_args);

@ -537,7 +537,8 @@ static void on_read_request_done_locked(void* arg, grpc_error_handle error) {
grpc_schedule_on_exec_ctx);
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(&conn->on_server_connect_done, &conn->server_endpoint,
conn->pollset_set, args, &(*addresses_or)[0],
deadline);
@ -613,7 +614,8 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
// Create TCP server.
proxy->channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
.PreconditionChannelArgs(args)
.ToC();
grpc_error_handle error =
grpc_tcp_server_create(nullptr, proxy->channel_args, &proxy->server);
GPR_ASSERT(error == GRPC_ERROR_NONE);

@ -52,28 +52,25 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_transport* transport =
grpc_create_chttp2_transport(args, mock_endpoint, true);
grpc_channel_args_destroy(args);
grpc_resource_quota_unref(resource_quota);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_arg authority_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
const_cast<char*>("test-authority"));
args = grpc_channel_args_copy_and_add(nullptr, &authority_arg, 1);
const grpc_channel_args* channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
grpc_channel* channel = grpc_channel_create_internal(
"test-target", channel_args, GRPC_CLIENT_DIRECT_CHANNEL, transport,
nullptr);
grpc_channel_args_destroy(args);
grpc_channel_args_destroy(channel_args);
auto channel_args =
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, "test-authority");
auto channel = grpc_core::Channel::Create(
"test-target", channel_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_slice host = grpc_slice_from_static_string("localhost");
grpc_call* call = grpc_channel_create_call(
channel, nullptr, 0, cq, grpc_slice_from_static_string("/foo"), &host,
gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
grpc_call* call =
grpc_channel_create_call(channel->get()->c_ptr(), nullptr, 0, cq,
grpc_slice_from_static_string("/foo"), &host,
gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array_init(&initial_metadata_recv);
@ -157,7 +154,6 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_slice_unref(details);
grpc_channel_destroy(channel);
if (response_payload_recv != nullptr) {
grpc_byte_buffer_destroy(response_payload_recv);
}

@ -56,7 +56,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
const grpc_channel_args* channel_args =
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_transport* transport =
grpc_create_chttp2_transport(channel_args, mock_endpoint, false);
grpc_resource_quota_unref(resource_quota);

@ -39,11 +39,13 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
client_args =
const_cast<grpc_channel_args*>(grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(client_args));
.PreconditionChannelArgs(client_args)
.ToC());
server_args =
const_cast<grpc_channel_args*>(grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(server_args));
.PreconditionChannelArgs(server_args)
.ToC());
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);

@ -308,8 +308,9 @@ grpc_channel_filter FailSendOpsFilter::kFilterVtable = {
bool MaybeAddFilter(grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = builder->channel_args();
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, true)) {
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.

@ -353,9 +353,9 @@ void retry_recv_message_replay(grpc_end2end_test_config config) {
GRPC_CLIENT_SUBCHANNEL, 0,
[](grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = builder->channel_args();
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
true)) {
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.

@ -340,8 +340,9 @@ grpc_channel_filter InjectStatusFilter::kFilterVtable = {
bool AddFilter(grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = builder->channel_args();
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, true)) {
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.

@ -368,9 +368,9 @@ void retry_send_op_fails(grpc_end2end_test_config config) {
GRPC_CLIENT_SUBCHANNEL, 0,
[](grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = builder->channel_args();
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
true)) {
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.

@ -362,9 +362,9 @@ void retry_transparent_goaway(grpc_end2end_test_config config) {
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1,
[](grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = builder->channel_args();
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
true)) {
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.

@ -361,9 +361,9 @@ void retry_transparent_not_sent_on_wire(grpc_end2end_test_config config) {
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1,
[](grpc_core::ChannelStackBuilder* builder) {
// Skip on proxy (which explicitly disables retries).
const grpc_channel_args* args = builder->channel_args();
if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
true)) {
if (!builder->channel_args()
.GetBool(GRPC_ARG_ENABLE_RETRIES)
.value_or(true)) {
return true;
}
// Install filter.

@ -103,9 +103,10 @@ static void must_fail(void* arg, grpc_error_handle error) {
/* connect to it */
GPR_ASSERT(getsockname(svr_fd, (struct sockaddr*)addr, (socklen_t*)&resolved_addr.len) == 0);
GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
const grpc_channel_args* args =
grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
nullptr);
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(&done, &g_connecting, nullptr, args, &resolved_addr,
grpc_core::Timestamp::InfFuture());
grpc_channel_args_destroy(args);
@ -160,9 +161,10 @@ static void must_fail(void* arg, grpc_error_handle error) {
/* connect to a broken address */
GRPC_CLOSURE_INIT(&done, must_fail, nullptr, grpc_schedule_on_exec_ctx);
const grpc_channel_args* args =
grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
nullptr);
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(&done, &g_connecting, nullptr, args, &resolved_addr,
grpc_core::Timestamp::InfFuture());
grpc_channel_args_destroy(args);

@ -124,9 +124,10 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
/* connect to it */
XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr.len), 0);
init_event_closure(&done, &connected_promise);
const grpc_channel_args *args =
grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
nullptr);
const grpc_channel_args *args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(&done, &ep_, nullptr, args, &resolved_addr,
grpc_core::Timestamp::InfFuture());
grpc_channel_args_destroy(args);

@ -110,7 +110,8 @@ void test_succeeds(void) {
GRPC_CLOSURE_INIT(&done, must_succeed, nullptr, grpc_schedule_on_exec_ctx);
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(&done, &g_connecting, g_pollset_set, args,
&resolved_addr, grpc_core::Timestamp::InfFuture());
grpc_channel_args_destroy(args);

@ -167,7 +167,8 @@ static void test_no_op(void) {
grpc_tcp_server* s;
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, args, &s));
grpc_channel_args_destroy(args);
grpc_tcp_server_unref(s);
@ -178,7 +179,8 @@ static void test_no_op_with_start(void) {
grpc_tcp_server* s;
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, args, &s));
grpc_channel_args_destroy(args);
LOG_TEST("test_no_op_with_start");
@ -195,7 +197,8 @@ static void test_no_op_with_port(void) {
grpc_tcp_server* s;
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, args, &s));
grpc_channel_args_destroy(args);
LOG_TEST("test_no_op_with_port");
@ -219,7 +222,8 @@ static void test_no_op_with_port_and_start(void) {
grpc_tcp_server* s;
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(nullptr, args, &s));
grpc_channel_args_destroy(args);
LOG_TEST("test_no_op_with_port_and_start");
@ -320,7 +324,8 @@ static void test_connect(size_t num_connects,
const grpc_channel_args* new_channel_args =
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(channel_args);
.PreconditionChannelArgs(channel_args)
.ToC();
GPR_ASSERT(GRPC_ERROR_NONE ==
grpc_tcp_server_create(nullptr, new_channel_args, &s));
grpc_channel_args_destroy(new_channel_args);

@ -138,7 +138,8 @@ void bad_server_thread(void* vargs) {
grpc_tcp_server* s;
const grpc_channel_args* channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_error_handle error = grpc_tcp_server_create(nullptr, channel_args, &s);
grpc_channel_args_destroy(channel_args);
GPR_ASSERT(error == GRPC_ERROR_NONE);

@ -37,7 +37,7 @@ void test_unknown_scheme_target(void) {
grpc_channel_stack_element(grpc_channel_get_channel_stack(chan), 0);
GPR_ASSERT(0 == strcmp(elem->filter->name, "lame-client"));
grpc_core::ExecCtx exec_ctx;
GRPC_CHANNEL_INTERNAL_UNREF(chan, "test");
grpc_core::Channel::FromC(chan)->Unref();
creds->Unref();
}
@ -51,7 +51,7 @@ void test_security_connector_already_in_arg(void) {
grpc_channel_stack_element(grpc_channel_get_channel_stack(chan), 0);
GPR_ASSERT(0 == strcmp(elem->filter->name, "lame-client"));
grpc_core::ExecCtx exec_ctx;
GRPC_CHANNEL_INTERNAL_UNREF(chan, "test");
grpc_core::Channel::FromC(chan)->Unref();
}
void test_null_creds(void) {
@ -60,7 +60,7 @@ void test_null_creds(void) {
grpc_channel_stack_element(grpc_channel_get_channel_stack(chan), 0);
GPR_ASSERT(0 == strcmp(elem->filter->name, "lame-client"));
grpc_core::ExecCtx exec_ctx;
GRPC_CHANNEL_INTERNAL_UNREF(chan, "test");
grpc_core::Channel::FromC(chan)->Unref();
}
int main(int argc, char** argv) {

@ -55,13 +55,14 @@ DEFINE_PROTO_FUZZER(const binder_transport_fuzzer::Input& input) {
const_cast<char*>("test-authority"));
grpc_channel_args* args =
grpc_channel_args_copy_and_add(nullptr, &authority_arg, 1);
const grpc_channel_args* channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
grpc_channel* channel = grpc_channel_create_internal(
"test-target", channel_args, GRPC_CLIENT_DIRECT_CHANNEL,
client_transport, nullptr);
grpc_channel_args_destroy(channel_args);
auto channel_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
auto channel =
grpc_core::Channel::Create("test-target", channel_args,
GRPC_CLIENT_DIRECT_CHANNEL, client_transport)
->release()
->c_ptr();
grpc_channel_args_destroy(args);
grpc_slice host = grpc_slice_from_static_string("localhost");
grpc_call* call = grpc_channel_create_call(

@ -53,7 +53,8 @@ DEFINE_PROTO_FUZZER(const binder_transport_fuzzer::Input& input) {
const grpc_channel_args* channel_args =
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
(void)grpc_core::Server::FromC(server)->SetupTransport(
server_transport, nullptr, channel_args, nullptr);
grpc_channel_args_destroy(channel_args);

@ -112,14 +112,12 @@ grpc_channel* grpc_binder_channel_create_for_testing(
grpc_server* server, const grpc_channel_args* args, void* /*reserved*/) {
grpc_core::ExecCtx exec_ctx;
grpc_arg default_authority_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
const_cast<char*>("test.authority"));
args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args);
grpc_channel_args* client_args =
grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
.PreconditionChannelArgs(args)
.ToC();
auto client_args = grpc_core::ChannelArgs::FromC(args).Set(
GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
grpc_transport *client_transport, *server_transport;
std::tie(client_transport, server_transport) =
@ -127,11 +125,9 @@ grpc_channel* grpc_binder_channel_create_for_testing(
grpc_error_handle error = grpc_core::Server::FromC(server)->SetupTransport(
server_transport, nullptr, args, nullptr);
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_channel* channel = grpc_channel_create_internal(
"binder", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport,
&error);
GPR_ASSERT(error == GRPC_ERROR_NONE);
auto channel = grpc_core::Channel::Create(
"binder", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
GPR_ASSERT(channel.ok());
grpc_channel_args_destroy(args);
grpc_channel_args_destroy(client_args);
return channel;
return channel->release()->c_ptr();
}

@ -74,7 +74,8 @@ TEST_F(ContextListTest, ExecuteFlushesList) {
grpc_endpoint* mock_endpoint = grpc_mock_endpoint_create(discard_write);
const grpc_channel_args* args = CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_transport* t = grpc_create_chttp2_transport(args, mock_endpoint, true);
grpc_channel_args_destroy(args);
std::vector<grpc_chttp2_stream*> s;
@ -129,7 +130,8 @@ TEST_F(ContextListTest, NonEmptyListEmptyTimestamp) {
grpc_endpoint* mock_endpoint = grpc_mock_endpoint_create(discard_write);
const grpc_channel_args* args = CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_transport* t = grpc_create_chttp2_transport(args, mock_endpoint, true);
grpc_channel_args_destroy(args);
std::vector<grpc_chttp2_stream*> s;

@ -125,7 +125,8 @@ class Client {
EventState state;
const grpc_channel_args* args = CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_tcp_client_connect(state.closure(), &endpoint_, pollset_set, args,
addresses_or->data(),
ExecCtx::Get()->Now() + Duration::Seconds(1));

@ -68,7 +68,8 @@ void test_tcp_server_start(test_tcp_server* server, int port) {
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
.PreconditionChannelArgs(nullptr)
.ToC();
grpc_error_handle error = grpc_tcp_server_create(&server->shutdown_complete,
args, &server->tcp_server);
grpc_channel_args_destroy(args);

@ -81,7 +81,7 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertion) {
// Create a phony ChannelStackBuilder object
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL);
builder.SetChannelArgs(args);
builder.SetChannelArgs(ChannelArgs::FromC(args));
grpc_channel_args_destroy(args);
grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
@ -119,7 +119,7 @@ TEST(XdsChannelStackModifierTest, XdsHttpFiltersInsertionAfterCensus) {
// Create a phony ChannelStackBuilder object
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
ChannelStackBuilderImpl builder("test", GRPC_SERVER_CHANNEL);
builder.SetChannelArgs(args);
builder.SetChannelArgs(ChannelArgs::FromC(args));
grpc_channel_args_destroy(args);
grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));

@ -722,20 +722,19 @@ class IsolatedCallFixture : public TrackCounters {
// the grpc_shutdown() run by grpc_channel_destroy(). So we need to
// call grpc_init() manually here to balance things out.
grpc_init();
const grpc_channel_args* args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
grpc_core::ChannelStackBuilderImpl builder("phony", GRPC_CLIENT_CHANNEL);
builder.SetTarget("phony_target");
builder.SetChannelArgs(args);
builder.AppendFilter(&isolated_call_filter::isolated_call_filter, nullptr);
{
grpc_core::ExecCtx exec_ctx;
channel_ = grpc_channel_create_with_builder(&builder, GRPC_CLIENT_CHANNEL,
nullptr);
channel_ =
grpc_core::Channel::CreateWithBuilder(&builder)->release()->c_ptr();
}
cq_ = grpc_completion_queue_create_for_next(nullptr);
grpc_channel_args_destroy(args);
}
void Finish(benchmark::State& state) override {

@ -138,7 +138,8 @@ class Fixture {
ep_ = new PhonyEndpoint;
const grpc_channel_args* final_args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(&c_args);
.PreconditionChannelArgs(&c_args)
.ToC();
t_ = grpc_create_chttp2_transport(final_args, ep_, client);
grpc_channel_args_destroy(final_args);
grpc_chttp2_transport_start_reading(t_, nullptr, nullptr, nullptr);

@ -201,9 +201,12 @@ class EndpointPairFixture : public BaseFixture {
client_transport_ =
grpc_create_chttp2_transport(&c_args, endpoints.client, true);
GPR_ASSERT(client_transport_);
grpc_channel* channel = grpc_channel_create_internal(
"target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport_,
nullptr);
grpc_channel* channel =
grpc_core::Channel::Create(
"target", grpc_core::ChannelArgs::FromC(&c_args),
GRPC_CLIENT_DIRECT_CHANNEL, client_transport_)
->release()
->c_ptr();
grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr,
nullptr);

@ -96,8 +96,12 @@ class EndpointPairFixture {
grpc_transport* transport =
grpc_create_chttp2_transport(&c_args, endpoints.client, true);
GPR_ASSERT(transport);
grpc_channel* channel = grpc_channel_create_internal(
"target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport, nullptr);
grpc_channel* channel =
grpc_core::Channel::Create("target",
grpc_core::ChannelArgs::FromC(&c_args),
GRPC_CLIENT_DIRECT_CHANNEL, transport)
->release()
->c_ptr();
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
channel_ = grpc::CreateChannelInternal(

Loading…
Cancel
Save