Merge Master

pull/37138/head
tanvi-jagtap 8 months ago
commit 61a6b37dbe
  1. 21
      CMakeLists.txt
  2. 3
      bazel/experiments.bzl
  3. 9
      build_autogenerated.yaml
  4. 1
      gRPC-C++.podspec
  5. 1
      gRPC-Core.podspec
  6. 5
      include/grpc/support/metrics.h
  7. 1
      src/core/BUILD
  8. 91
      src/core/client_channel/client_channel.cc
  9. 66
      src/core/client_channel/subchannel.cc
  10. 14
      src/core/client_channel/subchannel.h
  11. 16
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  12. 16
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  13. 3
      src/core/ext/transport/chttp2/transport/frame_data.cc
  14. 3
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
  15. 17
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
  16. 2
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
  17. 51
      src/core/lib/experiments/experiments.cc
  18. 27
      src/core/lib/experiments/experiments.h
  19. 24
      src/core/lib/experiments/experiments.yaml
  20. 6
      src/core/lib/experiments/rollouts.yaml
  21. 33
      src/core/lib/gprpp/work_serializer.cc
  22. 2
      src/core/lib/iomgr/ev_epoll1_linux.cc
  23. 97
      src/core/lib/promise/party.cc
  24. 2
      src/core/lib/promise/party.h
  25. 29
      src/core/lib/resource_quota/memory_quota.cc
  26. 5
      src/core/lib/resource_quota/memory_quota.h
  27. 10
      src/core/lib/slice/slice_refcount.h
  28. 102
      src/core/resolver/xds/xds_dependency_manager.cc
  29. 13
      src/core/util/useful.h
  30. 3
      test/core/end2end/tests/http2_stats.cc
  31. 2
      test/core/transport/chttp2/ping_configuration_test.cc
  32. 5
      test/core/util/useful_test.cc
  33. 6
      test/cpp/interop/pre_stop_hook_server_test.cc
  34. 12
      test/cpp/microbenchmarks/BUILD
  35. 41
      test/cpp/microbenchmarks/bm_callback_streaming_ping_pong.cc
  36. 25
      tools/internal_ci/linux/grpc_performance_profile_daily.cfg
  37. 25
      tools/internal_ci/linux/grpc_performance_profile_daily.sh
  38. 25
      tools/internal_ci/linux/grpc_performance_profile_master.cfg
  39. 25
      tools/internal_ci/linux/grpc_performance_profile_master.sh
  40. 29
      tools/internal_ci/linux/grpc_performance_profile_summary_in_docker.sh
  41. 22
      tools/run_tests/sanity/banned_functions.py
  42. 1
      tools/run_tests/sanity/check_bad_dependencies.sh

21
CMakeLists.txt generated

