[client channel] remove grpc_channel_num_external_connectivity_watchers() (#35840)

Implements gRFC L113 (https://github.com/grpc/proposal/pull/417).

Closes #35840

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35840 from markdroth:client_channel_remove_num_external_watchers 334670c13c
PiperOrigin-RevId: 606766495
pull/35800/head
Mark D. Roth 10 months ago committed by Copybara-Service
parent 17d9e20ff1
commit 574b0572f1
  1. 1
      grpc.def
  2. 6
      include/grpc/grpc.h
  3. 15
      src/core/client_channel/channel_connectivity.cc
  4. 5
      src/core/client_channel/client_channel_filter.h
  5. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  6. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  7. 4
      test/core/surface/concurrent_connectivity_test.cc
  8. 14
      test/core/surface/num_external_connectivity_watchers_test.cc
  9. 3
      test/core/surface/sequential_connectivity_test.cc

1
grpc.def generated

@ -41,7 +41,6 @@ EXPORTS
grpc_completion_queue_thread_local_cache_init grpc_completion_queue_thread_local_cache_init
grpc_completion_queue_thread_local_cache_flush grpc_completion_queue_thread_local_cache_flush
grpc_channel_check_connectivity_state grpc_channel_check_connectivity_state
grpc_channel_num_external_connectivity_watchers
grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state
grpc_channel_support_connectivity_watcher grpc_channel_support_connectivity_watcher
grpc_channel_create_call grpc_channel_create_call

@ -177,12 +177,6 @@ GRPCAPI int grpc_completion_queue_thread_local_cache_flush(
GRPCAPI grpc_connectivity_state grpc_channel_check_connectivity_state( GRPCAPI grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel* channel, int try_to_connect); grpc_channel* channel, int try_to_connect);
/** Number of active "external connectivity state watchers" attached to a
* channel.
* Useful for testing. **/
GRPCAPI int grpc_channel_num_external_connectivity_watchers(
grpc_channel* channel);
/** Watch for a change in connectivity state. /** Watch for a change in connectivity state.
Once the channel connectivity state is different from last_observed_state, Once the channel connectivity state is different from last_observed_state,
tag will be enqueued on cq with success=1. tag will be enqueued on cq with success=1.

@ -81,21 +81,6 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
return client_channel->CheckConnectivityState(try_to_connect); return client_channel->CheckConnectivityState(try_to_connect);
} }
int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) {
grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel);
grpc_core::ClientChannelFilter* client_channel =
grpc_core::ClientChannelFilter::GetFromChannel(channel);
if (client_channel == nullptr) {
if (!grpc_core::IsLameChannel(channel)) {
gpr_log(GPR_ERROR,
"grpc_channel_num_external_connectivity_watchers called on "
"something that is not a client channel");
}
return 0;
}
return client_channel->NumExternalConnectivityWatchers();
}
int grpc_channel_support_connectivity_watcher(grpc_channel* channel) { int grpc_channel_support_connectivity_watcher(grpc_channel* channel) {
return grpc_core::ClientChannelFilter::GetFromChannel( return grpc_core::ClientChannelFilter::GetFromChannel(
grpc_core::Channel::FromC(channel)) != nullptr; grpc_core::Channel::FromC(channel)) != nullptr;

@ -149,11 +149,6 @@ class ClientChannelFilter {
this, on_complete, /*cancel=*/true); this, on_complete, /*cancel=*/true);
} }
int NumExternalConnectivityWatchers() const {
MutexLock lock(&external_watchers_mu_);
return static_cast<int>(external_watchers_.size());
}
// Starts and stops a connectivity watch. The watcher will be initially // Starts and stops a connectivity watch. The watcher will be initially
// notified as soon as the state changes from initial_state and then on // notified as soon as the state changes from initial_state and then on
// every subsequent state change until either the watch is stopped or // every subsequent state change until either the watch is stopped or