@ -1803,6 +1803,7 @@ target_link_libraries(gpr
absl::log_globals
absl::log
absl::memory
absl::bits
absl::random_random
absl::status
absl::cord
@ -4076,8 +4077,8 @@ target_include_directories(benchmark_helpers
target_link_libraries(benchmark_helpers
${_gRPC_ALLTARGETS_LIBRARIES}
${_gRPC_BENCHMARK_LIBRARIES}
grpc++_unsecure
grpc_test_util_unsecure
grpc++
grpc_test_util
grpc++_test_config
)
@ -8679,6 +8680,7 @@ target_link_libraries(bitset_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::check
absl::bits
)
@ -30421,6 +30423,7 @@ target_link_libraries(table_test
gtest
absl::check
absl::type_traits
absl::bits
absl::utility
)
@ -32952,6 +32955,7 @@ target_link_libraries(unique_type_name_test
gtest
absl::flat_hash_map
absl::check
absl::bits
absl::str_format
)
@ -33072,6 +33076,7 @@ target_link_libraries(useful_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::check
absl::bits
)
@ -37966,7 +37971,7 @@ generate_pkgconfig(
"gpr"
"gRPC platform support library"
"${gRPC_CORE_VERSION}"
"absl_any_invocable absl_base absl_check absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_log absl_log_globals absl_log_severity absl_memory absl_optional absl_random_random absl_status absl_str_format absl_strings absl_synchronization absl_time absl_variant"
"absl_any_invocable absl_base absl_bits absl_check absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_log absl_log_globals absl_log_severity absl_memory absl_optional absl_random_random absl_status absl_str_format absl_strings absl_synchronization absl_time absl_variant"
""
"-lgpr"
""
@ -37977,7 +37982,7 @@ generate_pkgconfig(
"gRPC"
"high performance general RPC framework"
"${gRPC_CORE_VERSION}"
"absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_check absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_log absl_log_globals absl_log_severity absl_memory absl_no_destructor absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr"
"absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_bits absl_check absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_log absl_log_globals absl_log_severity absl_memory absl_no_destructor absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr"
"libcares openssl re2 zlib"
"-lgrpc"
"-laddress_sorting -lupb_textformat_lib -lupb_json_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib -lutf8_range_lib"
@ -37988,7 +37993,7 @@ generate_pkgconfig(
"gRPC unsecure"
"high performance general RPC framework without SSL"
"${gRPC_CORE_VERSION}"
"absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_check absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_log absl_log_globals absl_log_severity absl_memory absl_no_destructor absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr"
"absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_bits absl_check absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_log absl_log_globals absl_log_severity absl_memory absl_no_destructor absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr"
"libcares zlib"
"-lgrpc_unsecure"
"-laddress_sorting -lupb_message_lib -lupb_mem_lib -lupb_base_lib -lutf8_range_lib"
@ -37999,7 +38004,7 @@ generate_pkgconfig(
"gRPC++"
"C++ wrapper for gRPC"
"${gRPC_CPP_VERSION}"
"absl_absl_check absl_absl_log absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_check absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_log absl_log_globals absl_log_severity absl_memory absl_no_destructor absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc"
"absl_absl_check absl_absl_log absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_bits absl_check absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_log absl_log_globals absl_log_severity absl_memory absl_no_destructor absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc"
"libcares openssl re2 zlib"
"-lgrpc++"
"-laddress_sorting -lupb_textformat_lib -lupb_json_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib -lutf8_range_lib"
@ -38010,7 +38015,7 @@ generate_pkgconfig(
"gRPC++ unsecure"
"C++ wrapper for gRPC without SSL"
"${gRPC_CPP_VERSION}"
"absl_absl_check absl_absl_log absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_check absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_log absl_log_globals absl_log_severity absl_memory absl_no_destructor absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc_unsecure"
"absl_absl_check absl_absl_log absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_bits absl_check absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_log absl_log_globals absl_log_severity absl_memory absl_no_destructor absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc_unsecure"
"libcares zlib"
"-lgrpc++_unsecure"
"-laddress_sorting -lupb_message_lib -lupb_mem_lib -lupb_base_lib -lutf8_range_lib"
@ -38021,7 +38026,7 @@ generate_pkgconfig(
"gRPC++ OpenTelemetry Plugin"
"OpenTelemetry Plugin for gRPC C++"
"${gRPC_CPP_VERSION}"
"absl_absl_check absl_absl_log absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_check absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_log absl_log_globals absl_log_severity absl_memory absl_no_destructor absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc grpc++ opentelemetry_api"
"absl_absl_check absl_absl_log absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_bits absl_check absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_log absl_log_globals absl_log_severity absl_memory absl_no_destructor absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc grpc++ opentelemetry_api"
"libcares openssl re2 zlib"
"-lgrpcpp_otel_plugin"
"-laddress_sorting -lupb_textformat_lib -lupb_json_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib -lutf8_range_lib"

@ -25,9 +25,6 @@ EXPERIMENT_ENABLES = {
"event_engine_dns": "event_engine_dns",
"event_engine_listener": "event_engine_listener",
"free_large_allocator": "free_large_allocator",
"http2_stats_fix": "http2_stats_fix",
"keepalive_fix": "keepalive_fix",
"keepalive_server_fix": "keepalive_server_fix",
"max_pings_wo_data_throttle": "max_pings_wo_data_throttle",
"monitoring_experiment": "monitoring_experiment",
"multiping": "multiping",

@ -133,6 +133,7 @@ libs:
- absl/log:globals
- absl/log:log
- absl/memory:memory
- absl/numeric:bits
- absl/random:random
- absl/status:status
- absl/strings:cord
@ -3625,8 +3626,8 @@ libs:
- test/cpp/microbenchmarks/helpers.cc
deps:
- benchmark
- grpc++_unsecure
- grpc_test_util_unsecure
- grpc++
- grpc_test_util
- grpc++_test_config
defaults: benchmark
- name: grpc++
@ -6449,6 +6450,7 @@ targets:
deps:
- gtest
- absl/log:check
- absl/numeric:bits
uses_polling: false
- name: buffer_list_test
gtest: true
@ -19736,6 +19738,7 @@ targets:
- gtest
- absl/log:check
- absl/meta:type_traits
- absl/numeric:bits
- absl/utility:utility
uses_polling: false
- name: tcp_client_posix_test
@ -20936,6 +20939,7 @@ targets:
- gtest
- absl/container:flat_hash_map
- absl/log:check
- absl/numeric:bits
- absl/strings:str_format
uses_polling: false
- name: unknown_frame_bad_client_test
@ -20973,6 +20977,7 @@ targets:
deps:
- gtest
- absl/log:check
- absl/numeric:bits
uses_polling: false
- name: uuid_v4_test
gtest: true

1
gRPC-C++.podspec generated

@ -254,6 +254,7 @@ Pod::Spec.new do |s|
ss.dependency 'abseil/log/log', abseil_version
ss.dependency 'abseil/memory/memory', abseil_version
ss.dependency 'abseil/meta/type_traits', abseil_version
ss.dependency 'abseil/numeric/bits', abseil_version
ss.dependency 'abseil/random/bit_gen_ref', abseil_version
ss.dependency 'abseil/random/distributions', abseil_version
ss.dependency 'abseil/random/random', abseil_version

1
gRPC-Core.podspec generated

@ -221,6 +221,7 @@ Pod::Spec.new do |s|
ss.dependency 'abseil/log/log', abseil_version
ss.dependency 'abseil/memory/memory', abseil_version
ss.dependency 'abseil/meta/type_traits', abseil_version
ss.dependency 'abseil/numeric/bits', abseil_version
ss.dependency 'abseil/random/bit_gen_ref', abseil_version
ss.dependency 'abseil/random/distributions', abseil_version
ss.dependency 'abseil/random/random', abseil_version

@ -38,10 +38,11 @@ class StatsPluginChannelScope {
absl::string_view target() const { return target_; }
/// Returns the default authority for the channel.
absl::string_view default_authority() const { return default_authority_; }
/// Returns channel arguments.
/// Returns channel arguments. THIS METHOD IS EXPERIMENTAL.
// TODO(roth, ctiller, yashkt): Find a better representation for
// channel args before de-experimentalizing this API.
const grpc_event_engine::experimental::EndpointConfig& args() const {
const grpc_event_engine::experimental::EndpointConfig& experimental_args()
const {
return args_;
}

@ -265,6 +265,7 @@ grpc_cc_library(
hdrs = ["util/useful.h"],
external_deps = [
"absl/log:check",
"absl/numeric:bits",
"absl/strings",
"absl/types:variant",
],

@ -688,10 +688,93 @@ grpc_connectivity_state ClientChannel::CheckConnectivityState(
return state;
}
void ClientChannel::WatchConnectivityState(grpc_connectivity_state, Timestamp,
grpc_completion_queue*, void*) {
// TODO(ctiller): implement
Crash("not implemented");
namespace {
// A fire-and-forget object to handle external connectivity state watches.
class ExternalStateWatcher : public RefCounted<ExternalStateWatcher> {
public:
ExternalStateWatcher(WeakRefCountedPtr<ClientChannel> channel,
grpc_completion_queue* cq, void* tag,
grpc_connectivity_state last_observed_state,
Timestamp deadline)
: channel_(std::move(channel)), cq_(cq), tag_(tag) {
MutexLock lock(&mu_);
// Start watch. This inherits the ref from creation.
auto watcher =
MakeOrphanable<Watcher>(RefCountedPtr<ExternalStateWatcher>(this));
watcher_ = watcher.get();
channel_->AddConnectivityWatcher(last_observed_state, std::move(watcher));
// Start timer. This takes a second ref.
const Duration timeout = deadline - Timestamp::Now();
timer_handle_ =
channel_->event_engine()->RunAfter(timeout, [self = Ref()]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->MaybeStartCompletion(absl::DeadlineExceededError(
"Timed out waiting for connection state change"));
// ExternalStateWatcher deletion might require an active ExecCtx.
self.reset();
});
}
private:
class Watcher : public AsyncConnectivityStateWatcherInterface {
public:
explicit Watcher(RefCountedPtr<ExternalStateWatcher> external_state_watcher)
: external_state_watcher_(std::move(external_state_watcher)) {}
void OnConnectivityStateChange(grpc_connectivity_state /*new_state*/,
const absl::Status& /*status*/) override {
external_state_watcher_->MaybeStartCompletion(absl::OkStatus());
}
private:
RefCountedPtr<ExternalStateWatcher> external_state_watcher_;
};
// This is called both when the watch reports a new connectivity state
// and when the timer fires. It will trigger a CQ notification only
// on the first call. Subsequent calls will be ignored, because
// events can come in asynchronously.
void MaybeStartCompletion(absl::Status status) {
MutexLock lock(&mu_);
if (watcher_ == nullptr) return; // Ignore subsequent notifications.
// Cancel watch.
channel_->RemoveConnectivityWatcher(watcher_);
watcher_ = nullptr;
// Cancel timer.
channel_->event_engine()->Cancel(timer_handle_);
// Send CQ completion.
Ref().release(); // Released in FinishedCompletion().
grpc_cq_end_op(cq_, tag_, status, FinishedCompletion, this,
&completion_storage_);
}
// Called when the completion is returned to the CQ.
static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
auto* self = static_cast<ExternalStateWatcher*>(arg);
self->Unref();
}
WeakRefCountedPtr<ClientChannel> channel_;
Mutex mu_;
grpc_completion_queue* cq_ ABSL_GUARDED_BY(&mu_);
void* tag_ ABSL_GUARDED_BY(&mu_);
grpc_cq_completion completion_storage_ ABSL_GUARDED_BY(&mu_);
Watcher* watcher_ ABSL_GUARDED_BY(&mu_) = nullptr;
grpc_event_engine::experimental::EventEngine::TaskHandle timer_handle_
ABSL_GUARDED_BY(&mu_);
};
} // namespace
void ClientChannel::WatchConnectivityState(grpc_connectivity_state state,
Timestamp deadline,
grpc_completion_queue* cq,
void* tag) {
new ExternalStateWatcher(WeakRefAsSubclass<ClientChannel>(), cq, tag, state,
deadline);
}
void ClientChannel::AddConnectivityWatcher(

@ -97,14 +97,11 @@ using ::grpc_event_engine::experimental::EventEngine;
// ConnectedSubchannel
//
ConnectedSubchannel::ConnectedSubchannel(
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
ConnectedSubchannel::ConnectedSubchannel(const ChannelArgs& args)
: RefCounted<ConnectedSubchannel>(
GRPC_TRACE_FLAG_ENABLED(subchannel_refcount) ? "ConnectedSubchannel"
: nullptr),
args_(args),
channelz_subchannel_(std::move(channelz_subchannel)) {}
args_(args) {}
//
// LegacyConnectedSubchannel
@ -114,14 +111,19 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel {
public:
LegacyConnectedSubchannel(
RefCountedPtr<grpc_channel_stack> channel_stack, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
RefCountedPtr<channelz::SubchannelNode> channelz_node)
: ConnectedSubchannel(args),
channelz_node_(std::move(channelz_node)),
channel_stack_(std::move(channel_stack)) {}
~LegacyConnectedSubchannel() override {
channel_stack_.reset(DEBUG_LOCATION, "ConnectedSubchannel");
}
channelz::SubchannelNode* channelz_node() const {
return channelz_node_.get();
}
void StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
@ -162,6 +164,7 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel {
}
private:
RefCountedPtr<channelz::SubchannelNode> channelz_node_;
RefCountedPtr<grpc_channel_stack> channel_stack_;
};
@ -191,9 +194,8 @@ class NewConnectedSubchannel : public ConnectedSubchannel {
NewConnectedSubchannel(
RefCountedPtr<UnstartedCallDestination> call_destination,
RefCountedPtr<TransportCallDestination> transport,
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
const ChannelArgs& args)
: ConnectedSubchannel(args),
call_destination_(std::move(call_destination)),
transport_(std::move(transport)) {}
@ -240,7 +242,8 @@ RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
}
SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
: connected_subchannel_(std::move(args.connected_subchannel)),
: connected_subchannel_(args.connected_subchannel
.TakeAsSubclass<LegacyConnectedSubchannel>()),
deadline_(args.deadline) {
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
const grpc_call_element_args call_args = {
@ -259,7 +262,7 @@ SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
return;
}
grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
auto* channelz_node = connected_subchannel_->channelz_subchannel();
auto* channelz_node = connected_subchannel_->channelz_node();
if (channelz_node != nullptr) {
channelz_node->RecordCallStarted();
}
@ -327,13 +330,9 @@ void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) {
void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
grpc_transport_stream_op_batch* batch) {
// only intercept payloads with recv trailing.
if (!batch->recv_trailing_metadata) {
return;
}
if (!batch->recv_trailing_metadata) return;
// only add interceptor is channelz is enabled.
if (connected_subchannel_->channelz_subchannel() == nullptr) {
return;
}
if (connected_subchannel_->channelz_node() == nullptr) return;
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
this, grpc_schedule_on_exec_ctx);
// save some state needed for the interception callback.
@ -366,13 +365,13 @@ void SubchannelCall::RecvTrailingMetadataReady(void* arg,
CHECK_NE(call->recv_trailing_metadata_, nullptr);
grpc_status_code status = GRPC_STATUS_OK;
GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, error);
channelz::SubchannelNode* channelz_subchannel =
call->connected_subchannel_->channelz_subchannel();
CHECK_NE(channelz_subchannel, nullptr);
channelz::SubchannelNode* channelz_node =
call->connected_subchannel_->channelz_node();
CHECK_NE(channelz_node, nullptr);
if (status == GRPC_STATUS_OK) {
channelz_subchannel->RecordCallSucceeded();
channelz_node->RecordCallSucceeded();
} else {
channelz_subchannel->RecordCallFailed();
channelz_node->RecordCallFailed();
}
Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, error);
}
@ -860,6 +859,24 @@ bool Subchannel::PublishTransportLocked() {
->client_transport());
InterceptionChainBuilder builder(
connecting_result_.channel_args.SetObject(transport.get()));
if (channelz_node_ != nullptr) {
// TODO(ctiller): If/when we have a good way to access the subchannel
// from a filter (maybe GetContext<Subchannel>?), consider replacing
// these two hooks with a filter so that we can avoid storing two
// separate refs to the channelz node in each connection.
builder.AddOnClientInitialMetadata(
[channelz_node = channelz_node_](ClientMetadata&) {
channelz_node->RecordCallStarted();
});
builder.AddOnServerTrailingMetadata(
[channelz_node = channelz_node_](ServerMetadata& metadata) {
if (IsStatusOk(metadata)) {
channelz_node->RecordCallSucceeded();
} else {
channelz_node->RecordCallFailed();
}
});
}
CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
GRPC_CLIENT_SUBCHANNEL, builder);
auto transport_destination =
@ -874,8 +891,7 @@ bool Subchannel::PublishTransportLocked() {
return false;
}
connected_subchannel_ = MakeRefCounted<NewConnectedSubchannel>(
std::move(*call_destination), std::move(transport_destination), args_,
channelz_node_);
std::move(*call_destination), std::move(transport_destination), args_);
}
connecting_result_.Reset();
// Publish.

@ -66,9 +66,6 @@ class SubchannelCall;
class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
public:
const ChannelArgs& args() const { return args_; }
channelz::SubchannelNode* channelz_subchannel() const {
return channelz_subchannel_.get();
}
virtual void StartWatch(
grpc_pollset_set* interested_parties,
@ -85,17 +82,14 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
virtual void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) = 0;
protected:
ConnectedSubchannel(
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
explicit ConnectedSubchannel(const ChannelArgs& args);
private:
ChannelArgs args_;
// ref counted pointer to the channelz node in this connected subchannel's
// owning subchannel.
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
};
class LegacyConnectedSubchannel;
// Implements the interface of RefCounted<>.
class SubchannelCall final {
public:
@ -150,7 +144,7 @@ class SubchannelCall final {
static void Destroy(void* arg, grpc_error_handle error);
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
RefCountedPtr<LegacyConnectedSubchannel> connected_subchannel_;
grpc_closure* after_call_stack_destroy_ = nullptr;
// State needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready_;

@ -158,13 +158,15 @@ void Chttp2Connector::OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result) {
grpc_chttp2_transport_start_reading(
result_->transport, (*result)->read_buffer.c_slice_buffer(),
&on_receive_settings_, args_.interested_parties, nullptr);
timer_handle_ =
event_engine_->RunAfter(args_.deadline - Timestamp::Now(),
[self = RefAsSubclass<Chttp2Connector>()] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnTimeout();
});
timer_handle_ = event_engine_->RunAfter(
args_.deadline - Timestamp::Now(),
[self = RefAsSubclass<Chttp2Connector>()]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnTimeout();
// Ensure the Chttp2Connector is deleted under an ExecCtx.
self.reset();
});
} else {
// If the handshaking succeeded but there is no endpoint, then the
// handshaker may have handed off the connection to some external

@ -456,15 +456,11 @@ static void read_channel_args(grpc_chttp2_transport* t,
if (t->is_client) {
t->keepalive_permit_without_calls =
channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
.value_or(grpc_core::IsKeepaliveFixEnabled()
? g_default_client_keepalive_permit_without_calls
: false);
.value_or(g_default_client_keepalive_permit_without_calls);
} else {
t->keepalive_permit_without_calls =
channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
.value_or(grpc_core::IsKeepaliveServerFixEnabled()
? g_default_server_keepalive_permit_without_calls
: false);
.value_or(g_default_server_keepalive_permit_without_calls);
}
t->settings_timeout =
@ -1468,11 +1464,9 @@ static void perform_stream_op_locked(void* stream_op,
frame_hdr[3] = static_cast<uint8_t>(len >> 8);
frame_hdr[4] = static_cast<uint8_t>(len);
if (grpc_core::IsHttp2StatsFixEnabled()) {
s->stats.outgoing.framing_bytes += GRPC_HEADER_SIZE_IN_BYTES;
s->stats.outgoing.data_bytes +=
op_payload->send_message.send_message->Length();
}
s->stats.outgoing.framing_bytes += GRPC_HEADER_SIZE_IN_BYTES;
s->stats.outgoing.data_bytes +=
op_payload->send_message.send_message->Length();
s->next_message_end_offset =
s->flow_controlled_bytes_written +
static_cast<int64_t>(s->flow_controlled_buffer.length) +

@ -78,9 +78,6 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
stats->framing_bytes += header_size;
if (!grpc_core::IsHttp2StatsFixEnabled()) {
stats->data_bytes += write_bytes;
}
}
grpc_core::Poll<grpc_error_handle> grpc_deframe_unprocessed_incoming_frames(

@ -356,7 +356,8 @@ Epoll1Poller::Epoll1Poller(Scheduler* scheduler)
wakeup_fd_ = *CreateWakeupFd();
CHECK(wakeup_fd_ != nullptr);
CHECK_GE(g_epoll_set_.epfd, 0);
LOG(INFO) << "grpc epoll fd: " << g_epoll_set_.epfd;
GRPC_TRACE_LOG(event_engine_poller, INFO)
<< "grpc epoll fd: " << g_epoll_set_.epfd;
struct epoll_event ev;
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
ev.data.ptr = wakeup_fd_.get();

@ -148,15 +148,21 @@ absl::Status PrepareTcpClientSocket(PosixSocketWrapper sock,
return absl::OkStatus();
}
#endif // GRPC_POSIX_SOCKET_UTILS_COMMON
} // namespace
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#ifndef GRPC_SET_SOCKET_DUALSTACK_CUSTOM
bool SetSocketDualStack(int fd) {
const int off = 0;
return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof(off));
}
#endif // GRPC_SET_SOCKET_DUALSTACK_CUSTOM
#endif // GRPC_POSIX_SOCKET_UTILS_COMMON
} // namespace
PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) {
void* value;
PosixTcpOptions options;
@ -627,12 +633,15 @@ void PosixSocketWrapper::TrySetSocketTcpUserTimeout(
// if it is available.
if (g_socket_supports_tcp_user_timeout.load() == 0) {
if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
// This log is intentionally not protected behind a flag, so that users
// know that TCP_USER_TIMEOUT is not being used.
LOG(INFO) << "TCP_USER_TIMEOUT is not available. TCP_USER_TIMEOUT "
"won't be used thereafter";
g_socket_supports_tcp_user_timeout.store(-1);
} else {
LOG(INFO) << "TCP_USER_TIMEOUT is available. TCP_USER_TIMEOUT will be "
"used thereafter";
GRPC_TRACE_LOG(tcp, INFO)
<< "TCP_USER_TIMEOUT is available. TCP_USER_TIMEOUT will be "
"used thereafter";
g_socket_supports_tcp_user_timeout.store(1);
}
}

@ -323,6 +323,8 @@ struct PosixSocketWrapper::PosixSocketCreateResult {
EventEngine::ResolvedAddress mapped_target_addr;
};
bool SetSocketDualStack(int fd);
} // namespace experimental
} // namespace grpc_event_engine

@ -47,17 +47,6 @@ const char* const additional_constraints_event_engine_listener = "{}";
const char* const description_free_large_allocator =
"If set, return all free bytes from a \042big\042 allocator";
const char* const additional_constraints_free_large_allocator = "{}";
const char* const description_http2_stats_fix =
"Fix on HTTP2 outgoing data stats reporting";
const char* const additional_constraints_http2_stats_fix = "{}";
const char* const description_keepalive_fix =
"Allows overriding keepalive_permit_without_calls. Refer "
"https://github.com/grpc/grpc/pull/33428 for more information.";
const char* const additional_constraints_keepalive_fix = "{}";
const char* const description_keepalive_server_fix =
"Allows overriding keepalive_permit_without_calls for servers. Refer "
"https://github.com/grpc/grpc/pull/33917 for more information.";
const char* const additional_constraints_keepalive_server_fix = "{}";
const char* const description_max_pings_wo_data_throttle =
"Experiment to throttle pings to a period of 1 min when "
"GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA limit has reached (instead of "
@ -138,12 +127,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_event_engine_listener, nullptr, 0, false, true},
{"free_large_allocator", description_free_large_allocator,
additional_constraints_free_large_allocator, nullptr, 0, false, true},
{"http2_stats_fix", description_http2_stats_fix,
additional_constraints_http2_stats_fix, nullptr, 0, true, true},
{"keepalive_fix", description_keepalive_fix,
additional_constraints_keepalive_fix, nullptr, 0, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
additional_constraints_keepalive_server_fix, nullptr, 0, false, false},
{"max_pings_wo_data_throttle", description_max_pings_wo_data_throttle,
additional_constraints_max_pings_wo_data_throttle, nullptr, 0, false,
true},
@ -214,17 +197,6 @@ const char* const additional_constraints_event_engine_listener = "{}";
const char* const description_free_large_allocator =
"If set, return all free bytes from a \042big\042 allocator";
const char* const additional_constraints_free_large_allocator = "{}";
const char* const description_http2_stats_fix =
"Fix on HTTP2 outgoing data stats reporting";
const char* const additional_constraints_http2_stats_fix = "{}";
const char* const description_keepalive_fix =
"Allows overriding keepalive_permit_without_calls. Refer "
"https://github.com/grpc/grpc/pull/33428 for more information.";
const char* const additional_constraints_keepalive_fix = "{}";
const char* const description_keepalive_server_fix =
"Allows overriding keepalive_permit_without_calls for servers. Refer "
"https://github.com/grpc/grpc/pull/33917 for more information.";
const char* const additional_constraints_keepalive_server_fix = "{}";
const char* const description_max_pings_wo_data_throttle =
"Experiment to throttle pings to a period of 1 min when "
"GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA limit has reached (instead of "
@ -305,12 +277,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_event_engine_listener, nullptr, 0, true, true},
{"free_large_allocator", description_free_large_allocator,
additional_constraints_free_large_allocator, nullptr, 0, false, true},
{"http2_stats_fix", description_http2_stats_fix,
additional_constraints_http2_stats_fix, nullptr, 0, true, true},
{"keepalive_fix", description_keepalive_fix,
additional_constraints_keepalive_fix, nullptr, 0, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
additional_constraints_keepalive_server_fix, nullptr, 0, false, false},
{"max_pings_wo_data_throttle", description_max_pings_wo_data_throttle,
additional_constraints_max_pings_wo_data_throttle, nullptr, 0, false,
true},
@ -381,17 +347,6 @@ const char* const additional_constraints_event_engine_listener = "{}";
const char* const description_free_large_allocator =
"If set, return all free bytes from a \042big\042 allocator";
const char* const additional_constraints_free_large_allocator = "{}";
const char* const description_http2_stats_fix =
"Fix on HTTP2 outgoing data stats reporting";
const char* const additional_constraints_http2_stats_fix = "{}";
const char* const description_keepalive_fix =
"Allows overriding keepalive_permit_without_calls. Refer "
"https://github.com/grpc/grpc/pull/33428 for more information.";
const char* const additional_constraints_keepalive_fix = "{}";
const char* const description_keepalive_server_fix =
"Allows overriding keepalive_permit_without_calls for servers. Refer "
"https://github.com/grpc/grpc/pull/33917 for more information.";
const char* const additional_constraints_keepalive_server_fix = "{}";
const char* const description_max_pings_wo_data_throttle =
"Experiment to throttle pings to a period of 1 min when "
"GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA limit has reached (instead of "
@ -472,12 +427,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_event_engine_listener, nullptr, 0, true, true},
{"free_large_allocator", description_free_large_allocator,
additional_constraints_free_large_allocator, nullptr, 0, false, true},
{"http2_stats_fix", description_http2_stats_fix,
additional_constraints_http2_stats_fix, nullptr, 0, true, true},
{"keepalive_fix", description_keepalive_fix,
additional_constraints_keepalive_fix, nullptr, 0, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
additional_constraints_keepalive_server_fix, nullptr, 0, false, false},
{"max_pings_wo_data_throttle", description_max_pings_wo_data_throttle,
additional_constraints_max_pings_wo_data_throttle, nullptr, 0, false,
true},

@ -66,10 +66,6 @@ inline bool IsEventEngineClientEnabled() { return false; }
inline bool IsEventEngineDnsEnabled() { return false; }
inline bool IsEventEngineListenerEnabled() { return false; }
inline bool IsFreeLargeAllocatorEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX
inline bool IsHttp2StatsFixEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsMaxPingsWoDataThrottleEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_MONITORING_EXPERIMENT
inline bool IsMonitoringExperimentEnabled() { return true; }
@ -103,10 +99,6 @@ inline bool IsEventEngineDnsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_EVENT_ENGINE_LISTENER
inline bool IsEventEngineListenerEnabled() { return true; }
inline bool IsFreeLargeAllocatorEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX
inline bool IsHttp2StatsFixEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsMaxPingsWoDataThrottleEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_MONITORING_EXPERIMENT
inline bool IsMonitoringExperimentEnabled() { return true; }
@ -139,10 +131,6 @@ inline bool IsEventEngineDnsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_EVENT_ENGINE_LISTENER
inline bool IsEventEngineListenerEnabled() { return true; }
inline bool IsFreeLargeAllocatorEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX
inline bool IsHttp2StatsFixEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsMaxPingsWoDataThrottleEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_MONITORING_EXPERIMENT
inline bool IsMonitoringExperimentEnabled() { return true; }
@ -175,9 +163,6 @@ enum ExperimentIds {
kExperimentIdEventEngineDns,
kExperimentIdEventEngineListener,
kExperimentIdFreeLargeAllocator,
kExperimentIdHttp2StatsFix,
kExperimentIdKeepaliveFix,
kExperimentIdKeepaliveServerFix,
kExperimentIdMaxPingsWoDataThrottle,
kExperimentIdMonitoringExperiment,
kExperimentIdMultiping,
@ -227,18 +212,6 @@ inline bool IsEventEngineListenerEnabled() {
inline bool IsFreeLargeAllocatorEnabled() {
return IsExperimentEnabled<kExperimentIdFreeLargeAllocator>();
}
#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX
inline bool IsHttp2StatsFixEnabled() {
return IsExperimentEnabled<kExperimentIdHttp2StatsFix>();
}
#define GRPC_EXPERIMENT_IS_INCLUDED_KEEPALIVE_FIX
inline bool IsKeepaliveFixEnabled() {
return IsExperimentEnabled<kExperimentIdKeepaliveFix>();
}
#define GRPC_EXPERIMENT_IS_INCLUDED_KEEPALIVE_SERVER_FIX
inline bool IsKeepaliveServerFixEnabled() {
return IsExperimentEnabled<kExperimentIdKeepaliveServerFix>();
}
#define GRPC_EXPERIMENT_IS_INCLUDED_MAX_PINGS_WO_DATA_THROTTLE
inline bool IsMaxPingsWoDataThrottleEnabled() {
return IsExperimentEnabled<kExperimentIdMaxPingsWoDataThrottle>();

@ -91,28 +91,6 @@
expiry: 2024/08/01
owner: alishananda@google.com
test_tags: [resource_quota_test]
- name: http2_stats_fix
description:
Fix on HTTP2 outgoing data stats reporting
expiry: 2024/09/30
owner: yashkt@google.com
test_tags: []
- name: keepalive_fix
description:
Allows overriding keepalive_permit_without_calls.
Refer https://github.com/grpc/grpc/pull/33428 for more information.
expiry: 2024/06/30
owner: yashkt@google.com
test_tags: []
allow_in_fuzzing_config: false
- name: keepalive_server_fix
description:
Allows overriding keepalive_permit_without_calls for servers.
Refer https://github.com/grpc/grpc/pull/33917 for more information.
expiry: 2024/12/31
owner: yashkt@google.com
test_tags: []
allow_in_fuzzing_config: false
- name: max_pings_wo_data_throttle
description:
Experiment to throttle pings to a period of 1 min when
@ -205,6 +183,6 @@
Have the work serializer dispatch work to event engine for every callback,
instead of running things inline in the first thread that successfully
enqueues work.
expiry: 2024/06/30
expiry: 2024/09/30
owner: ysseung@google.com
test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test", "lb_unit_test"]

@ -73,12 +73,6 @@
windows: true
- name: free_large_allocator
default: false
- name: http2_stats_fix
default: true
- name: keepalive_fix
default: false
- name: keepalive_server_fix
default: false
- name: max_pings_wo_data_throttle
default: false
- name: monitoring_experiment

@ -31,7 +31,6 @@
#include "absl/log/log.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/debug/trace.h"
@ -138,8 +137,8 @@ class WorkSerializer::LegacyWorkSerializer final : public WorkSerializerImpl {
void WorkSerializer::LegacyWorkSerializer::Run(std::function<void()> callback,
const DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
this, location.file(), location.line());
LOG(INFO) << "WorkSerializer::Run() " << this << " Scheduling callback ["
<< location.file() << ":" << location.line() << "]";
}
// Increment queue size for the new callback and owner count to attempt to
// take ownership of the WorkSerializer.
@ -164,7 +163,7 @@ void WorkSerializer::LegacyWorkSerializer::Run(std::function<void()> callback,
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper);
LOG(INFO) << " Scheduling on queue : item " << cb_wrapper;
}
queue_.Push(&cb_wrapper->mpscq_node);
}
@ -175,9 +174,9 @@ void WorkSerializer::LegacyWorkSerializer::Schedule(
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
gpr_log(GPR_INFO,
"WorkSerializer::Schedule() %p Scheduling callback %p [%s:%d]",
this, cb_wrapper, location.file(), location.line());
LOG(INFO) << "WorkSerializer::Schedule() " << this
<< " Scheduling callback " << cb_wrapper << " ["
<< location.file() << ":" << location.line() << "]";
}
refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_acq_rel);
queue_.Push(&cb_wrapper->mpscq_node);
@ -185,7 +184,7 @@ void WorkSerializer::LegacyWorkSerializer::Schedule(
void WorkSerializer::LegacyWorkSerializer::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
LOG(INFO) << "WorkSerializer::Orphan() " << this;
}
const uint64_t prev_ref_pair =
refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
@ -199,7 +198,7 @@ void WorkSerializer::LegacyWorkSerializer::Orphan() {
// execute all the scheduled callbacks.
void WorkSerializer::LegacyWorkSerializer::DrainQueue() {
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
LOG(INFO) << "WorkSerializer::DrainQueue() " << this;
}
// Attempt to take ownership of the WorkSerializer. Also increment the queue
// size as required by `DrainQueueOwned()`.
@ -220,7 +219,7 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueue() {
void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueueOwned() %p", this);
LOG(INFO) << "WorkSerializer::DrainQueueOwned() " << this;
}
while (true) {
auto prev_ref_pair = refs_.fetch_sub(MakeRefPair(0, 1));
@ -267,9 +266,9 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
<< " Queue returned nullptr, trying again";
}
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
gpr_log(GPR_INFO, " Running item %p : callback scheduled at [%s:%d]",
cb_wrapper, cb_wrapper->location.file(),
cb_wrapper->location.line());
LOG(INFO) << " Running item " << cb_wrapper
<< " : callback scheduled at [" << cb_wrapper->location.file()
<< ":" << cb_wrapper->location.line() << "]";
}
cb_wrapper->callback();
delete cb_wrapper;
@ -407,8 +406,8 @@ void WorkSerializer::DispatchingWorkSerializer::Orphan() {
void WorkSerializer::DispatchingWorkSerializer::Run(
std::function<void()> callback, const DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
gpr_log(GPR_INFO, "WorkSerializer[%p] Scheduling callback [%s:%d]", this,
location.file(), location.line());
LOG(INFO) << "WorkSerializer[" << this << "] Scheduling callback ["
<< location.file() << ":" << location.line() << "]";
}
global_stats().IncrementWorkSerializerItemsEnqueued();
MutexLock lock(&mu_);
@ -440,8 +439,8 @@ void WorkSerializer::DispatchingWorkSerializer::Run() {
// queue since processing_ is stored in reverse order.
auto& cb = processing_.back();
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
gpr_log(GPR_INFO, "WorkSerializer[%p] Executing callback [%s:%d]", this,
cb.location.file(), cb.location.line());
LOG(INFO) << "WorkSerializer[" << this << "] Executing callback ["
<< cb.location.file() << ":" << cb.location.line() << "]";
}
// Run the work item.
const auto start = std::chrono::steady_clock::now();

@ -122,7 +122,7 @@ static bool epoll_set_init() {
return false;
}
gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
GRPC_TRACE_LOG(polling, INFO) << "grpc epoll fd: " << g_epoll_set.epfd;
gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
return true;

@ -36,11 +36,6 @@
namespace grpc_core {
namespace {
// TODO(ctiller): Once all activities are parties we can remove this.
thread_local Party** g_current_party_run_next = nullptr;
} // namespace
///////////////////////////////////////////////////////////////////////////////
// PartySyncUsingAtomics
@ -214,53 +209,67 @@ void Party::ForceImmediateRepoll(WakeupMask mask) {
sync_.ForceImmediateRepoll(mask);
}
void Party::RunLocked() {
void Party::RunLocked(Party* party) {
GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked");
#ifdef GRPC_MAXIMIZE_THREADYNESS
Thread thd(
"RunParty",
[party]() {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
if (party->RunParty()) party->PartyIsOver();
},
nullptr, Thread::Options().set_joinable(false));
thd.Start();
#else
struct RunState;
static thread_local RunState* g_run_state = nullptr;
struct RunState {
explicit RunState(Party* party) : running(party), next(nullptr) {}
Party* running;
Party* next;
void Run() {
g_run_state = this;
do {
GRPC_LATENT_SEE_INNER_SCOPE("run_one_party");
if (running->RunParty()) {
running->PartyIsOver();
}
running = std::exchange(next, nullptr);
} while (running != nullptr);
DCHECK(g_run_state == this);
g_run_state = nullptr;
}
};
// If there is a party running, then we don't run it immediately
// but instead add it to the end of the list of parties to run.
// This enables a fairly straightforward batching of work from a
// call to a transport (or back again).
if (g_current_party_run_next != nullptr) {
if (*g_current_party_run_next == nullptr) {
*g_current_party_run_next = this;
} else {
// But if there's already a party queued, we're better off asking event
// engine to run it so we can spread load.
arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
[this]() {
if (g_run_state != nullptr) {
if (g_run_state->running == party || g_run_state->next == party) {
// Already running or already queued.
return;
}
if (g_run_state->next != nullptr) {
// If there's already a different party queued, we're better off asking
// event engine to run it so we can spread load.
// We swap the oldest party to run on the event engine so that we don't
// accidentally end up with a tail latency problem whereby one party
// gets held for a really long time.
std::swap(g_run_state->next, party);
party->arena_->GetContext<grpc_event_engine::experimental::EventEngine>()
->Run([party]() {
GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked offload");
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
RunLocked();
RunState{party}.Run();
});
return;
}
g_run_state->next = party;
return;
}
auto body = [this]() {
DCHECK_EQ(g_current_party_run_next, nullptr);
Party* run_next = nullptr;
g_current_party_run_next = &run_next;
const bool done = RunParty();
DCHECK(g_current_party_run_next == &run_next);
g_current_party_run_next = nullptr;
if (done) {
PartyIsOver();
}
if (run_next != nullptr) {
run_next->RunLocked();
}
};
#ifdef GRPC_MAXIMIZE_THREADYNESS
Thread thd(
"RunParty",
[body]() {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
body();
},
nullptr, Thread::Options().set_joinable(false));
thd.Start();
#else
body();
RunState{party}.Run();
#endif
}
@ -320,12 +329,12 @@ void Party::AddParticipants(Participant** participants, size_t count) {
participants_[slots[i]].store(participants[i], std::memory_order_release);
}
});
if (run_party) RunLocked();
if (run_party) RunLocked(this);
Unref();
}
void Party::Wakeup(WakeupMask wakeup_mask) {
if (sync_.ScheduleWakeup(wakeup_mask)) RunLocked();
if (sync_.ScheduleWakeup(wakeup_mask)) RunLocked(this);
Unref();
}
@ -335,7 +344,7 @@ void Party::WakeupAsync(WakeupMask wakeup_mask) {
[this]() {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
RunLocked();
RunLocked(this);
Unref();
});
} else {

@ -581,7 +581,7 @@ class Party : public Activity, private Wakeable {
void CancelRemainingParticipants();
// Run the locked part of the party until it is unlocked.
void RunLocked();
static void RunLocked(Party* party);
// Called in response to Unref() hitting zero - ultimately calls PartyOver,
// but needs to set some stuff up.
// Here so it gets compiled out of line.

@ -26,6 +26,7 @@
#include <utility>
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
@ -355,7 +356,7 @@ void GrpcMemoryAllocatorImpl::MaybeDonateBack() {
std::memory_order_acq_rel,
std::memory_order_acquire)) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
gpr_log(GPR_INFO, "[%p] Early return %" PRIdPTR " bytes", this, ret);
LOG(INFO) << "[" << this << "] Early return " << ret << " bytes";
}
CHECK(taken_bytes_.fetch_sub(ret, std::memory_order_relaxed) >= ret);
memory_quota_->Return(ret);
@ -452,10 +453,9 @@ void BasicMemoryQuota::Start() {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
double free = std::max(intptr_t{0}, self->free_bytes_.load());
size_t quota_size = self->quota_size_.load();
gpr_log(GPR_INFO,
"RQ: %s perform %s reclamation. Available free bytes: %f, "
"total quota_size: %zu",
self->name_.c_str(), std::get<0>(arg), free, quota_size);
LOG(INFO) << "RQ: " << self->name_ << " perform " << std::get<0>(arg)
<< " reclamation. Available free bytes: " << free
<< ", total quota_size: " << quota_size;
}
// One of the reclaimer queues gave us a way to get back memory.
// Call the reclaimer with a token that contains enough to wake us
@ -535,10 +535,9 @@ void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
double free = std::max(intptr_t{0}, free_bytes_.load());
size_t quota_size = quota_size_.load();
gpr_log(GPR_INFO,
"RQ: %s reclamation complete. Available free bytes: %f, "
"total quota_size: %zu",
name_.c_str(), free, quota_size);
LOG(INFO) << "RQ: " << name_
<< " reclamation complete. Available free bytes: " << free
<< ", total quota_size: " << quota_size;
}
waker.Wakeup();
}
@ -550,7 +549,7 @@ void BasicMemoryQuota::Return(size_t amount) {
void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
gpr_log(GPR_INFO, "Adding allocator %p", allocator);
LOG(INFO) << "Adding allocator " << allocator;
}
AllocatorBucket::Shard& shard = small_allocators_.SelectShard(allocator);
@ -563,7 +562,7 @@ void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
gpr_log(GPR_INFO, "Removing allocator %p", allocator);
LOG(INFO) << "Removing allocator " << allocator;
}
AllocatorBucket::Shard& small_shard =
@ -610,7 +609,7 @@ void BasicMemoryQuota::MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
GrpcMemoryAllocatorImpl* allocator) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
gpr_log(GPR_INFO, "Moving allocator %p to small", allocator);
LOG(INFO) << "Moving allocator " << allocator << " to small";
}
AllocatorBucket::Shard& old_shard = big_allocators_.SelectShard(allocator);
@ -631,7 +630,7 @@ void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
void BasicMemoryQuota::MaybeMoveAllocatorSmallToBig(
GrpcMemoryAllocatorImpl* allocator) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
gpr_log(GPR_INFO, "Moving allocator %p to big", allocator);
LOG(INFO) << "Moving allocator " << allocator << " to big";
}
AllocatorBucket::Shard& old_shard = small_allocators_.SelectShard(allocator);
@ -768,8 +767,8 @@ double PressureTracker::AddSampleAndGetControlValue(double sample) {
report = controller_.Update(current_estimate - kSetPoint);
}
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
gpr_log(GPR_INFO, "RQ: pressure:%lf report:%lf controller:%s",
current_estimate, report, controller_.DebugString().c_str());
LOG(INFO) << "RQ: pressure:" << current_estimate << " report:" << report
<< " controller:" << controller_.DebugString();
}
report_.store(report, std::memory_order_relaxed);
});

@ -29,12 +29,12 @@
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_set.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/memory_request.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/debug/trace.h"
@ -426,7 +426,8 @@ class GrpcMemoryAllocatorImpl final : public EventEngineMemoryAllocatorImpl {
size_t ret = free_bytes_.exchange(0, std::memory_order_acq_rel);
if (ret == 0) return;
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
gpr_log(GPR_INFO, "Allocator %p returning %zu bytes to quota", this, ret);
LOG(INFO) << "Allocator " << this << " returning " << ret
<< " bytes to quota";
}
taken_bytes_.fetch_sub(ret, std::memory_order_relaxed);
memory_quota_->Return(ret);

@ -20,7 +20,6 @@
#include <atomic>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/debug/trace.h"
@ -48,16 +47,15 @@ struct grpc_slice_refcount {
void Ref(grpc_core::DebugLocation location) {
auto prev_refs = ref_.fetch_add(1, std::memory_order_relaxed);
if (GRPC_TRACE_FLAG_ENABLED(slice_refcount)) {
gpr_log(location.file(), location.line(), GPR_LOG_SEVERITY_INFO,
"REF %p %" PRIdPTR "->%" PRIdPTR, this, prev_refs, prev_refs + 1);
LOG(INFO).AtLocation(location.file(), location.line())
<< "REF " << this << " " << prev_refs << "->" << prev_refs + 1;
}
}
void Unref(grpc_core::DebugLocation location) {
auto prev_refs = ref_.fetch_sub(1, std::memory_order_acq_rel);
if (GRPC_TRACE_FLAG_ENABLED(slice_refcount)) {
gpr_log(location.file(), location.line(), GPR_LOG_SEVERITY_INFO,
"UNREF %p %" PRIdPTR "->%" PRIdPTR, this, prev_refs,
prev_refs - 1);
LOG(INFO).AtLocation(location.file(), location.line())
<< "UNREF " << this << " " << prev_refs << "->" << prev_refs - 1;
}
if (prev_refs == 1) {
destroyer_fn_(this);

@ -19,6 +19,7 @@
#include <set>
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_join.h"
#include <grpc/support/port_platform.h>
@ -360,9 +361,8 @@ XdsDependencyManager::XdsDependencyManager(
args_(std::move(args)),
interested_parties_(interested_parties) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] starting watch for listener %s", this,
listener_resource_name_.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] starting watch for listener " << listener_resource_name_;
}
auto listener_watcher = MakeRefCounted<ListenerWatcher>(Ref());
listener_watcher_ = listener_watcher.get();
@ -372,7 +372,7 @@ XdsDependencyManager::XdsDependencyManager(
void XdsDependencyManager::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO, "[XdsDependencyManager %p] shutting down", this);
LOG(INFO) << "[XdsDependencyManager " << this << "] shutting down";
}
if (listener_watcher_ != nullptr) {
XdsListenerResourceType::CancelWatch(
@ -405,8 +405,8 @@ void XdsDependencyManager::Orphan() {
void XdsDependencyManager::OnListenerUpdate(
std::shared_ptr<const XdsListenerResource> listener) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Listener update",
this);
LOG(INFO) << "[XdsDependencyManager " << this
<< "] received Listener update";
}
if (xds_client_ == nullptr) return;
const auto* hcm = absl::get_if<XdsListenerResource::HttpConnectionManager>(
@ -438,10 +438,9 @@ void XdsDependencyManager::OnListenerUpdate(
// Start watch for the new RDS resource name.
route_config_name_ = rds_name;
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(
GPR_INFO,
"[XdsDependencyManager %p] starting watch for route config %s",
this, route_config_name_.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] starting watch for route config "
<< route_config_name_;
}
auto watcher =
MakeRefCounted<RouteConfigWatcher>(Ref(), route_config_name_);
@ -526,9 +525,9 @@ void XdsDependencyManager::OnRouteConfigUpdate(
const std::string& name,
std::shared_ptr<const XdsRouteConfigResource> route_config) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] received RouteConfig update for %s",
this, name.empty() ? "<inline>" : name.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] received RouteConfig update for "
<< (name.empty() ? "<inline>" : name);
}
if (xds_client_ == nullptr) return;
// Ignore updates for stale names.
@ -561,10 +560,9 @@ void XdsDependencyManager::OnRouteConfigUpdate(
void XdsDependencyManager::OnError(std::string context, absl::Status status) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] received Listener or RouteConfig "
"error: %s %s",
this, context.c_str(), status.ToString().c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] received Listener or RouteConfig error: " << context << " "
<< status;
}
if (xds_client_ == nullptr) return;
if (current_virtual_host_ != nullptr) return;
@ -573,7 +571,7 @@ void XdsDependencyManager::OnError(std::string context, absl::Status status) {
void XdsDependencyManager::OnResourceDoesNotExist(std::string context) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO, "[XdsDependencyManager %p] %s", this, context.c_str());
LOG(INFO) << "[XdsDependencyManager " << this << "] " << context;
}
if (xds_client_ == nullptr) return;
current_virtual_host_ = nullptr;
@ -584,8 +582,8 @@ void XdsDependencyManager::OnClusterUpdate(
const std::string& name,
std::shared_ptr<const XdsClusterResource> cluster) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Cluster update: %s",
this, name.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] received Cluster update: " << name;
}
if (xds_client_ == nullptr) return;
auto it = cluster_watchers_.find(name);
@ -597,8 +595,8 @@ void XdsDependencyManager::OnClusterUpdate(
void XdsDependencyManager::OnClusterError(const std::string& name,
absl::Status status) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Cluster error: %s %s",
this, name.c_str(), status.ToString().c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] received Cluster error: " << name << " " << status;
}
if (xds_client_ == nullptr) return;
auto it = cluster_watchers_.find(name);
@ -612,8 +610,8 @@ void XdsDependencyManager::OnClusterError(const std::string& name,
void XdsDependencyManager::OnClusterDoesNotExist(const std::string& name) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO, "[XdsDependencyManager %p] Cluster does not exist: %s",
this, name.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] Cluster does not exist: " << name;
}
if (xds_client_ == nullptr) return;
auto it = cluster_watchers_.find(name);
@ -627,8 +625,8 @@ void XdsDependencyManager::OnEndpointUpdate(
const std::string& name,
std::shared_ptr<const XdsEndpointResource> endpoint) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO, "[XdsDependencyManager %p] received Endpoint update: %s",
this, name.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] received Endpoint update: " << name;
}
if (xds_client_ == nullptr) return;
auto it = endpoint_watchers_.find(name);
@ -659,9 +657,8 @@ void XdsDependencyManager::OnEndpointUpdate(
void XdsDependencyManager::OnEndpointError(const std::string& name,
absl::Status status) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] received Endpoint error: %s %s", this,
name.c_str(), status.ToString().c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] received Endpoint error: " << name << " " << status;
}
if (xds_client_ == nullptr) return;
auto it = endpoint_watchers_.find(name);
@ -675,8 +672,8 @@ void XdsDependencyManager::OnEndpointError(const std::string& name,
void XdsDependencyManager::OnEndpointDoesNotExist(const std::string& name) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO, "[XdsDependencyManager %p] Endpoint does not exist: %s",
this, name.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] Endpoint does not exist: " << name;
}
if (xds_client_ == nullptr) return;
auto it = endpoint_watchers_.find(name);
@ -690,8 +687,8 @@ void XdsDependencyManager::OnEndpointDoesNotExist(const std::string& name) {
void XdsDependencyManager::OnDnsResult(const std::string& dns_name,
Resolver::Result result) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO, "[XdsDependencyManager %p] received DNS update: %s", this,
dns_name.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] received DNS update: " << dns_name;
}
if (xds_client_ == nullptr) return;
auto it = dns_resolvers_.find(dns_name);
@ -749,9 +746,8 @@ bool XdsDependencyManager::PopulateClusterConfigMap(
if (state.watcher == nullptr) {
auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), name);
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] starting watch for cluster %s", this,
std::string(name).c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] starting watch for cluster " << name;
}
state.watcher = watcher.get();
XdsClusterResourceType::StartWatch(xds_client_.get(), name,
@ -777,9 +773,8 @@ bool XdsDependencyManager::PopulateClusterConfigMap(
auto& eds_state = endpoint_watchers_[eds_resource_name];
if (eds_state.watcher == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] starting watch for endpoint %s",
this, std::string(eds_resource_name).c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] starting watch for endpoint " << eds_resource_name;
}
auto watcher =
MakeRefCounted<EndpointWatcher>(Ref(), eds_resource_name);
@ -806,9 +801,8 @@ bool XdsDependencyManager::PopulateClusterConfigMap(
auto& dns_state = dns_resolvers_[logical_dns.hostname];
if (dns_state.resolver == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] starting DNS resolver for %s",
this, logical_dns.hostname.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] starting DNS resolver for " << logical_dns.hostname;
}
auto* fake_resolver_response_generator = args_.GetPointer<
FakeResolverResponseGenerator>(
@ -974,9 +968,8 @@ void XdsDependencyManager::MaybeReportUpdate() {
continue;
}
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] cancelling watch for cluster %s", this,
cluster_name.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] cancelling watch for cluster " << cluster_name;
}
XdsClusterResourceType::CancelWatch(xds_client_.get(), cluster_name,
it->second.watcher,
@ -993,9 +986,8 @@ void XdsDependencyManager::MaybeReportUpdate() {
continue;
}
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] cancelling watch for EDS resource %s",
this, eds_resource_name.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] cancelling watch for EDS resource " << eds_resource_name;
}
XdsEndpointResourceType::CancelWatch(xds_client_.get(), eds_resource_name,
it->second.watcher,
@ -1011,24 +1003,22 @@ void XdsDependencyManager::MaybeReportUpdate() {
continue;
}
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] shutting down DNS resolver for %s",
this, dns_name.c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] shutting down DNS resolver for " << dns_name;
}
dns_resolvers_.erase(it++);
}
// If we have all the data we need, then send an update.
if (!have_all_resources) {
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO,
"[XdsDependencyManager %p] missing data -- NOT returning config",
this);
LOG(INFO) << "[XdsDependencyManager " << this
<< "] missing data -- NOT returning config";
}
return;
}
if (GRPC_TRACE_FLAG_ENABLED(xds_resolver)) {
gpr_log(GPR_INFO, "[XdsDependencyManager %p] returning config: %s", this,
config->ToString().c_str());
LOG(INFO) << "[XdsDependencyManager " << this
<< "] returning config: " << config->ToString();
}
watcher_->OnUpdate(std::move(config));
}