@ -64,7 +64,6 @@ grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import;
grpc_completion_queue_thread_local_cache_init_type grpc_completion_queue_thread_local_cache_init_import; grpc_completion_queue_thread_local_cache_init_type grpc_completion_queue_thread_local_cache_init_import;
grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue_thread_local_cache_flush_import; grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue_thread_local_cache_flush_import;
grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import; grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import;
grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import;
grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import; grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import;
grpc_channel_support_connectivity_watcher_type grpc_channel_support_connectivity_watcher_import; grpc_channel_support_connectivity_watcher_type grpc_channel_support_connectivity_watcher_import;
grpc_channel_create_call_type grpc_channel_create_call_import; grpc_channel_create_call_type grpc_channel_create_call_import;
@ -355,7 +354,6 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_completion_queue_thread_local_cache_init_import = (grpc_completion_queue_thread_local_cache_init_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_init"); grpc_completion_queue_thread_local_cache_init_import = (grpc_completion_queue_thread_local_cache_init_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_init");
grpc_completion_queue_thread_local_cache_flush_import = (grpc_completion_queue_thread_local_cache_flush_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_flush"); grpc_completion_queue_thread_local_cache_flush_import = (grpc_completion_queue_thread_local_cache_flush_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_flush");
grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state"); grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state");
grpc_channel_num_external_connectivity_watchers_import = (grpc_channel_num_external_connectivity_watchers_type) GetProcAddress(library, "grpc_channel_num_external_connectivity_watchers");
grpc_channel_watch_connectivity_state_import = (grpc_channel_watch_connectivity_state_type) GetProcAddress(library, "grpc_channel_watch_connectivity_state"); grpc_channel_watch_connectivity_state_import = (grpc_channel_watch_connectivity_state_type) GetProcAddress(library, "grpc_channel_watch_connectivity_state");
grpc_channel_support_connectivity_watcher_import = (grpc_channel_support_connectivity_watcher_type) GetProcAddress(library, "grpc_channel_support_connectivity_watcher"); grpc_channel_support_connectivity_watcher_import = (grpc_channel_support_connectivity_watcher_type) GetProcAddress(library, "grpc_channel_support_connectivity_watcher");
grpc_channel_create_call_import = (grpc_channel_create_call_type) GetProcAddress(library, "grpc_channel_create_call"); grpc_channel_create_call_import = (grpc_channel_create_call_type) GetProcAddress(library, "grpc_channel_create_call");

@ -167,9 +167,6 @@ extern grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue
typedef grpc_connectivity_state(*grpc_channel_check_connectivity_state_type)(grpc_channel* channel, int try_to_connect); typedef grpc_connectivity_state(*grpc_channel_check_connectivity_state_type)(grpc_channel* channel, int try_to_connect);
extern grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import; extern grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import;
#define grpc_channel_check_connectivity_state grpc_channel_check_connectivity_state_import #define grpc_channel_check_connectivity_state grpc_channel_check_connectivity_state_import
typedef int(*grpc_channel_num_external_connectivity_watchers_type)(grpc_channel* channel);
extern grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import;
#define grpc_channel_num_external_connectivity_watchers grpc_channel_num_external_connectivity_watchers_import
typedef void(*grpc_channel_watch_connectivity_state_type)(grpc_channel* channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue* cq, void* tag); typedef void(*grpc_channel_watch_connectivity_state_type)(grpc_channel* channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue* cq, void* tag);
extern grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import; extern grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import;
#define grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state_import #define grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state_import

@ -93,8 +93,6 @@ void create_loop_destroy(void* addr) {
grpc_timeout_milliseconds_to_deadline(POLL_MILLIS); grpc_timeout_milliseconds_to_deadline(POLL_MILLIS);
ASSERT_EQ(grpc_completion_queue_next(cq, poll_time, nullptr).type, ASSERT_EQ(grpc_completion_queue_next(cq, poll_time, nullptr).type,
GRPC_OP_COMPLETE); GRPC_OP_COMPLETE);
// check that the watcher from "watch state" was free'd
ASSERT_EQ(grpc_channel_num_external_connectivity_watchers(chan), 0);
} }
grpc_channel_destroy(chan); grpc_channel_destroy(chan);
grpc_completion_queue_destroy(cq); grpc_completion_queue_destroy(cq);
@ -292,8 +290,6 @@ void watches_with_short_timeouts(void* addr) {
grpc_event ev = grpc_completion_queue_next(cq, poll_time, nullptr); grpc_event ev = grpc_completion_queue_next(cq, poll_time, nullptr);
ASSERT_EQ(ev.type, GRPC_OP_COMPLETE); ASSERT_EQ(ev.type, GRPC_OP_COMPLETE);
ASSERT_EQ(ev.success, false); ASSERT_EQ(ev.success, false);
// check that the watcher from "watch state" was free'd
ASSERT_EQ(grpc_channel_num_external_connectivity_watchers(chan), 0);
} }
grpc_channel_destroy(chan); grpc_channel_destroy(chan);
grpc_completion_queue_destroy(cq); grpc_completion_queue_destroy(cq);