@ -24,6 +24,7 @@
#include <cstddef>
#include "absl/log/check.h"
#include "absl/numeric/bits.h"
#include "absl/strings/string_view.h"
#include "absl/types/variant.h"
@ -38,16 +39,6 @@ T Clamp(T val, T min, T max) {
return val;
}
/// rotl, rotr assume x is unsigned
template <typename T>
constexpr T RotateLeft(T x, T n) {
return ((x << n) | (x >> (sizeof(x) * 8 - n)));
}
template <typename T>
constexpr T RotateRight(T x, T n) {
return ((x >> n) | (x << (sizeof(x) * 8 - n)));
}
// Set the n-th bit of i
template <typename T>
T SetBit(T* i, size_t n) {
@ -197,7 +188,7 @@ inline int64_t SaturatingAdd(int64_t a, int64_t b) {
}
inline uint32_t MixHash32(uint32_t a, uint32_t b) {
return RotateLeft(a, 2u) ^ b;
return absl::rotl(a, 2u) ^ b;
}
inline uint32_t RoundUpToPowerOf2(uint32_t v) {

@ -192,9 +192,6 @@ class NewFakeStatsPlugin : public FakeStatsPlugin {
// This test verifies the HTTP2 stats on a stream
CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
if (!IsHttp2StatsFixEnabled()) {
GTEST_SKIP() << "Test needs http2_stats_fix experiment to be enabled";
}
g_mu = new Mutex();
g_client_call_ended_notify = new CoreEnd2endTest::TestNotification(this);
g_server_call_ended_notify = new CoreEnd2endTest::TestNotification(this);

@ -198,8 +198,6 @@ TEST_F(ConfigurationTest, ModifyServerDefaults) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_core::ForceEnableExperiment("keepalive_fix", true);
grpc_core::ForceEnableExperiment("keepalive_server_fix", true);
grpc_init();
auto ret = RUN_ALL_TESTS();
grpc_shutdown();

@ -37,11 +37,6 @@ TEST(UsefulTest, ClampWorks) {
EXPECT_EQ(Clamp(3, 0, 2), 2);
}
TEST(UsefulTest, Rotate) {
EXPECT_EQ(RotateLeft(0x80000001u, 1u), 3);
EXPECT_EQ(RotateRight(0x80000001u, 1u), 0xc0000000);
}
TEST(UsefulTest, ArraySize) {
int four[4];
int five[5];

@ -209,7 +209,8 @@ TEST(PreStopHookService, StartDoRequestStop) {
stub.async()->Hook(
&infos[1].context, &infos[1].request, &infos[1].response,
[&infos](const Status& status) { infos[1].SetStatus(status); });
ASSERT_TRUE(service.TestOnlyExpectRequests(2, absl::Milliseconds(100)));
ASSERT_TRUE(service.TestOnlyExpectRequests(
2, absl::Milliseconds(500) * grpc_test_slowdown_factor()));
ClientContext ctx;
SetReturnStatusRequest request;
request.set_grpc_code_to_return(StatusCode::INTERNAL);
@ -238,7 +239,8 @@ TEST(PreStopHookService, StartDoRequestStop) {
stub.async()->Hook(
&call_hangs.context, &call_hangs.request, &call_hangs.response,
[&](const Status& status) { call_hangs.SetStatus(status); });
ASSERT_TRUE(service.TestOnlyExpectRequests(1, absl::Milliseconds(100)));
ASSERT_TRUE(service.TestOnlyExpectRequests(
1, absl::Milliseconds(500) * grpc_test_slowdown_factor()));
status = call_hangs.WaitForStatus(absl::Milliseconds(100));
EXPECT_FALSE(status.has_value()) << status->error_message();
service.Stop();

@ -98,10 +98,10 @@ grpc_cc_library(
"benchmark",
],
deps = [
"//:grpc++_unsecure",
"//:grpc++",
"//src/proto/grpc/testing:echo_proto",
"//test/core/test_util:grpc_test_util",
"//test/core/test_util:grpc_test_util_base",
"//test/core/test_util:grpc_test_util_unsecure",
"//test/cpp/util:test_config",
],
)
@ -350,10 +350,6 @@ grpc_cc_benchmark(
srcs = [
"bm_callback_unary_ping_pong.cc",
],
tags = [
"manual",
"notap",
],
deps = [":callback_unary_ping_pong_h"],
)
@ -377,10 +373,6 @@ grpc_cc_benchmark(
srcs = [
"bm_callback_streaming_ping_pong.cc",
],
tags = [
"manual",
"notap",
],
deps = [":callback_streaming_ping_pong_h"],
)

@ -16,6 +16,7 @@
//
//
#include "test/core/test_util/build.h"
#include "test/core/test_util/test_config.h"
#include "test/cpp/microbenchmarks/callback_streaming_ping_pong.h"
#include "test/cpp/util/test_config.h"
@ -27,43 +28,43 @@ namespace testing {
// CONFIGURATIONS
//
// Replace "benchmark::internal::Benchmark" with "::testing::Benchmark" to use
// internal microbenchmarking tooling
static void StreamingPingPongMsgSizeArgs(benchmark::internal::Benchmark* b) {
// base case: 0 byte ping-pong msgs
b->Args({0, 1});
b->Args({0, 2});
static const int kMaxMessageSize = [] {
if (BuiltUnderMsan() || BuiltUnderTsan() || BuiltUnderUbsan()) {
// Scale down sizes for intensive benchmarks to avoid timeouts.
return 8 * 1024 * 1024;
}
return 128 * 1024 * 1024;
}();
// Generate Args for StreamingPingPong benchmarks. Currently generates args for
// only "small streams" (i.e streams with 0, 1 or 2 messages)
static void StreamingPingPongArgs(benchmark::internal::Benchmark* b) {
int msg_size = 0;
b->Args({0, 0}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here)
for (int msg_size = 1; msg_size <= 128 * 1024 * 1024; msg_size *= 8) {
for (msg_size = 0; msg_size <= kMaxMessageSize;
msg_size == 0 ? msg_size++ : msg_size *= 8) {
b->Args({msg_size, 1});
b->Args({msg_size, 2});
}
}
// Replace "benchmark::internal::Benchmark" with "::testing::Benchmark" to use
// internal microbenchmarking tooling
static void StreamingPingPongMsgsNumberArgs(benchmark::internal::Benchmark* b) {
for (int msg_number = 1; msg_number <= 256 * 1024; msg_number *= 8) {
b->Args({0, msg_number});
b->Args({1024, msg_number});
}
}
// Streaming with different message size
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcess, NoOpMutator,
NoOpMutator)
->Apply(StreamingPingPongMsgSizeArgs);
->Apply(StreamingPingPongArgs);
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, MinInProcess, NoOpMutator,
NoOpMutator)
->Apply(StreamingPingPongMsgSizeArgs);
->Apply(StreamingPingPongArgs);
// Streaming with different message number
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcess, NoOpMutator,
NoOpMutator)
->Apply(StreamingPingPongMsgsNumberArgs);
->Apply(StreamingPingPongArgs);
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, MinInProcess, NoOpMutator,
NoOpMutator)
->Apply(StreamingPingPongMsgsNumberArgs);
->Apply(StreamingPingPongArgs);
// Client context with different metadata
BENCHMARK_TEMPLATE(BM_CallbackBidiStreaming, InProcess,

@ -1,25 +0,0 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/linux/grpc_performance_profile_daily.sh"
timeout_mins: 1440
action {
define_artifacts {
regex: "github/grpc/reports/**"
}
}

@ -1,25 +0,0 @@
#!/usr/bin/env bash
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
# Enter the gRPC repo root
cd $(dirname $0)/../../..
source tools/internal_ci/helper_scripts/prepare_build_linux_rc
export DOCKERFILE_DIR=tools/dockerfile/test/cxx_debian11_x64
export DOCKER_RUN_SCRIPT=tools/internal_ci/linux/grpc_performance_profile_summary_in_docker.sh
exec tools/run_tests/dockerize/build_and_run_docker.sh

@ -1,25 +0,0 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/linux/grpc_performance_profile_master.sh"
timeout_mins: 600
action {
define_artifacts {
regex: "github/grpc/reports/**"
}
}

@ -1,25 +0,0 @@
#!/usr/bin/env bash
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
# Enter the gRPC repo root
cd $(dirname $0)/../../..
source tools/internal_ci/helper_scripts/prepare_build_linux_rc
export DOCKERFILE_DIR=tools/dockerfile/test/cxx_debian11_x64
export DOCKER_RUN_SCRIPT=tools/internal_ci/linux/grpc_performance_profile_summary_in_docker.sh
exec tools/run_tests/dockerize/build_and_run_docker.sh

@ -1,29 +0,0 @@
#!/usr/bin/env bash
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
# Enter the gRPC repo root
cd $(dirname $0)/../../..
# some extra pip packages are needed for the check_on_pr.py script to work
# TODO(jtattermusch): avoid needing to install these pip packages each time
time python3 -m pip install --user -r tools/internal_ci/helper_scripts/requirements.linux_perf.txt
CPUS=`python3 -c 'import multiprocessing; print(multiprocessing.cpu_count())'`
tools/run_tests/start_port_server.py
tools/run_tests/run_microbenchmark.py --collect summary --bq_result_table microbenchmarks.microbenchmarks

@ -60,12 +60,6 @@ DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"./src/core/ext/filters/logging/logging_filter.cc",
"./src/core/ext/filters/message_size/message_size_filter.cc",
"./src/core/ext/transport/binder/wire_format/wire_reader_impl.cc",
"./src/core/ext/transport/binder/wire_format/wire_writer.cc",
"./src/core/ext/transport/chaotic_good/chaotic_good_transport.h",
"./src/core/ext/transport/chaotic_good/client/chaotic_good_connector.cc",
"./src/core/ext/transport/chaotic_good/client_transport.cc",
"./src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc",
"./src/core/ext/transport/chaotic_good/server_transport.cc",
"./src/core/ext/transport/chttp2/client/chttp2_connector.cc",
"./src/core/ext/transport/chttp2/transport/bin_decoder.cc",
"./src/core/ext/transport/chttp2/transport/flow_control.cc",
@ -78,17 +72,12 @@ DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"./src/core/ext/transport/chttp2/transport/parsing.cc",
"./src/core/ext/transport/chttp2/transport/stream_lists.cc",
"./src/core/ext/transport/chttp2/transport/writing.cc",
"./src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc",
"./src/core/ext/transport/cronet/transport/cronet_transport.cc",
"./src/core/ext/transport/inproc/inproc_transport.cc",
"./src/core/ext/transport/inproc/legacy_inproc_transport.cc",
"./src/core/handshaker/handshaker.cc",
"./src/core/handshaker/http_connect/http_connect_handshaker.cc",
"./src/core/handshaker/http_connect/http_proxy_mapper.cc",
"./src/core/handshaker/security/secure_endpoint.cc",
"./src/core/lib/event_engine/ares_resolver.h",
"./src/core/lib/gprpp/time.h",
"./src/core/lib/gprpp/work_serializer.cc",
"./src/core/lib/iomgr/call_combiner.cc",
"./src/core/lib/iomgr/call_combiner.h",
"./src/core/lib/iomgr/cfstream_handle.cc",
@ -119,17 +108,6 @@ DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"./src/core/lib/promise/party.cc",
"./src/core/lib/promise/party.h",
"./src/core/lib/promise/pipe.h",
"./src/core/lib/resource_quota/memory_quota.cc",
"./src/core/lib/resource_quota/memory_quota.h",
"./src/core/lib/security/authorization/grpc_authorization_policy_provider.cc",
"./src/core/lib/security/authorization/grpc_server_authz_filter.cc",
"./src/core/lib/security/context/security_context.cc",
"./src/core/lib/security/credentials/jwt/jwt_credentials.cc",
"./src/core/lib/security/credentials/oauth2/oauth2_credentials.cc",
"./src/core/lib/security/credentials/plugin/plugin_credentials.cc",
"./src/core/lib/security/security_connector/security_connector.cc",
"./src/core/lib/security/transport/server_auth_filter.cc",
"./src/core/lib/slice/slice_refcount.h",
"./src/core/lib/surface/api_trace.h",
"./src/core/lib/surface/call.cc",
"./src/core/lib/surface/call_utils.cc",

@ -21,7 +21,6 @@ set -ex
test "$(bazel query 'somepath("//:grpc_unsecure", "//third_party:libssl")' 2>/dev/null | wc -l)" -eq 0 || exit 1
test "$(bazel query 'somepath("//:grpc++_unsecure", "//third_party:libssl")' 2>/dev/null | wc -l)" -eq 0 || exit 1
test "$(bazel query 'somepath("//:grpc++_codegen_proto", "//third_party:libssl")' 2>/dev/null | wc -l)" -eq 0 || exit 1
test "$(bazel query 'somepath("//test/cpp/microbenchmarks:helpers", "//third_party:libssl")' 2>/dev/null | wc -l)" -eq 0 || exit 1
# Make sure that core doesn't depend on anything in C++ library

Loading…
Cancel
Save