@ -55,8 +55,6 @@ static void channel_idle_start_watch(grpc_channel* channel,
grpc_channel_watch_connectivity_state(channel, GRPC_CHANNEL_IDLE, grpc_channel_watch_connectivity_state(channel, GRPC_CHANNEL_IDLE,
connect_deadline, cq, connect_deadline, cq,
reinterpret_cast<void*>(next_tag++)); reinterpret_cast<void*>(next_tag++));
gpr_log(GPR_DEBUG, "number of active connect watchers: %d",
grpc_channel_num_external_connectivity_watchers(channel));
} }
static void channel_idle_poll_for_timeout(grpc_channel* channel, static void channel_idle_poll_for_timeout(grpc_channel* channel,
@ -71,9 +69,8 @@ static void channel_idle_poll_for_timeout(grpc_channel* channel,
GRPC_CHANNEL_IDLE); GRPC_CHANNEL_IDLE);
} }
// Test and use the "num_external_watchers" call to make sure // Test to make sure that "connectivity watcher" structs are free'd just
// that "connectivity watcher" structs are free'd just after, if // after, if their corresponding timeouts occur.
// their corresponding timeouts occur.
static void run_timeouts_test(const test_fixture* fixture) { static void run_timeouts_test(const test_fixture* fixture) {
gpr_log(GPR_INFO, "TEST: %s", fixture->name); gpr_log(GPR_INFO, "TEST: %s", fixture->name);
@ -87,7 +84,6 @@ static void run_timeouts_test(const test_fixture* fixture) {
// start 1 watcher and then let it time out // start 1 watcher and then let it time out
channel_idle_start_watch(channel, cq); channel_idle_start_watch(channel, cq);
channel_idle_poll_for_timeout(channel, cq); channel_idle_poll_for_timeout(channel, cq);
ASSERT_EQ(grpc_channel_num_external_connectivity_watchers(channel), 0);
// start 3 watchers and then let them all time out // start 3 watchers and then let them all time out
for (size_t i = 1; i <= 3; i++) { for (size_t i = 1; i <= 3; i++) {
@ -96,7 +92,6 @@ static void run_timeouts_test(const test_fixture* fixture) {
for (size_t i = 1; i <= 3; i++) { for (size_t i = 1; i <= 3; i++) {
channel_idle_poll_for_timeout(channel, cq); channel_idle_poll_for_timeout(channel, cq);
} }
ASSERT_EQ(grpc_channel_num_external_connectivity_watchers(channel), 0);
// start 3 watchers, see one time out, start another 3, and then see them all // start 3 watchers, see one time out, start another 3, and then see them all
// time out // time out
@ -110,7 +105,6 @@ static void run_timeouts_test(const test_fixture* fixture) {
for (size_t i = 1; i <= 5; i++) { for (size_t i = 1; i <= 5; i++) {
channel_idle_poll_for_timeout(channel, cq); channel_idle_poll_for_timeout(channel, cq);
} }
ASSERT_EQ(grpc_channel_num_external_connectivity_watchers(channel), 0);
grpc_channel_destroy(channel); grpc_channel_destroy(channel);
grpc_completion_queue_shutdown(cq); grpc_completion_queue_shutdown(cq);
@ -136,9 +130,7 @@ static void run_channel_shutdown_before_timeout_test(
grpc_channel* channel = fixture->create_channel(addr.c_str()); grpc_channel* channel = fixture->create_channel(addr.c_str());
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr); grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
// start 1 watcher and then shut down the channel before the timer goes off // start 1 watcher and then shut down the channel before the timer goes off.
ASSERT_EQ(grpc_channel_num_external_connectivity_watchers(channel), 0);
// expecting a 30 second timeout to go off much later than the shutdown. // expecting a 30 second timeout to go off much later than the shutdown.
gpr_timespec connect_deadline = grpc_timeout_seconds_to_deadline(30); gpr_timespec connect_deadline = grpc_timeout_seconds_to_deadline(30);
ASSERT_EQ(grpc_channel_check_connectivity_state(channel, 0), ASSERT_EQ(grpc_channel_check_connectivity_state(channel, 0),

@ -124,9 +124,6 @@ static void run_test(const test_fixture* fixture, bool share_subchannel) {
connect_deadline, cq, nullptr); connect_deadline, cq, nullptr);
grpc_event ev = grpc_completion_queue_next( grpc_event ev = grpc_completion_queue_next(
cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
// check that the watcher from "watch state" was free'd
ASSERT_EQ(grpc_channel_num_external_connectivity_watchers(channels[i]),
0);
ASSERT_EQ(ev.type, GRPC_OP_COMPLETE); ASSERT_EQ(ev.type, GRPC_OP_COMPLETE);
ASSERT_EQ(ev.tag, nullptr); ASSERT_EQ(ev.tag, nullptr);
ASSERT_EQ(ev.success, true); ASSERT_EQ(ev.success, true);

Loading…
Cancel
Save