Revert "Merge pull request #20255 from markdroth/transport_connectivity_state_watcher"

This reverts commit 0f5a111aad, reversing
changes made to 1276a8f628.
pull/20407/head
Mark D. Roth 6 years ago
parent 24b529e408
commit 33f139b6e6
  1. 73
      CMakeLists.txt
  2. 84
      Makefile
  3. 19
      build.yaml
  4. 235
      src/core/ext/filters/client_channel/client_channel.cc
  5. 6
      src/core/ext/filters/client_channel/client_channel.h
  6. 5
      src/core/ext/filters/client_channel/client_channel_channelz.cc
  7. 8
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  8. 2
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  9. 4
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  10. 6
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  11. 6
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  12. 3
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  13. 97
      src/core/ext/filters/client_channel/subchannel.cc
  14. 6
      src/core/ext/filters/client_channel/subchannel.h
  15. 82
      src/core/ext/filters/max_age/max_age_filter.cc
  16. 31
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  17. 16
      src/core/ext/transport/chttp2/transport/internal.h
  18. 23
      src/core/ext/transport/inproc/inproc_transport.cc
  19. 3
      src/core/lib/channel/channelz.cc
  20. 29
      src/core/lib/surface/lame_client.cc
  21. 63
      src/core/lib/surface/server.cc
  22. 203
      src/core/lib/transport/connectivity_state.cc
  23. 140
      src/core/lib/transport/connectivity_state.h
  24. 9
      src/core/lib/transport/transport.h
  25. 25
      src/core/lib/transport/transport_op_string.cc
  26. 32
      test/core/surface/lame_client_test.cc
  27. 3
      test/core/transport/BUILD
  28. 206
      test/core/transport/connectivity_state_test.cc
  29. 48
      tools/run_tests/generated/tests.json

@ -427,6 +427,7 @@ add_dependencies(buildtests_c time_averaged_stats_test)
add_dependencies(buildtests_c timeout_encoding_test) add_dependencies(buildtests_c timeout_encoding_test)
add_dependencies(buildtests_c timer_heap_test) add_dependencies(buildtests_c timer_heap_test)
add_dependencies(buildtests_c timer_list_test) add_dependencies(buildtests_c timer_list_test)
add_dependencies(buildtests_c transport_connectivity_state_test)
add_dependencies(buildtests_c transport_metadata_test) add_dependencies(buildtests_c transport_metadata_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c transport_security_test) add_dependencies(buildtests_c transport_security_test)
@ -725,7 +726,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx time_change_test) add_dependencies(buildtests_cxx time_change_test)
endif() endif()
add_dependencies(buildtests_cxx timer_test) add_dependencies(buildtests_cxx timer_test)
add_dependencies(buildtests_cxx transport_connectivity_state_test)
add_dependencies(buildtests_cxx transport_pid_controller_test) add_dependencies(buildtests_cxx transport_pid_controller_test)
add_dependencies(buildtests_cxx transport_security_common_api_test) add_dependencies(buildtests_cxx transport_security_common_api_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@ -9847,6 +9847,37 @@ target_link_libraries(timer_list_test
) )
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(transport_connectivity_state_test
test/core/transport/connectivity_state_test.cc
)
target_include_directories(transport_connectivity_state_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_UPB_GENERATED_DIR}
PRIVATE ${_gRPC_UPB_GRPC_GENERATED_DIR}
PRIVATE ${_gRPC_UPB_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(transport_connectivity_state_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr
)
endif (gRPC_BUILD_TESTS) endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS)
@ -16661,46 +16692,6 @@ target_link_libraries(timer_test
) )
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(transport_connectivity_state_test
test/core/transport/connectivity_state_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(transport_connectivity_state_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_UPB_GENERATED_DIR}
PRIVATE ${_gRPC_UPB_GRPC_GENERATED_DIR}
PRIVATE ${_gRPC_UPB_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(transport_connectivity_state_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS) endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS)

@ -1137,6 +1137,7 @@ time_averaged_stats_test: $(BINDIR)/$(CONFIG)/time_averaged_stats_test
timeout_encoding_test: $(BINDIR)/$(CONFIG)/timeout_encoding_test timeout_encoding_test: $(BINDIR)/$(CONFIG)/timeout_encoding_test
timer_heap_test: $(BINDIR)/$(CONFIG)/timer_heap_test timer_heap_test: $(BINDIR)/$(CONFIG)/timer_heap_test
timer_list_test: $(BINDIR)/$(CONFIG)/timer_list_test timer_list_test: $(BINDIR)/$(CONFIG)/timer_list_test
transport_connectivity_state_test: $(BINDIR)/$(CONFIG)/transport_connectivity_state_test
transport_metadata_test: $(BINDIR)/$(CONFIG)/transport_metadata_test transport_metadata_test: $(BINDIR)/$(CONFIG)/transport_metadata_test
transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test
udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test
@ -1293,7 +1294,6 @@ thread_manager_test: $(BINDIR)/$(CONFIG)/thread_manager_test
thread_stress_test: $(BINDIR)/$(CONFIG)/thread_stress_test thread_stress_test: $(BINDIR)/$(CONFIG)/thread_stress_test
time_change_test: $(BINDIR)/$(CONFIG)/time_change_test time_change_test: $(BINDIR)/$(CONFIG)/time_change_test
timer_test: $(BINDIR)/$(CONFIG)/timer_test timer_test: $(BINDIR)/$(CONFIG)/timer_test
transport_connectivity_state_test: $(BINDIR)/$(CONFIG)/transport_connectivity_state_test
transport_pid_controller_test: $(BINDIR)/$(CONFIG)/transport_pid_controller_test transport_pid_controller_test: $(BINDIR)/$(CONFIG)/transport_pid_controller_test
transport_security_common_api_test: $(BINDIR)/$(CONFIG)/transport_security_common_api_test transport_security_common_api_test: $(BINDIR)/$(CONFIG)/transport_security_common_api_test
writes_per_rpc_test: $(BINDIR)/$(CONFIG)/writes_per_rpc_test writes_per_rpc_test: $(BINDIR)/$(CONFIG)/writes_per_rpc_test
@ -1563,6 +1563,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/timeout_encoding_test \ $(BINDIR)/$(CONFIG)/timeout_encoding_test \
$(BINDIR)/$(CONFIG)/timer_heap_test \ $(BINDIR)/$(CONFIG)/timer_heap_test \
$(BINDIR)/$(CONFIG)/timer_list_test \ $(BINDIR)/$(CONFIG)/timer_list_test \
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test \
$(BINDIR)/$(CONFIG)/transport_metadata_test \ $(BINDIR)/$(CONFIG)/transport_metadata_test \
$(BINDIR)/$(CONFIG)/transport_security_test \ $(BINDIR)/$(CONFIG)/transport_security_test \
$(BINDIR)/$(CONFIG)/udp_server_test \ $(BINDIR)/$(CONFIG)/udp_server_test \
@ -1765,7 +1766,6 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/thread_stress_test \ $(BINDIR)/$(CONFIG)/thread_stress_test \
$(BINDIR)/$(CONFIG)/time_change_test \ $(BINDIR)/$(CONFIG)/time_change_test \
$(BINDIR)/$(CONFIG)/timer_test \ $(BINDIR)/$(CONFIG)/timer_test \
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test \
$(BINDIR)/$(CONFIG)/transport_pid_controller_test \ $(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/transport_security_common_api_test \ $(BINDIR)/$(CONFIG)/transport_security_common_api_test \
$(BINDIR)/$(CONFIG)/writes_per_rpc_test \ $(BINDIR)/$(CONFIG)/writes_per_rpc_test \
@ -1936,7 +1936,6 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/thread_stress_test \ $(BINDIR)/$(CONFIG)/thread_stress_test \
$(BINDIR)/$(CONFIG)/time_change_test \ $(BINDIR)/$(CONFIG)/time_change_test \
$(BINDIR)/$(CONFIG)/timer_test \ $(BINDIR)/$(CONFIG)/timer_test \
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test \
$(BINDIR)/$(CONFIG)/transport_pid_controller_test \ $(BINDIR)/$(CONFIG)/transport_pid_controller_test \
$(BINDIR)/$(CONFIG)/transport_security_common_api_test \ $(BINDIR)/$(CONFIG)/transport_security_common_api_test \
$(BINDIR)/$(CONFIG)/writes_per_rpc_test \ $(BINDIR)/$(CONFIG)/writes_per_rpc_test \
@ -2214,6 +2213,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/timer_heap_test || ( echo test timer_heap_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/timer_heap_test || ( echo test timer_heap_test failed ; exit 1 )
$(E) "[RUN] Testing timer_list_test" $(E) "[RUN] Testing timer_list_test"
$(Q) $(BINDIR)/$(CONFIG)/timer_list_test || ( echo test timer_list_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/timer_list_test || ( echo test timer_list_test failed ; exit 1 )
$(E) "[RUN] Testing transport_connectivity_state_test"
$(Q) $(BINDIR)/$(CONFIG)/transport_connectivity_state_test || ( echo test transport_connectivity_state_test failed ; exit 1 )
$(E) "[RUN] Testing transport_metadata_test" $(E) "[RUN] Testing transport_metadata_test"
$(Q) $(BINDIR)/$(CONFIG)/transport_metadata_test || ( echo test transport_metadata_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/transport_metadata_test || ( echo test transport_metadata_test failed ; exit 1 )
$(E) "[RUN] Testing transport_security_test" $(E) "[RUN] Testing transport_security_test"
@ -2480,8 +2481,6 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/time_change_test || ( echo test time_change_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/time_change_test || ( echo test time_change_test failed ; exit 1 )
$(E) "[RUN] Testing timer_test" $(E) "[RUN] Testing timer_test"
$(Q) $(BINDIR)/$(CONFIG)/timer_test || ( echo test timer_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/timer_test || ( echo test timer_test failed ; exit 1 )
$(E) "[RUN] Testing transport_connectivity_state_test"
$(Q) $(BINDIR)/$(CONFIG)/transport_connectivity_state_test || ( echo test transport_connectivity_state_test failed ; exit 1 )
$(E) "[RUN] Testing transport_pid_controller_test" $(E) "[RUN] Testing transport_pid_controller_test"
$(Q) $(BINDIR)/$(CONFIG)/transport_pid_controller_test || ( echo test transport_pid_controller_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/transport_pid_controller_test || ( echo test transport_pid_controller_test failed ; exit 1 )
$(E) "[RUN] Testing transport_security_common_api_test" $(E) "[RUN] Testing transport_security_common_api_test"
@ -13180,6 +13179,38 @@ endif
endif endif
TRANSPORT_CONNECTIVITY_STATE_TEST_SRC = \
test/core/transport/connectivity_state_test.cc \
TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(TRANSPORT_CONNECTIVITY_STATE_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/transport_connectivity_state_test
endif
$(OBJDIR)/$(CONFIG)/test/core/transport/connectivity_state_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_transport_connectivity_state_test: $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep)
endif
endif
TRANSPORT_METADATA_TEST_SRC = \ TRANSPORT_METADATA_TEST_SRC = \
test/core/transport/metadata_test.cc \ test/core/transport/metadata_test.cc \
@ -19921,49 +19952,6 @@ endif
endif endif
TRANSPORT_CONNECTIVITY_STATE_TEST_SRC = \
test/core/transport/connectivity_state_test.cc \
TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(TRANSPORT_CONNECTIVITY_STATE_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: $(PROTOBUF_DEP) $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/transport_connectivity_state_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/transport/connectivity_state_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_transport_connectivity_state_test: $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep)
endif
endif
TRANSPORT_PID_CONTROLLER_TEST_SRC = \ TRANSPORT_PID_CONTROLLER_TEST_SRC = \
test/core/transport/pid_controller_test.cc \ test/core/transport/pid_controller_test.cc \

@ -3847,6 +3847,15 @@ targets:
exclude_iomgrs: exclude_iomgrs:
- uv - uv
uses_polling: false uses_polling: false
- name: transport_connectivity_state_test
build: test
language: c
src:
- test/core/transport/connectivity_state_test.cc
deps:
- grpc_test_util
- grpc
- gpr
- name: transport_metadata_test - name: transport_metadata_test
build: test build: test
language: c language: c
@ -5980,16 +5989,6 @@ targets:
- grpc++ - grpc++
- grpc - grpc
- gpr - gpr
- name: transport_connectivity_state_test
gtest: true
build: test
language: c++
src:
- test/core/transport/connectivity_state_test.cc
deps:
- grpc_test_util
- grpc
- gpr
- name: transport_pid_controller_test - name: transport_pid_controller_test
build: test build: test
language: c++ language: c++

@ -152,41 +152,43 @@ class ChannelData {
SubchannelInterface* subchannel) const; SubchannelInterface* subchannel) const;
grpc_connectivity_state CheckConnectivityState(bool try_to_connect); grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
void AddExternalConnectivityWatcher(grpc_polling_entity pollent, void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_connectivity_state* state,
grpc_closure* on_complete, grpc_closure* on_complete,
grpc_closure* watcher_timer_init) { grpc_closure* watcher_timer_init) {
MutexLock lock(&external_watchers_mu_); // Will delete itself.
// Will be deleted when the watch is complete. New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
GPR_ASSERT(external_watchers_[on_complete] == nullptr); watcher_timer_init);
external_watchers_[on_complete] = New<ExternalConnectivityWatcher>(
this, pollent, state, on_complete, watcher_timer_init);
}
void RemoveExternalConnectivityWatcher(grpc_closure* on_complete,
bool cancel) {
MutexLock lock(&external_watchers_mu_);
auto it = external_watchers_.find(on_complete);
if (it != external_watchers_.end()) {
if (cancel) it->second->Cancel();
external_watchers_.erase(it);
}
} }
int NumExternalConnectivityWatchers() const { int NumExternalConnectivityWatchers() const {
MutexLock lock(&external_watchers_mu_); return external_connectivity_watcher_list_.size();
return static_cast<int>(external_watchers_.size());
} }
private: private:
class SubchannelWrapper; class SubchannelWrapper;
class ClientChannelControlHelper; class ClientChannelControlHelper;
// Represents a pending connectivity callback from an external caller class ExternalConnectivityWatcher {
// via grpc_client_channel_watch_connectivity_state().
class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface {
public: public:
class WatcherList {
public:
WatcherList() { gpr_mu_init(&mu_); }
~WatcherList() { gpr_mu_destroy(&mu_); }
int size() const;
ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
void Add(ExternalConnectivityWatcher* watcher);
void Remove(const ExternalConnectivityWatcher* watcher);
private:
// head_ is guarded by a mutex, since the size() method needs to
// iterate over the list, and it's called from the C-core API
// function grpc_channel_num_external_connectivity_watchers(), which
// is synchronous and therefore cannot run in the combiner.
mutable gpr_mu mu_;
ExternalConnectivityWatcher* head_ = nullptr;
};
ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent, ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_connectivity_state* state,
grpc_closure* on_complete, grpc_closure* on_complete,
@ -194,23 +196,17 @@ class ChannelData {
~ExternalConnectivityWatcher(); ~ExternalConnectivityWatcher();
void Notify(grpc_connectivity_state state) override;
void Cancel();
private: private:
static void AddWatcherLocked(void* arg, grpc_error* ignored); static void OnWatchCompleteLocked(void* arg, grpc_error* error);
static void RemoveWatcherLocked(void* arg, grpc_error* ignored); static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
ChannelData* chand_; ChannelData* chand_;
grpc_polling_entity pollent_; grpc_polling_entity pollent_;
grpc_connectivity_state initial_state_;
grpc_connectivity_state* state_; grpc_connectivity_state* state_;
grpc_closure* on_complete_; grpc_closure* on_complete_;
grpc_closure* watcher_timer_init_; grpc_closure* watcher_timer_init_;
grpc_closure add_closure_; grpc_closure my_closure_;
grpc_closure remove_closure_; ExternalConnectivityWatcher* next_ = nullptr;
Atomic<bool> done_{false};
}; };
ChannelData(grpc_channel_element_args* args, grpc_error** error); ChannelData(grpc_channel_element_args* args, grpc_error** error);
@ -277,7 +273,8 @@ class ChannelData {
grpc_pollset_set* interested_parties_; grpc_pollset_set* interested_parties_;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_; RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_; OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
ConnectivityStateTracker state_tracker_; grpc_connectivity_state_tracker state_tracker_;
ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
UniquePtr<char> health_check_service_name_; UniquePtr<char> health_check_service_name_;
RefCountedPtr<ServiceConfig> saved_service_config_; RefCountedPtr<ServiceConfig> saved_service_config_;
bool received_first_resolver_result_ = false; bool received_first_resolver_result_ = false;
@ -308,13 +305,6 @@ class ChannelData {
gpr_mu info_mu_; gpr_mu info_mu_;
UniquePtr<char> info_lb_policy_name_; UniquePtr<char> info_lb_policy_name_;
UniquePtr<char> info_service_config_json_; UniquePtr<char> info_service_config_json_;
//
// Fields guarded by a mutex, since they need to be accessed
// synchronously via grpc_channel_num_external_connectivity_watchers().
//
mutable Mutex external_watchers_mu_;
Map<grpc_closure*, ExternalConnectivityWatcher*> external_watchers_;
}; };
// //
@ -1004,7 +994,8 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
"subchannel %p (connected_subchannel=%p state=%s); " "subchannel %p (connected_subchannel=%p state=%s); "
"hopping into combiner", "hopping into combiner",
parent_->chand_, parent_.get(), parent_->subchannel_, parent_->chand_, parent_.get(), parent_->subchannel_,
connected_subchannel.get(), ConnectivityStateName(new_state)); connected_subchannel.get(),
grpc_connectivity_state_name(new_state));
} }
// Will delete itself. // Will delete itself.
New<Updater>(Ref(), new_state, std::move(connected_subchannel)); New<Updater>(Ref(), new_state, std::move(connected_subchannel));
@ -1053,7 +1044,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
self->parent_->parent_->chand_, self->parent_->parent_.get(), self->parent_->parent_->chand_, self->parent_->parent_.get(),
self->parent_->parent_->subchannel_, self->parent_->parent_->subchannel_,
self->connected_subchannel_.get(), self->connected_subchannel_.get(),
ConnectivityStateName(self->state_), grpc_connectivity_state_name(self->state_),
self->parent_->watcher_.get()); self->parent_->watcher_.get());
} }
// Ignore update if the parent WatcherWrapper has been replaced // Ignore update if the parent WatcherWrapper has been replaced
@ -1114,6 +1105,55 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_; RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
}; };
//
// ChannelData::ExternalConnectivityWatcher::WatcherList
//
int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
MutexLock lock(&mu_);
int count = 0;
for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
++count;
}
return count;
}
ChannelData::ExternalConnectivityWatcher*
ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
grpc_closure* on_complete) const {
MutexLock lock(&mu_);
ExternalConnectivityWatcher* w = head_;
while (w != nullptr && w->on_complete_ != on_complete) {
w = w->next_;
}
return w;
}
void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
ExternalConnectivityWatcher* watcher) {
GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
MutexLock lock(&mu_);
GPR_ASSERT(watcher->next_ == nullptr);
watcher->next_ = head_;
head_ = watcher;
}
void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
const ExternalConnectivityWatcher* watcher) {
MutexLock lock(&mu_);
if (watcher == head_) {
head_ = watcher->next_;
return;
}
for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
if (w->next_ == watcher) {
w->next_ = w->next_->next_;
return;
}
}
GPR_UNREACHABLE_CODE(return );
}
// //
// ChannelData::ExternalConnectivityWatcher // ChannelData::ExternalConnectivityWatcher
// //
@ -1124,7 +1164,6 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
grpc_closure* watcher_timer_init) grpc_closure* watcher_timer_init)
: chand_(chand), : chand_(chand),
pollent_(pollent), pollent_(pollent),
initial_state_(*state),
state_(state), state_(state),
on_complete_(on_complete), on_complete_(on_complete),
watcher_timer_init_(watcher_timer_init) { watcher_timer_init_(watcher_timer_init) {
@ -1132,7 +1171,7 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
chand_->interested_parties_); chand_->interested_parties_);
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher"); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&add_closure_, AddWatcherLocked, this, GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
grpc_combiner_scheduler(chand_->combiner_)), grpc_combiner_scheduler(chand_->combiner_)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
@ -1144,61 +1183,42 @@ ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
"ExternalConnectivityWatcher"); "ExternalConnectivityWatcher");
} }
void ChannelData::ExternalConnectivityWatcher::Notify( void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
grpc_connectivity_state state) { void* arg, grpc_error* error) {
bool done = false;
if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
MemoryOrder::RELAXED)) {
return; // Already done.
}
// Report new state to the user.
*state_ = state;
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_NONE);
// Remove external watcher.
chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false);
// Hop back into the combiner to clean up.
// Not needed in state SHUTDOWN, because the tracker will
// automatically remove all watchers in that case.
if (state != GRPC_CHANNEL_SHUTDOWN) {
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this,
grpc_combiner_scheduler(chand_->combiner_)),
GRPC_ERROR_NONE);
}
}
void ChannelData::ExternalConnectivityWatcher::Cancel() {
bool done = false;
if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
MemoryOrder::RELAXED)) {
return; // Already done.
}
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED);
// Hop back into the combiner to clean up.
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this,
grpc_combiner_scheduler(chand_->combiner_)),
GRPC_ERROR_NONE);
}
void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked(
void* arg, grpc_error* ignored) {
ExternalConnectivityWatcher* self = ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg); static_cast<ExternalConnectivityWatcher*>(arg);
// This assumes that the closure is scheduled on the ExecCtx scheduler grpc_closure* on_complete = self->on_complete_;
// and that GRPC_CLOSURE_RUN() will run the closure immediately. self->chand_->external_connectivity_watcher_list_.Remove(self);
GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE); Delete(self);
// Add new watcher. GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
self->chand_->state_tracker_.AddWatcher(
self->initial_state_,
OrphanablePtr<ConnectivityStateWatcherInterface>(self));
} }
void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked( void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
void* arg, grpc_error* ignored) { void* arg, grpc_error* ignored) {
ExternalConnectivityWatcher* self = ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg); static_cast<ExternalConnectivityWatcher*>(arg);
self->chand_->state_tracker_.RemoveWatcher(self); if (self->state_ == nullptr) {
// Handle cancellation.
GPR_ASSERT(self->watcher_timer_init_ == nullptr);
ExternalConnectivityWatcher* found =
self->chand_->external_connectivity_watcher_list_.Lookup(
self->on_complete_);
if (found != nullptr) {
grpc_connectivity_state_notify_on_state_change(
&found->chand_->state_tracker_, nullptr, &found->my_closure_);
}
Delete(self);
return;
}
// New watcher.
self->chand_->external_connectivity_watcher_list_.Add(self);
// This assumes that the closure is scheduled on the ExecCtx scheduler
// and that GRPC_CLOSURE_RUN would run the closure immediately.
GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
grpc_combiner_scheduler(self->chand_->combiner_));
grpc_connectivity_state_notify_on_state_change(
&self->chand_->state_tracker_, self->state_, &self->my_closure_);
} }
// //
@ -1251,7 +1271,7 @@ class ChannelData::ClientChannelControlHelper
? "" ? ""
: " (ignoring -- channel shutting down)"; : " (ignoring -- channel shutting down)";
gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_, gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
ConnectivityStateName(state), picker.get(), extra); grpc_connectivity_state_name(state), picker.get(), extra);
} }
// Do update only if not shutting down. // Do update only if not shutting down.
if (disconnect_error == GRPC_ERROR_NONE) { if (disconnect_error == GRPC_ERROR_NONE) {
@ -1342,13 +1362,14 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
combiner_(grpc_combiner_create()), combiner_(grpc_combiner_create()),
interested_parties_(grpc_pollset_set_create()), interested_parties_(grpc_pollset_set_create()),
subchannel_pool_(GetSubchannelPool(args->channel_args)), subchannel_pool_(GetSubchannelPool(args->channel_args)),
state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
disconnect_error_(GRPC_ERROR_NONE) { disconnect_error_(GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p", gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
this, owning_stack_); this, owning_stack_);
} }
// Initialize data members. // Initialize data members.
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"client_channel");
gpr_mu_init(&info_mu_); gpr_mu_init(&info_mu_);
// Start backup polling. // Start backup polling.
grpc_client_channel_start_backup_polling(interested_parties_); grpc_client_channel_start_backup_polling(interested_parties_);
@ -1412,6 +1433,7 @@ ChannelData::~ChannelData() {
grpc_pollset_set_destroy(interested_parties_); grpc_pollset_set_destroy(interested_parties_);
GRPC_COMBINER_UNREF(combiner_, "client_channel"); GRPC_COMBINER_UNREF(combiner_, "client_channel");
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED)); GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
grpc_connectivity_state_destroy(&state_tracker_);
gpr_mu_destroy(&info_mu_); gpr_mu_destroy(&info_mu_);
} }
@ -1425,7 +1447,7 @@ void ChannelData::UpdateStateAndPickerLocked(
received_first_resolver_result_ = false; received_first_resolver_result_ = false;
} }
// Update connectivity state. // Update connectivity state.
state_tracker_.SetState(state, reason); grpc_connectivity_state_set(&state_tracker_, state, reason);
if (channelz_node_ != nullptr) { if (channelz_node_ != nullptr) {
channelz_node_->SetConnectivityState(state); channelz_node_->SetConnectivityState(state);
channelz_node_->AddTraceEvent( channelz_node_->AddTraceEvent(
@ -1714,7 +1736,7 @@ bool ChannelData::ProcessResolverResultLocked(
} }
grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
if (state_tracker_.state() != GRPC_CHANNEL_READY) { if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected"); return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
} }
LoadBalancingPolicy::PickResult result = LoadBalancingPolicy::PickResult result =
@ -1742,12 +1764,12 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
static_cast<grpc_channel_element*>(op->handler_private.extra_arg); static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Connectivity watch. // Connectivity watch.
if (op->start_connectivity_watch != nullptr) { if (op->on_connectivity_state_change != nullptr) {
chand->state_tracker_.AddWatcher(op->start_connectivity_watch_state, grpc_connectivity_state_notify_on_state_change(
std::move(op->start_connectivity_watch)); &chand->state_tracker_, op->connectivity_state,
} op->on_connectivity_state_change);
if (op->stop_connectivity_watch != nullptr) { op->on_connectivity_state_change = nullptr;
chand->state_tracker_.RemoveWatcher(op->stop_connectivity_watch); op->connectivity_state = nullptr;
} }
// Ping. // Ping.
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
@ -1878,7 +1900,7 @@ void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
grpc_connectivity_state ChannelData::CheckConnectivityState( grpc_connectivity_state ChannelData::CheckConnectivityState(
bool try_to_connect) { bool try_to_connect) {
grpc_connectivity_state out = state_tracker_.state(); grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) { if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect"); GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this, GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
@ -3928,13 +3950,6 @@ void grpc_client_channel_watch_connectivity_state(
grpc_connectivity_state* state, grpc_closure* closure, grpc_connectivity_state* state, grpc_closure* closure,
grpc_closure* watcher_timer_init) { grpc_closure* watcher_timer_init) {
auto* chand = static_cast<ChannelData*>(elem->channel_data); auto* chand = static_cast<ChannelData*>(elem->channel_data);
if (state == nullptr) {
// Handle cancellation.
GPR_ASSERT(watcher_timer_init == nullptr);
chand->RemoveExternalConnectivityWatcher(closure, /*cancel=*/true);
return;
}
// Handle addition.
return chand->AddExternalConnectivityWatcher(pollent, state, closure, return chand->AddExternalConnectivityWatcher(pollent, state, closure,
watcher_timer_init); watcher_timer_init);
} }

@ -46,12 +46,6 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
int grpc_client_channel_num_external_connectivity_watchers( int grpc_client_channel_num_external_connectivity_watchers(
grpc_channel_element* elem); grpc_channel_element* elem);
// TODO(roth): This function is used both when handling external
// connectivity watchers and for LB policies like grpclb and xds that
// contain nested channels. In the latter case, we ideally want
// something closer to the normal connectivity state tracker API.
// When we have time, consider refactoring this somehow to allow each
// use-case to be handled more cleanly.
void grpc_client_channel_watch_connectivity_state( void grpc_client_channel_watch_connectivity_state(
grpc_channel_element* elem, grpc_polling_entity pollent, grpc_channel_element* elem, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_closure* on_complete, grpc_connectivity_state* state, grpc_closure* on_complete,

@ -53,8 +53,9 @@ void SubchannelNode::PopulateConnectivityState(grpc_json* json) {
connectivity_state_.Load(MemoryOrder::RELAXED); connectivity_state_.Load(MemoryOrder::RELAXED);
json = grpc_json_create_child(nullptr, json, "state", nullptr, json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false); GRPC_JSON_OBJECT, false);
grpc_json_create_child(nullptr, json, "state", ConnectivityStateName(state), grpc_json_create_child(nullptr, json, "state",
GRPC_JSON_STRING, false); grpc_connectivity_state_name(state), GRPC_JSON_STRING,
false);
} }
grpc_json* SubchannelNode::RenderJson() { grpc_json* SubchannelNode::RenderJson() {

@ -660,7 +660,7 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[grpclb %p helper %p] pending child policy %p reports state=%s", "[grpclb %p helper %p] pending child policy %p reports state=%s",
parent_.get(), this, parent_->pending_child_policy_.get(), parent_.get(), this, parent_->pending_child_policy_.get(),
ConnectivityStateName(state)); grpc_connectivity_state_name(state));
} }
if (state != GRPC_CHANNEL_READY) return; if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set( grpc_pollset_set_del_pollset_set(
@ -700,7 +700,8 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[grpclb %p helper %p] state=%s passing child picker %p as-is", "[grpclb %p helper %p] state=%s passing child picker %p as-is",
parent_.get(), this, ConnectivityStateName(state), picker.get()); parent_.get(), this, grpc_connectivity_state_name(state),
picker.get());
} }
parent_->channel_control_helper()->UpdateState(state, std::move(picker)); parent_->channel_control_helper()->UpdateState(state, std::move(picker));
return; return;
@ -708,7 +709,8 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
// Cases 2 and 3a: wrap picker from the child in our own picker. // Cases 2 and 3a: wrap picker from the child in our own picker.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p", gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p",
parent_.get(), this, ConnectivityStateName(state), picker.get()); parent_.get(), this, grpc_connectivity_state_name(state),
picker.get());
} }
RefCountedPtr<GrpcLbClientStats> client_stats; RefCountedPtr<GrpcLbClientStats> client_stats;
if (parent_->lb_calld_ != nullptr && if (parent_->lb_calld_ != nullptr &&

@ -294,7 +294,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"Pick First %p selected subchannel connectivity changed to %s", p, "Pick First %p selected subchannel connectivity changed to %s", p,
ConnectivityStateName(connectivity_state)); grpc_connectivity_state_name(connectivity_state));
} }
// If the new state is anything other than READY and there is a // If the new state is anything other than READY and there is a
// pending update, switch to the pending update. // pending update, switch to the pending update.

@ -379,8 +379,8 @@ void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(), p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(), subchannel_list()->num_subchannels(),
ConnectivityStateName(last_connectivity_state_), grpc_connectivity_state_name(last_connectivity_state_),
ConnectivityStateName(connectivity_state)); grpc_connectivity_state_name(connectivity_state));
} }
// Decide what state to report for aggregation purposes. // Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state // If we haven't seen a failure since the last time we were in state

@ -254,7 +254,8 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
subchannel_list_.get(), subchannel_data_->Index(), subchannel_list_.get(), subchannel_data_->Index(),
subchannel_list_->num_subchannels(), subchannel_list_->num_subchannels(),
subchannel_data_->subchannel_.get(), subchannel_data_->subchannel_.get(),
ConnectivityStateName(new_state), subchannel_list_->shutting_down(), grpc_connectivity_state_name(new_state),
subchannel_list_->shutting_down(),
subchannel_data_->pending_watcher_); subchannel_data_->pending_watcher_);
} }
if (!subchannel_list_->shutting_down() && if (!subchannel_list_->shutting_down() &&
@ -317,7 +318,8 @@ void SubchannelData<SubchannelListType,
" (subchannel %p): starting watch (from %s)", " (subchannel %p): starting watch (from %s)",
subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_.get(), ConnectivityStateName(connectivity_state_)); subchannel_.get(),
grpc_connectivity_state_name(connectivity_state_));
} }
GPR_ASSERT(pending_watcher_ == nullptr); GPR_ASSERT(pending_watcher_ == nullptr);
pending_watcher_ = pending_watcher_ =

@ -823,7 +823,7 @@ void XdsLb::FallbackHelper::UpdateState(grpc_connectivity_state state,
GPR_INFO, GPR_INFO,
"[xdslb %p helper %p] pending fallback policy %p reports state=%s", "[xdslb %p helper %p] pending fallback policy %p reports state=%s",
parent_.get(), this, parent_->pending_fallback_policy_.get(), parent_.get(), this, parent_->pending_fallback_policy_.get(),
ConnectivityStateName(state)); grpc_connectivity_state_name(state));
} }
if (state != GRPC_CHANNEL_READY) return; if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set( grpc_pollset_set_del_pollset_set(
@ -2502,7 +2502,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xdslb %p] Priority %" PRIu32 " (%p) connectivity changed to %s", "[xdslb %p] Priority %" PRIu32 " (%p) connectivity changed to %s",
xds_policy(), priority_, this, xds_policy(), priority_, this,
ConnectivityStateName(connectivity_state_)); grpc_connectivity_state_name(connectivity_state_));
} }
} }
@ -2834,7 +2834,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState(
"[xdslb %p helper %p] pending child policy %p reports state=%s", "[xdslb %p helper %p] pending child policy %p reports state=%s",
locality_->xds_policy(), this, locality_->xds_policy(), this,
locality_->pending_child_policy_.get(), locality_->pending_child_policy_.get(),
ConnectivityStateName(state)); grpc_connectivity_state_name(state));
} }
if (state != GRPC_CHANNEL_READY) return; if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set( grpc_pollset_set_del_pollset_set(

@ -123,7 +123,8 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"resolving_lb=%p helper=%p: pending child policy %p reports " "resolving_lb=%p helper=%p: pending child policy %p reports "
"state=%s", "state=%s",
parent_.get(), this, child_, ConnectivityStateName(state)); parent_.get(), this, child_,
grpc_connectivity_state_name(state));
} }
if (state != GRPC_CHANNEL_READY) return; if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set( grpc_pollset_set_del_pollset_set(

@ -95,14 +95,15 @@ ConnectedSubchannel::~ConnectedSubchannel() {
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
} }
void ConnectedSubchannel::StartWatch( void ConnectedSubchannel::NotifyOnStateChange(
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) { grpc_closure* closure) {
grpc_transport_op* op = grpc_make_transport_op(nullptr); grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch = std::move(watcher); grpc_channel_element* elem;
op->start_connectivity_watch_state = GRPC_CHANNEL_READY; op->connectivity_state = state;
op->on_connectivity_state_change = closure;
op->bind_pollset_set = interested_parties; op->bind_pollset_set = interested_parties;
grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0); elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op); elem->filter->start_transport_op(elem, op);
} }
@ -309,14 +310,19 @@ void SubchannelCall::IncrementRefCount(const grpc_core::DebugLocation& location,
// Subchannel::ConnectedSubchannelStateWatcher // Subchannel::ConnectedSubchannelStateWatcher
// //
class Subchannel::ConnectedSubchannelStateWatcher class Subchannel::ConnectedSubchannelStateWatcher {
: public AsyncConnectivityStateWatcherInterface {
public: public:
// Must be instantiated while holding c->mu. // Must be instantiated while holding c->mu.
explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) { explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) {
// Steal subchannel ref for connecting. // Steal subchannel ref for connecting.
GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher"); GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting"); GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting");
// Start watching for connectivity state changes.
GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this,
grpc_schedule_on_exec_ctx);
c->connected_subchannel_->NotifyOnStateChange(c->pollset_set_,
&pending_connectivity_state_,
&on_connectivity_changed_);
} }
~ConnectedSubchannelStateWatcher() { ~ConnectedSubchannelStateWatcher() {
@ -324,41 +330,54 @@ class Subchannel::ConnectedSubchannelStateWatcher
} }
private: private:
void OnConnectivityStateChange(grpc_connectivity_state new_state) override { static void OnConnectivityChanged(void* arg, grpc_error* error) {
Subchannel* c = subchannel_; auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
MutexLock lock(&c->mu_); Subchannel* c = self->subchannel_;
switch (new_state) { {
case GRPC_CHANNEL_TRANSIENT_FAILURE: MutexLock lock(&c->mu_);
case GRPC_CHANNEL_SHUTDOWN: { switch (self->pending_connectivity_state_) {
if (!c->disconnected_ && c->connected_subchannel_ != nullptr) { case GRPC_CHANNEL_TRANSIENT_FAILURE:
if (grpc_trace_subchannel.enabled()) { case GRPC_CHANNEL_SHUTDOWN: {
gpr_log(GPR_INFO, if (!c->disconnected_ && c->connected_subchannel_ != nullptr) {
"Connected subchannel %p of subchannel %p has gone into " if (grpc_trace_subchannel.enabled()) {
"%s. Attempting to reconnect.", gpr_log(GPR_INFO,
c->connected_subchannel_.get(), c, "Connected subchannel %p of subchannel %p has gone into "
ConnectivityStateName(new_state)); "%s. Attempting to reconnect.",
} c->connected_subchannel_.get(), c,
c->connected_subchannel_.reset(); grpc_connectivity_state_name(
if (c->channelz_node() != nullptr) { self->pending_connectivity_state_));
c->channelz_node()->SetChildSocket(nullptr); }
c->connected_subchannel_.reset();
if (c->channelz_node() != nullptr) {
c->channelz_node()->SetChildSocket(nullptr);
}
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE);
c->backoff_begun_ = false;
c->backoff_.Reset();
} }
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE); break;
c->backoff_begun_ = false; }
c->backoff_.Reset(); default: {
// In principle, this should never happen. We should not get
// a callback for READY, because that was the state we started
// this watch from. And a connected subchannel should never go
// from READY to CONNECTING or IDLE.
c->SetConnectivityStateLocked(self->pending_connectivity_state_);
c->connected_subchannel_->NotifyOnStateChange(
nullptr, &self->pending_connectivity_state_,
&self->on_connectivity_changed_);
return; // So we don't delete ourself below.
} }
break;
}
default: {
// In principle, this should never happen. We should not get
// a callback for READY, because that was the state we started
// this watch from. And a connected subchannel should never go
// from READY to CONNECTING or IDLE.
c->SetConnectivityStateLocked(new_state);
} }
} }
// Don't delete until we've released the lock, because this might
// cause the subchannel (which contains the lock) to be destroyed.
Delete(self);
} }
Subchannel* subchannel_; Subchannel* subchannel_;
grpc_closure on_connectivity_changed_;
grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY;
}; };
// //
@ -1069,10 +1088,8 @@ bool Subchannel::PublishTransportLocked() {
if (channelz_node_ != nullptr) { if (channelz_node_ != nullptr) {
channelz_node_->SetChildSocket(std::move(socket)); channelz_node_->SetChildSocket(std::move(socket));
} }
// Start watching connected subchannel. // Instantiate state watcher. Will clean itself up.
connected_subchannel_->StartWatch( New<ConnectedSubchannelStateWatcher>(this);
pollset_set_, OrphanablePtr<grpc_core::ConnectivityStateWatcherInterface>(
New<ConnectedSubchannelStateWatcher>(this)));
// Report initial state. // Report initial state.
SetConnectivityStateLocked(GRPC_CHANNEL_READY); SetConnectivityStateLocked(GRPC_CHANNEL_READY);
return true; return true;

@ -77,9 +77,9 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel); RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
~ConnectedSubchannel(); ~ConnectedSubchannel();
void StartWatch(grpc_pollset_set* interested_parties, void NotifyOnStateChange(grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher); grpc_connectivity_state* state,
grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
grpc_channel_stack* channel_stack() const { return channel_stack_; } grpc_channel_stack* channel_stack() const { return channel_stack_; }

@ -90,6 +90,10 @@ struct channel_data {
grpc_closure start_max_age_timer_after_init; grpc_closure start_max_age_timer_after_init;
/* Closure to run when the goaway op is finished and the max_age_timer */ /* Closure to run when the goaway op is finished and the max_age_timer */
grpc_closure start_max_age_grace_timer_after_goaway_op; grpc_closure start_max_age_grace_timer_after_goaway_op;
/* Closure to run when the channel connectivity state changes */
grpc_closure channel_connectivity_changed;
/* Records the current connectivity state */
grpc_connectivity_state connectivity_state;
/* Number of active calls */ /* Number of active calls */
gpr_atm call_count; gpr_atm call_count;
/* TODO(zyc): C++lize this state machine */ /* TODO(zyc): C++lize this state machine */
@ -216,47 +220,6 @@ static void start_max_idle_timer_after_init(void* arg, grpc_error* error) {
"max_age start_max_idle_timer_after_init"); "max_age start_max_idle_timer_after_init");
} }
namespace grpc_core {
class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface {
public:
explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) {
GRPC_CHANNEL_STACK_REF(chand_->channel_stack, "max_age conn_watch");
}
~ConnectivityWatcher() {
GRPC_CHANNEL_STACK_UNREF(chand_->channel_stack, "max_age conn_watch");
}
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
{
MutexLock lock(&chand_->max_age_timer_mu);
if (chand_->max_age_timer_pending) {
grpc_timer_cancel(&chand_->max_age_timer);
chand_->max_age_timer_pending = false;
}
if (chand_->max_age_grace_timer_pending) {
grpc_timer_cancel(&chand_->max_age_grace_timer);
chand_->max_age_grace_timer_pending = false;
}
}
/* If there are no active calls, this increasement will cancel
max_idle_timer, and prevent max_idle_timer from being started in the
future. */
increase_call_count(chand_);
if (gpr_atm_acq_load(&chand_->idle_state) ==
MAX_IDLE_STATE_SEEN_EXIT_IDLE) {
grpc_timer_cancel(&chand_->max_idle_timer);
}
}
channel_data* chand_;
};
} // namespace grpc_core
static void start_max_age_timer_after_init(void* arg, grpc_error* error) { static void start_max_age_timer_after_init(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg); channel_data* chand = static_cast<channel_data*>(arg);
gpr_mu_lock(&chand->max_age_timer_mu); gpr_mu_lock(&chand->max_age_timer_mu);
@ -267,9 +230,8 @@ static void start_max_age_timer_after_init(void* arg, grpc_error* error) {
&chand->close_max_age_channel); &chand->close_max_age_channel);
gpr_mu_unlock(&chand->max_age_timer_mu); gpr_mu_unlock(&chand->max_age_timer_mu);
grpc_transport_op* op = grpc_make_transport_op(nullptr); grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch.reset( op->on_connectivity_state_change = &chand->channel_connectivity_changed;
grpc_core::New<grpc_core::ConnectivityWatcher>(chand)); op->connectivity_state = &chand->connectivity_state;
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0), op); grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0), op);
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, GRPC_CHANNEL_STACK_UNREF(chand->channel_stack,
"max_age start_max_age_timer_after_init"); "max_age start_max_age_timer_after_init");
@ -388,6 +350,35 @@ static void force_close_max_age_channel(void* arg, grpc_error* error) {
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_age_grace_timer"); GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_age_grace_timer");
} }
static void channel_connectivity_changed(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg);
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->on_connectivity_state_change = &chand->channel_connectivity_changed;
op->connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0),
op);
} else {
gpr_mu_lock(&chand->max_age_timer_mu);
if (chand->max_age_timer_pending) {
grpc_timer_cancel(&chand->max_age_timer);
chand->max_age_timer_pending = false;
}
if (chand->max_age_grace_timer_pending) {
grpc_timer_cancel(&chand->max_age_grace_timer);
chand->max_age_grace_timer_pending = false;
}
gpr_mu_unlock(&chand->max_age_timer_mu);
/* If there are no active calls, this increasement will cancel
max_idle_timer, and prevent max_idle_timer from being started in the
future. */
increase_call_count(chand);
if (gpr_atm_acq_load(&chand->idle_state) == MAX_IDLE_STATE_SEEN_EXIT_IDLE) {
grpc_timer_cancel(&chand->max_idle_timer);
}
}
}
/* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out /* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out
connection storms. Note that the MAX_CONNECTION_AGE option without jitter connection storms. Note that the MAX_CONNECTION_AGE option without jitter
would not create connection storms by itself, but if there happened to be a would not create connection storms by itself, but if there happened to be a
@ -481,6 +472,9 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem,
GRPC_CLOSURE_INIT(&chand->start_max_age_grace_timer_after_goaway_op, GRPC_CLOSURE_INIT(&chand->start_max_age_grace_timer_after_goaway_op,
start_max_age_grace_timer_after_goaway_op, chand, start_max_age_grace_timer_after_goaway_op, chand,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,
channel_connectivity_changed, chand,
grpc_schedule_on_exec_ctx);
if (chand->max_connection_age != GRPC_MILLIS_INF_FUTURE) { if (chand->max_connection_age != GRPC_MILLIS_INF_FUTURE) {
/* When the channel reaches its max age, we send down an op with /* When the channel reaches its max age, we send down an op with

@ -196,6 +196,7 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0); GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
grpc_chttp2_stream_map_destroy(&stream_map); grpc_chttp2_stream_map_destroy(&stream_map);
grpc_connectivity_state_destroy(&channel_callback.state_tracker);
cancel_pings(this, cancel_pings(this,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
@ -465,8 +466,6 @@ grpc_chttp2_transport::grpc_chttp2_transport(
ep(ep), ep(ep),
peer_string(grpc_endpoint_get_peer(ep)), peer_string(grpc_endpoint_get_peer(ep)),
resource_user(resource_user), resource_user(resource_user),
state_tracker(is_client ? "client_transport" : "server_transport",
GRPC_CHANNEL_READY),
is_client(is_client), is_client(is_client),
next_stream_id(is_client ? 1 : 2), next_stream_id(is_client ? 1 : 2),
deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) { deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
@ -481,6 +480,9 @@ grpc_chttp2_transport::grpc_chttp2_transport(
grpc_chttp2_stream_map_init(&stream_map, 8); grpc_chttp2_stream_map_init(&stream_map, 8);
grpc_slice_buffer_init(&read_buffer); grpc_slice_buffer_init(&read_buffer);
grpc_connectivity_state_init(
&channel_callback.state_tracker, GRPC_CHANNEL_READY,
is_client ? "client_transport" : "server_transport");
grpc_slice_buffer_init(&outbuf); grpc_slice_buffer_init(&outbuf);
if (is_client) { if (is_client) {
grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string( grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
@ -768,7 +770,7 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
uint32_t id) { uint32_t id) {
if (t->accept_stream_cb == nullptr) { if (t->channel_callback.accept_stream == nullptr) {
return nullptr; return nullptr;
} }
// Don't accept the stream if memory quota doesn't allow. Note that we should // Don't accept the stream if memory quota doesn't allow. Note that we should
@ -786,8 +788,9 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* accepting = nullptr; grpc_chttp2_stream* accepting = nullptr;
GPR_ASSERT(t->accepting_stream == nullptr); GPR_ASSERT(t->accepting_stream == nullptr);
t->accepting_stream = &accepting; t->accepting_stream = &accepting;
t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base, t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
(void*)static_cast<uintptr_t>(id)); &t->base,
(void*)static_cast<uintptr_t>(id));
t->accepting_stream = nullptr; t->accepting_stream = nullptr;
return accepting; return accepting;
} }
@ -1840,8 +1843,9 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
} }
if (op->set_accept_stream) { if (op->set_accept_stream) {
t->accept_stream_cb = op->set_accept_stream_fn; t->channel_callback.accept_stream = op->set_accept_stream_fn;
t->accept_stream_cb_user_data = op->set_accept_stream_user_data; t->channel_callback.accept_stream_user_data =
op->set_accept_stream_user_data;
} }
if (op->bind_pollset) { if (op->bind_pollset) {
@ -1857,12 +1861,10 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
} }
if (op->start_connectivity_watch != nullptr) { if (op->on_connectivity_state_change != nullptr) {
t->state_tracker.AddWatcher(op->start_connectivity_watch_state, grpc_connectivity_state_notify_on_state_change(
std::move(op->start_connectivity_watch)); &t->channel_callback.state_tracker, op->connectivity_state,
} op->on_connectivity_state_change);
if (op->stop_connectivity_watch != nullptr) {
t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
} }
if (op->disconnect_with_error != GRPC_ERROR_NONE) { if (op->disconnect_with_error != GRPC_ERROR_NONE) {
@ -2848,7 +2850,8 @@ static void connectivity_state_set(grpc_chttp2_transport* t,
const char* reason) { const char* reason) {
GRPC_CHTTP2_IF_TRACING( GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state)); gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
t->state_tracker.SetState(state, reason); grpc_connectivity_state_set(&t->channel_callback.state_tracker, state,
reason);
} }
/******************************************************************************* /*******************************************************************************

@ -339,13 +339,15 @@ struct grpc_chttp2_transport {
publish the accepted server stream */ publish the accepted server stream */
grpc_chttp2_stream** accepting_stream = nullptr; grpc_chttp2_stream** accepting_stream = nullptr;
/* accept stream callback */ struct {
void (*accept_stream_cb)(void* user_data, grpc_transport* transport, /* accept stream callback */
const void* server_data); void (*accept_stream)(void* user_data, grpc_transport* transport,
void* accept_stream_cb_user_data; const void* server_data);
void* accept_stream_user_data;
/** connectivity tracking */
grpc_core::ConnectivityStateTracker state_tracker; /** connectivity tracking */
grpc_connectivity_state_tracker state_tracker;
} channel_callback;
/** data to write now */ /** data to write now */
grpc_slice_buffer outbuf; grpc_slice_buffer outbuf;

@ -75,17 +75,17 @@ struct shared_mu {
struct inproc_transport { struct inproc_transport {
inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu, inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu,
bool is_client) bool is_client)
: mu(mu), : mu(mu), is_client(is_client) {
is_client(is_client),
state_tracker(is_client ? "inproc_client" : "inproc_server",
GRPC_CHANNEL_READY) {
base.vtable = vtable; base.vtable = vtable;
// Start each side of transport with 2 refs since they each have a ref // Start each side of transport with 2 refs since they each have a ref
// to the other // to the other
gpr_ref_init(&refs, 2); gpr_ref_init(&refs, 2);
grpc_connectivity_state_init(&connectivity, GRPC_CHANNEL_READY,
is_client ? "inproc_client" : "inproc_server");
} }
~inproc_transport() { ~inproc_transport() {
grpc_connectivity_state_destroy(&connectivity);
if (gpr_unref(&mu->refs)) { if (gpr_unref(&mu->refs)) {
mu->~shared_mu(); mu->~shared_mu();
gpr_free(mu); gpr_free(mu);
@ -111,7 +111,7 @@ struct inproc_transport {
shared_mu* mu; shared_mu* mu;
gpr_refcount refs; gpr_refcount refs;
bool is_client; bool is_client;
grpc_core::ConnectivityStateTracker state_tracker; grpc_connectivity_state_tracker connectivity;
void (*accept_stream_cb)(void* user_data, grpc_transport* transport, void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
const void* server_data); const void* server_data);
void* accept_stream_data; void* accept_stream_data;
@ -1090,7 +1090,8 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
void close_transport_locked(inproc_transport* t) { void close_transport_locked(inproc_transport* t) {
INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed); INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, "close transport"); grpc_connectivity_state_set(&t->connectivity, GRPC_CHANNEL_SHUTDOWN,
"close transport");
if (!t->is_closed) { if (!t->is_closed) {
t->is_closed = true; t->is_closed = true;
/* Also end all streams on this transport */ /* Also end all streams on this transport */
@ -1109,12 +1110,10 @@ void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
inproc_transport* t = reinterpret_cast<inproc_transport*>(gt); inproc_transport* t = reinterpret_cast<inproc_transport*>(gt);
INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op); INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op);
gpr_mu_lock(&t->mu->mu); gpr_mu_lock(&t->mu->mu);
if (op->start_connectivity_watch != nullptr) { if (op->on_connectivity_state_change) {
t->state_tracker.AddWatcher(op->start_connectivity_watch_state, grpc_connectivity_state_notify_on_state_change(
std::move(op->start_connectivity_watch)); &t->connectivity, op->connectivity_state,
} op->on_connectivity_state_change);
if (op->stop_connectivity_watch != nullptr) {
t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
} }
if (op->set_accept_stream) { if (op->set_accept_stream) {
t->accept_stream_cb = op->set_accept_stream_fn; t->accept_stream_cb = op->set_accept_stream_fn;

@ -234,7 +234,8 @@ grpc_json* ChannelNode::RenderJson() {
static_cast<grpc_connectivity_state>(state_field >> 1); static_cast<grpc_connectivity_state>(state_field >> 1);
json = grpc_json_create_child(nullptr, json, "state", nullptr, json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false); GRPC_JSON_OBJECT, false);
grpc_json_create_child(nullptr, json, "state", ConnectivityStateName(state), grpc_json_create_child(nullptr, json, "state",
grpc_connectivity_state_name(state),
GRPC_JSON_STRING, false); GRPC_JSON_STRING, false);
json = data; json = data;
} }

@ -32,7 +32,6 @@
#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/lame_client.h" #include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/static_metadata.h"
namespace grpc_core { namespace grpc_core {
@ -40,19 +39,15 @@ namespace grpc_core {
namespace { namespace {
struct CallData { struct CallData {
CallCombiner* call_combiner; grpc_core::CallCombiner* call_combiner;
grpc_linked_mdelem status; grpc_linked_mdelem status;
grpc_linked_mdelem details; grpc_linked_mdelem details;
Atomic<bool> filled_metadata; grpc_core::Atomic<bool> filled_metadata;
}; };
struct ChannelData { struct ChannelData {
ChannelData() : state_tracker("lame_channel", GRPC_CHANNEL_SHUTDOWN) {}
grpc_status_code error_code; grpc_status_code error_code;
const char* error_message; const char* error_message;
Mutex mu;
ConnectivityStateTracker state_tracker;
}; };
static void fill_metadata(grpc_call_element* elem, grpc_metadata_batch* mdb) { static void fill_metadata(grpc_call_element* elem, grpc_metadata_batch* mdb) {
@ -99,16 +94,10 @@ static void lame_get_channel_info(grpc_channel_element* elem,
static void lame_start_transport_op(grpc_channel_element* elem, static void lame_start_transport_op(grpc_channel_element* elem,
grpc_transport_op* op) { grpc_transport_op* op) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); if (op->on_connectivity_state_change) {
{ GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN);
MutexLock lock(&chand->mu); *op->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
if (op->start_connectivity_watch != nullptr) { GRPC_CLOSURE_SCHED(op->on_connectivity_state_change, GRPC_ERROR_NONE);
chand->state_tracker.AddWatcher(op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
}
if (op->stop_connectivity_watch != nullptr) {
chand->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
}
} }
if (op->send_ping.on_initiate != nullptr) { if (op->send_ping.on_initiate != nullptr) {
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
@ -143,14 +132,10 @@ static grpc_error* lame_init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) { grpc_channel_element_args* args) {
GPR_ASSERT(args->is_first); GPR_ASSERT(args->is_first);
GPR_ASSERT(args->is_last); GPR_ASSERT(args->is_last);
new (elem->channel_data) ChannelData;
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
static void lame_destroy_channel_elem(grpc_channel_element* elem) { static void lame_destroy_channel_elem(grpc_channel_element* elem) {}
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->~ChannelData();
}
} // namespace } // namespace

@ -105,6 +105,7 @@ struct channel_registered_method {
struct channel_data { struct channel_data {
grpc_server* server; grpc_server* server;
grpc_connectivity_state connectivity_state;
grpc_channel* channel; grpc_channel* channel;
size_t cq_idx; size_t cq_idx;
/* linked list of all channels on a server */ /* linked list of all channels on a server */
@ -114,6 +115,7 @@ struct channel_data {
uint32_t registered_method_slots; uint32_t registered_method_slots;
uint32_t registered_method_max_probes; uint32_t registered_method_max_probes;
grpc_closure finish_destroy_channel_closure; grpc_closure finish_destroy_channel_closure;
grpc_closure channel_connectivity_changed;
intptr_t channelz_socket_uuid; intptr_t channelz_socket_uuid;
}; };
@ -456,7 +458,7 @@ static void finish_destroy_channel(void* cd, grpc_error* error) {
server_unref(server); server_unref(server);
} }
static void destroy_channel(channel_data* chand) { static void destroy_channel(channel_data* chand, grpc_error* error) {
if (is_channel_orphaned(chand)) return; if (is_channel_orphaned(chand)) return;
GPR_ASSERT(chand->server != nullptr); GPR_ASSERT(chand->server != nullptr);
orphan_channel(chand); orphan_channel(chand);
@ -465,9 +467,12 @@ static void destroy_channel(channel_data* chand) {
GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure, GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
finish_destroy_channel, chand, grpc_schedule_on_exec_ctx); finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace) &&
gpr_log(GPR_INFO, "Disconnected client"); error != GRPC_ERROR_NONE) {
const char* msg = grpc_error_string(error);
gpr_log(GPR_INFO, "Disconnected client: %s", msg);
} }
GRPC_ERROR_UNREF(error);
grpc_transport_op* op = grpc_transport_op* op =
grpc_make_transport_op(&chand->finish_destroy_channel_closure); grpc_make_transport_op(&chand->finish_destroy_channel_closure);
@ -886,6 +891,24 @@ static void accept_stream(void* cd, grpc_transport* transport,
grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata); grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
} }
static void channel_connectivity_changed(void* cd, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(cd);
grpc_server* server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->on_connectivity_state_change = &chand->channel_connectivity_changed;
op->connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
op);
} else {
gpr_mu_lock(&server->mu_global);
destroy_channel(chand, GRPC_ERROR_REF(error));
gpr_mu_unlock(&server->mu_global);
GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
}
}
static grpc_error* server_init_call_elem(grpc_call_element* elem, static grpc_error* server_init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) { const grpc_call_element_args* args) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data);
@ -912,6 +935,10 @@ static grpc_error* server_init_channel_elem(grpc_channel_element* elem,
chand->channel = nullptr; chand->channel = nullptr;
chand->next = chand->prev = chand; chand->next = chand->prev = chand;
chand->registered_methods = nullptr; chand->registered_methods = nullptr;
chand->connectivity_state = GRPC_CHANNEL_IDLE;
GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,
channel_connectivity_changed, chand,
grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
@ -1122,31 +1149,6 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
*pollsets = server->pollsets; *pollsets = server->pollsets;
} }
class ConnectivityWatcher
: public grpc_core::AsyncConnectivityStateWatcherInterface {
public:
explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) {
GRPC_CHANNEL_INTERNAL_REF(chand_->channel, "connectivity");
}
~ConnectivityWatcher() {
GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel, "connectivity");
}
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
// Don't do anything until we are being shut down.
if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
// Shut down channel.
grpc_server* server = chand_->server;
gpr_mu_lock(&server->mu_global);
destroy_channel(chand_);
gpr_mu_unlock(&server->mu_global);
}
channel_data* chand_;
};
void grpc_server_setup_transport( void grpc_server_setup_transport(
grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset, grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
const grpc_channel_args* args, const grpc_channel_args* args,
@ -1239,12 +1241,13 @@ void grpc_server_setup_transport(
chand->next->prev = chand->prev->next = chand; chand->next->prev = chand->prev->next = chand;
gpr_mu_unlock(&s->mu_global); gpr_mu_unlock(&s->mu_global);
GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
op = grpc_make_transport_op(nullptr); op = grpc_make_transport_op(nullptr);
op->set_accept_stream = true; op->set_accept_stream = true;
op->set_accept_stream_fn = accept_stream; op->set_accept_stream_fn = accept_stream;
op->set_accept_stream_user_data = chand; op->set_accept_stream_user_data = chand;
op->start_connectivity_watch.reset( op->on_connectivity_state_change = &chand->channel_connectivity_changed;
grpc_core::New<ConnectivityWatcher>(chand)); op->connectivity_state = &chand->connectivity_state;
if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
op->disconnect_with_error = op->disconnect_with_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");

@ -26,13 +26,9 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include "src/core/lib/iomgr/exec_ctx.h" grpc_core::TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
namespace grpc_core { const char* grpc_connectivity_state_name(grpc_connectivity_state state) {
TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
const char* ConnectivityStateName(grpc_connectivity_state state) {
switch (state) { switch (state) {
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_IDLE:
return "IDLE"; return "IDLE";
@ -48,121 +44,122 @@ const char* ConnectivityStateName(grpc_connectivity_state state) {
GPR_UNREACHABLE_CODE(return "UNKNOWN"); GPR_UNREACHABLE_CODE(return "UNKNOWN");
} }
// void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker,
// AsyncConnectivityStateWatcherInterface grpc_connectivity_state init_state,
// const char* name) {
gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state);
// A fire-and-forget class to asynchronously deliver a connectivity tracker->watchers = nullptr;
// state notification to a watcher. tracker->name = gpr_strdup(name);
class AsyncConnectivityStateWatcherInterface::Notifier {
public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
grpc_connectivity_state state)
: watcher_(std::move(watcher)), state_(state) {
GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
private:
static void SendNotification(void* arg, grpc_error* ignored) {
Notifier* self = static_cast<Notifier*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s",
self->watcher_.get(), ConnectivityStateName(self->state_));
}
self->watcher_->OnConnectivityStateChange(self->state_);
Delete(self);
}
RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher_;
const grpc_connectivity_state state_;
grpc_closure closure_;
};
void AsyncConnectivityStateWatcherInterface::Notify(
grpc_connectivity_state state) {
New<Notifier>(Ref(), state); // Deletes itself when done.
} }
// void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) {
// ConnectivityStateTracker grpc_error* error;
// grpc_connectivity_state_watcher* w;
while ((w = tracker->watchers)) {
ConnectivityStateTracker::~ConnectivityStateTracker() { tracker->watchers = w->next;
grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
if (current_state == GRPC_CHANNEL_SHUTDOWN) return; if (GRPC_CHANNEL_SHUTDOWN != *w->current) {
for (const auto& p : watchers_) { *w->current = GRPC_CHANNEL_SHUTDOWN;
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { error = GRPC_ERROR_NONE;
gpr_log(GPR_INFO, } else {
"ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s", error =
name_, this, p.first, ConnectivityStateName(current_state), GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner");
ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
} }
p.second->Notify(GRPC_CHANNEL_SHUTDOWN); GRPC_CLOSURE_SCHED(w->notify, error);
gpr_free(w);
} }
gpr_free(tracker->name);
} }
void ConnectivityStateTracker::AddWatcher( grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state initial_state, grpc_connectivity_state_tracker* tracker) {
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) { grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
gpr_atm_no_barrier_load(&tracker->current_state_atm));
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_, gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
this, watcher.get()); grpc_connectivity_state_name(cur));
}
grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
if (initial_state != current_state) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO,
"ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
name_, this, watcher.get(), ConnectivityStateName(initial_state),
ConnectivityStateName(current_state));
}
watcher->Notify(current_state);
} }
watchers_.insert(MakePair(watcher.get(), std::move(watcher))); return cur;
} }
void ConnectivityStateTracker::RemoveWatcher( bool grpc_connectivity_state_has_watchers(
ConnectivityStateWatcherInterface* watcher) { grpc_connectivity_state_tracker* connectivity_state) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { return connectivity_state->watchers != nullptr;
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: remove watcher %p",
name_, this, watcher);
}
watchers_.erase(watcher);
} }
void ConnectivityStateTracker::SetState(grpc_connectivity_state state, bool grpc_connectivity_state_notify_on_state_change(
const char* reason) { grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current,
grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED); grpc_closure* notify) {
if (state == current_state) return; grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
gpr_atm_no_barrier_load(&tracker->current_state_atm));
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s)", name_, if (current == nullptr) {
this, ConnectivityStateName(current_state), gpr_log(GPR_INFO, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
ConnectivityStateName(state), reason); tracker->name, notify);
} else {
gpr_log(GPR_INFO, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
tracker->name, grpc_connectivity_state_name(*current),
grpc_connectivity_state_name(cur), notify);
}
} }
state_.Store(state, MemoryOrder::RELAXED); if (current == nullptr) {
for (const auto& p : watchers_) { grpc_connectivity_state_watcher* w = tracker->watchers;
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { if (w != nullptr && w->notify == notify) {
gpr_log(GPR_INFO, GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
"ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s", tracker->watchers = w->next;
name_, this, p.first, ConnectivityStateName(current_state), gpr_free(w);
ConnectivityStateName(state)); return false;
}
while (w != nullptr) {
grpc_connectivity_state_watcher* rm_candidate = w->next;
if (rm_candidate != nullptr && rm_candidate->notify == notify) {
GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
w->next = w->next->next;
gpr_free(rm_candidate);
return false;
}
w = w->next;
} }
p.second->Notify(state); return false;
} else {
if (cur != *current) {
*current = cur;
GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_NONE);
} else {
grpc_connectivity_state_watcher* w =
static_cast<grpc_connectivity_state_watcher*>(gpr_malloc(sizeof(*w)));
w->current = current;
w->notify = notify;
w->next = tracker->watchers;
tracker->watchers = w;
}
return cur == GRPC_CHANNEL_IDLE;
} }
// If the new state is SHUTDOWN, orphan all of the watchers. This
// avoids the need for the callers to explicitly cancel them.
if (state == GRPC_CHANNEL_SHUTDOWN) watchers_.clear();
} }
grpc_connectivity_state ConnectivityStateTracker::state() const { void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
grpc_connectivity_state state = state_.Load(MemoryOrder::RELAXED); grpc_connectivity_state state,
const char* reason) {
grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
gpr_atm_no_barrier_load(&tracker->current_state_atm));
grpc_connectivity_state_watcher* w;
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s", gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s]", tracker, tracker->name,
name_, this, ConnectivityStateName(state)); grpc_connectivity_state_name(cur),
grpc_connectivity_state_name(state), reason);
}
if (cur == state) {
return;
}
GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN);
gpr_atm_no_barrier_store(&tracker->current_state_atm, state);
while ((w = tracker->watchers) != nullptr) {
*w->current = state;
tracker->watchers = w->next;
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify);
}
GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_NONE);
gpr_free(w);
} }
return state;
} }
} // namespace grpc_core

@ -22,98 +22,58 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
namespace grpc_core { typedef struct grpc_connectivity_state_watcher {
/** we keep watchers in a linked list */
extern TraceFlag grpc_connectivity_state_trace; struct grpc_connectivity_state_watcher* next;
/** closure to notify on change */
// Enum to string conversion. grpc_closure* notify;
const char* ConnectivityStateName(grpc_connectivity_state state); /** the current state as believed by the watcher */
grpc_connectivity_state* current;
// Interface for watching connectivity state. } grpc_connectivity_state_watcher;
// Subclasses must implement the Notify() method.
// typedef struct {
// Note: Most callers will want to use /** current grpc_connectivity_state */
// AsyncConnectivityStateWatcherInterface instead. gpr_atm current_state_atm;
class ConnectivityStateWatcherInterface /** all our watchers */
: public InternallyRefCounted<ConnectivityStateWatcherInterface> { grpc_connectivity_state_watcher* watchers;
public: /** a name to help debugging */
virtual ~ConnectivityStateWatcherInterface() = default; char* name;
} grpc_connectivity_state_tracker;
// Notifies the watcher that the state has changed to new_state.
virtual void Notify(grpc_connectivity_state new_state) GRPC_ABSTRACT; extern grpc_core::TraceFlag grpc_connectivity_state_trace;
void Orphan() override { Unref(); } /** enum --> string conversion */
const char* grpc_connectivity_state_name(grpc_connectivity_state state);
GRPC_ABSTRACT_BASE_CLASS
}; void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker,
grpc_connectivity_state init_state,
// An alternative watcher interface that performs notifications via an const char* name);
// asynchronous callback scheduled on the ExecCtx. void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker);
// Subclasses must implement the OnConnectivityStateChange() method.
class AsyncConnectivityStateWatcherInterface /** Set connectivity state; not thread safe; access must be serialized with an
: public ConnectivityStateWatcherInterface { * external lock */
public: void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
virtual ~AsyncConnectivityStateWatcherInterface() = default; grpc_connectivity_state state,
const char* reason);
// Schedules a closure on the ExecCtx to invoke
// OnConnectivityStateChange() asynchronously. /** Return true if this connectivity state has watchers.
void Notify(grpc_connectivity_state new_state) override final; Access must be serialized with an external lock. */
bool grpc_connectivity_state_has_watchers(
protected: grpc_connectivity_state_tracker* tracker);
class Notifier;
/** Return the last seen connectivity state. No need to synchronize access. */
// Invoked asynchronously when Notify() is called. grpc_connectivity_state grpc_connectivity_state_check(
virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) grpc_connectivity_state_tracker* tracker);
GRPC_ABSTRACT;
}; /** Return 1 if the channel should start connecting, 0 otherwise.
If current==NULL cancel notify if it is already queued (success==0 in that
// Tracks connectivity state. Maintains a list of watchers that are case).
// notified whenever the state changes. Access must be serialized with an external lock. */
class ConnectivityStateTracker { bool grpc_connectivity_state_notify_on_state_change(
public: grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current,
ConnectivityStateTracker(const char* name, grpc_closure* notify);
grpc_connectivity_state state = GRPC_CHANNEL_IDLE)
: name_(name), state_(state) {}
~ConnectivityStateTracker();
// Adds a watcher.
// If the current state is different than initial_state, the watcher
// will be notified immediately. Otherwise, it will be notified
// whenever the state changes.
// Not thread safe; access must be serialized with an external lock.
void AddWatcher(grpc_connectivity_state initial_state,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
// Removes a watcher. The watcher will be orphaned.
// Not thread safe; access must be serialized with an external lock.
void RemoveWatcher(ConnectivityStateWatcherInterface* watcher);
// Sets connectivity state.
// Not thread safe; access must be serialized with an external lock.
void SetState(grpc_connectivity_state state, const char* reason);
// Gets the current state.
// Thread safe; no need to use an external lock.
grpc_connectivity_state state() const;
private:
const char* name_;
Atomic<grpc_connectivity_state> state_;
// TODO(roth): This could be a set instead of a map if we had a set
// implementation.
Map<ConnectivityStateWatcherInterface*,
OrphanablePtr<ConnectivityStateWatcherInterface>>
watchers_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_TRANSPORT_CONNECTIVITY_STATE_H */ #endif /* GRPC_CORE_LIB_TRANSPORT_CONNECTIVITY_STATE_H */

@ -25,7 +25,6 @@
#include "src/core/lib/channel/context.h" #include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/arena.h" #include "src/core/lib/gprpp/arena.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"
@ -33,7 +32,6 @@
#include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/byte_stream.h" #include "src/core/lib/transport/byte_stream.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/metadata_batch.h"
/* Minimum and maximum protocol accepted versions. */ /* Minimum and maximum protocol accepted versions. */
@ -322,11 +320,8 @@ typedef struct grpc_transport_op {
/** Called when processing of this op is done. */ /** Called when processing of this op is done. */
grpc_closure* on_consumed = nullptr; grpc_closure* on_consumed = nullptr;
/** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */
grpc_core::OrphanablePtr<grpc_core::ConnectivityStateWatcherInterface> grpc_closure* on_connectivity_state_change = nullptr;
start_connectivity_watch; grpc_connectivity_state* connectivity_state = nullptr;
grpc_connectivity_state start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
grpc_core::ConnectivityStateWatcherInterface* stop_connectivity_watch =
nullptr;
/** should the transport be disconnected /** should the transport be disconnected
* Error contract: the transport that gets this op must cause * Error contract: the transport that gets this op must cause
* disconnect_with_error to be unref'ed after processing it */ * disconnect_with_error to be unref'ed after processing it */

@ -134,22 +134,19 @@ char* grpc_transport_op_string(grpc_transport_op* op) {
gpr_strvec b; gpr_strvec b;
gpr_strvec_init(&b); gpr_strvec_init(&b);
if (op->start_connectivity_watch != nullptr) { if (op->on_connectivity_state_change != nullptr) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = false; first = false;
gpr_asprintf( if (op->connectivity_state != nullptr) {
&tmp, "START_CONNECTIVITY_WATCH:watcher=%p:from=%s", gpr_asprintf(&tmp, "ON_CONNECTIVITY_STATE_CHANGE:p=%p:from=%s",
op->start_connectivity_watch.get(), op->on_connectivity_state_change,
grpc_core::ConnectivityStateName(op->start_connectivity_watch_state)); grpc_connectivity_state_name(*op->connectivity_state));
gpr_strvec_add(&b, tmp); gpr_strvec_add(&b, tmp);
} } else {
gpr_asprintf(&tmp, "ON_CONNECTIVITY_STATE_CHANGE:p=%p:unsubscribe",
if (op->stop_connectivity_watch != nullptr) { op->on_connectivity_state_change);
if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); gpr_strvec_add(&b, tmp);
first = false; }
gpr_asprintf(&tmp, "STOP_CONNECTIVITY_WATCH:watcher=%p",
op->stop_connectivity_watch);
gpr_strvec_add(&b, tmp);
} }
if (op->disconnect_with_error != GRPC_ERROR_NONE) { if (op->disconnect_with_error != GRPC_ERROR_NONE) {

@ -28,27 +28,31 @@
#include "test/core/end2end/cq_verifier.h" #include "test/core/end2end/cq_verifier.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
class Watcher : public grpc_core::ConnectivityStateWatcherInterface { grpc_closure transport_op_cb;
public:
void Notify(grpc_connectivity_state new_state) override {
GPR_ASSERT(new_state == GRPC_CHANNEL_SHUTDOWN);
}
};
static void* tag(intptr_t x) { return (void*)x; } static void* tag(intptr_t x) { return (void*)x; }
static grpc_closure transport_op_cb; void verify_connectivity(void* arg, grpc_error* error) {
grpc_connectivity_state* state = static_cast<grpc_connectivity_state*>(arg);
GPR_ASSERT(GRPC_CHANNEL_SHUTDOWN == *state);
GPR_ASSERT(error == GRPC_ERROR_NONE);
}
static void do_nothing(void* arg, grpc_error* error) {} void do_nothing(void* arg, grpc_error* error) {}
void test_transport_op(grpc_channel* channel) { void test_transport_op(grpc_channel* channel) {
grpc_transport_op* op;
grpc_channel_element* elem;
grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch = GRPC_CLOSURE_INIT(&transport_op_cb, verify_connectivity, &state,
grpc_core::OrphanablePtr<grpc_core::ConnectivityStateWatcherInterface>( grpc_schedule_on_exec_ctx);
grpc_core::New<Watcher>());
grpc_channel_element* elem = op = grpc_make_transport_op(nullptr);
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); op->on_connectivity_state_change = &transport_op_cb;
op->connectivity_state = &state;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
elem->filter->start_transport_op(elem, op); elem->filter->start_transport_op(elem, op);
GRPC_CLOSURE_INIT(&transport_op_cb, do_nothing, nullptr, GRPC_CLOSURE_INIT(&transport_op_cb, do_nothing, nullptr,

@ -51,9 +51,6 @@ grpc_cc_test(
grpc_cc_test( grpc_cc_test(
name = "connectivity_state_test", name = "connectivity_state_test",
srcs = ["connectivity_state_test.cc"], srcs = ["connectivity_state_test.cc"],
external_deps = [
"gtest",
],
language = "C++", language = "C++",
deps = [ deps = [
"//:gpr", "//:gpr",

@ -20,146 +20,124 @@
#include <string.h> #include <string.h>
#include <gtest/gtest.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
#include "test/core/util/tracer_util.h" #include "test/core/util/tracer_util.h"
namespace grpc_core { #define THE_ARG ((void*)(size_t)0xcafebabe)
namespace {
int g_counter;
TEST(ConnectivityStateName, Basic) { static void must_succeed(void* arg, grpc_error* error) {
EXPECT_STREQ("IDLE", ConnectivityStateName(GRPC_CHANNEL_IDLE)); GPR_ASSERT(error == GRPC_ERROR_NONE);
EXPECT_STREQ("CONNECTING", ConnectivityStateName(GRPC_CHANNEL_CONNECTING)); GPR_ASSERT(arg == THE_ARG);
EXPECT_STREQ("READY", ConnectivityStateName(GRPC_CHANNEL_READY)); g_counter++;
EXPECT_STREQ("TRANSIENT_FAILURE",
ConnectivityStateName(GRPC_CHANNEL_TRANSIENT_FAILURE));
EXPECT_STREQ("SHUTDOWN", ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
} }
class Watcher : public ConnectivityStateWatcherInterface { static void must_fail(void* arg, grpc_error* error) {
public: GPR_ASSERT(error != GRPC_ERROR_NONE);
Watcher(int* count, grpc_connectivity_state* output, GPR_ASSERT(arg == THE_ARG);
bool* destroyed = nullptr) g_counter++;
: count_(count), output_(output), destroyed_(destroyed) {}
~Watcher() {
if (destroyed_ != nullptr) *destroyed_ = true;
}
void Notify(grpc_connectivity_state new_state) override {
++*count_;
*output_ = new_state;
}
private:
int* count_;
grpc_connectivity_state* output_;
bool* destroyed_;
};
TEST(StateTracker, SetAndGetState) {
ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_CONNECTING);
EXPECT_EQ(tracker.state(), GRPC_CHANNEL_CONNECTING);
tracker.SetState(GRPC_CHANNEL_READY, "whee");
EXPECT_EQ(tracker.state(), GRPC_CHANNEL_READY);
} }
TEST(StateTracker, NotificationUponAddingWatcher) { static void test_connectivity_state_name(void) {
int count = 0; gpr_log(GPR_DEBUG, "test_connectivity_state_name");
grpc_connectivity_state state = GRPC_CHANNEL_IDLE; GPR_ASSERT(0 ==
ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_CONNECTING); strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_IDLE), "IDLE"));
tracker.AddWatcher(GRPC_CHANNEL_IDLE, GPR_ASSERT(0 == strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_CONNECTING),
OrphanablePtr<ConnectivityStateWatcherInterface>( "CONNECTING"));
New<Watcher>(&count, &state))); GPR_ASSERT(0 ==
EXPECT_EQ(count, 1); strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_READY), "READY"));
EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING); GPR_ASSERT(
0 == strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_TRANSIENT_FAILURE),
"TRANSIENT_FAILURE"));
GPR_ASSERT(0 == strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_SHUTDOWN),
"SHUTDOWN"));
} }
TEST(StateTracker, NotificationUponStateChange) { static void test_check(void) {
int count = 0; grpc_connectivity_state_tracker tracker;
grpc_connectivity_state state = GRPC_CHANNEL_IDLE; grpc_core::ExecCtx exec_ctx;
ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_IDLE); gpr_log(GPR_DEBUG, "test_check");
tracker.AddWatcher(GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx");
OrphanablePtr<ConnectivityStateWatcherInterface>( GPR_ASSERT(grpc_connectivity_state_check(&tracker) == GRPC_CHANNEL_IDLE);
New<Watcher>(&count, &state))); grpc_connectivity_state_destroy(&tracker);
EXPECT_EQ(count, 0);
EXPECT_EQ(state, GRPC_CHANNEL_IDLE);
tracker.SetState(GRPC_CHANNEL_CONNECTING, "whee");
EXPECT_EQ(count, 1);
EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING);
} }
TEST(StateTracker, SubscribeThenUnsubscribe) { static void test_subscribe_then_unsubscribe(void) {
int count = 0; grpc_connectivity_state_tracker tracker;
grpc_closure* closure =
GRPC_CLOSURE_CREATE(must_fail, THE_ARG, grpc_schedule_on_exec_ctx);
grpc_connectivity_state state = GRPC_CHANNEL_IDLE; grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
bool destroyed = false; grpc_core::ExecCtx exec_ctx;
ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_IDLE); gpr_log(GPR_DEBUG, "test_subscribe_then_unsubscribe");
ConnectivityStateWatcherInterface* watcher = g_counter = 0;
New<Watcher>(&count, &state, &destroyed); grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx");
tracker.AddWatcher(GRPC_CHANNEL_IDLE, GPR_ASSERT(grpc_connectivity_state_notify_on_state_change(&tracker, &state,
OrphanablePtr<ConnectivityStateWatcherInterface>(watcher)); closure));
// No initial notification, since we started the watch from the grpc_core::ExecCtx::Get()->Flush();
// current state. GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
EXPECT_EQ(count, 0); GPR_ASSERT(g_counter == 0);
EXPECT_EQ(state, GRPC_CHANNEL_IDLE); grpc_connectivity_state_notify_on_state_change(&tracker, nullptr, closure);
// Cancel watch. This should not generate another notification. grpc_core::ExecCtx::Get()->Flush();
tracker.RemoveWatcher(watcher); GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
EXPECT_TRUE(destroyed); GPR_ASSERT(g_counter == 1);
EXPECT_EQ(count, 0);
EXPECT_EQ(state, GRPC_CHANNEL_IDLE); grpc_connectivity_state_destroy(&tracker);
} }
TEST(StateTracker, NotifyShutdownAtDestruction) { static void test_subscribe_then_destroy(void) {
int count = 0; grpc_connectivity_state_tracker tracker;
grpc_closure* closure =
GRPC_CLOSURE_CREATE(must_succeed, THE_ARG, grpc_schedule_on_exec_ctx);
grpc_connectivity_state state = GRPC_CHANNEL_IDLE; grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
{ grpc_core::ExecCtx exec_ctx;
ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_IDLE); gpr_log(GPR_DEBUG, "test_subscribe_then_destroy");
tracker.AddWatcher(GRPC_CHANNEL_IDLE, g_counter = 0;
OrphanablePtr<ConnectivityStateWatcherInterface>( grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx");
New<Watcher>(&count, &state))); GPR_ASSERT(grpc_connectivity_state_notify_on_state_change(&tracker, &state,
// No initial notification, since we started the watch from the closure));
// current state. grpc_core::ExecCtx::Get()->Flush();
EXPECT_EQ(count, 0); GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
EXPECT_EQ(state, GRPC_CHANNEL_IDLE); GPR_ASSERT(g_counter == 0);
} grpc_connectivity_state_destroy(&tracker);
// Upon tracker destruction, we get a notification for SHUTDOWN.
EXPECT_EQ(count, 1); grpc_core::ExecCtx::Get()->Flush();
EXPECT_EQ(state, GRPC_CHANNEL_SHUTDOWN); GPR_ASSERT(state == GRPC_CHANNEL_SHUTDOWN);
GPR_ASSERT(g_counter == 1);
} }
TEST(StateTracker, DoNotNotifyShutdownAtDestructionIfAlreadyInShutdown) { static void test_subscribe_with_failure_then_destroy(void) {
int count = 0; grpc_connectivity_state_tracker tracker;
grpc_closure* closure =
GRPC_CLOSURE_CREATE(must_fail, THE_ARG, grpc_schedule_on_exec_ctx);
grpc_connectivity_state state = GRPC_CHANNEL_SHUTDOWN; grpc_connectivity_state state = GRPC_CHANNEL_SHUTDOWN;
{ grpc_core::ExecCtx exec_ctx;
ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_SHUTDOWN); gpr_log(GPR_DEBUG, "test_subscribe_with_failure_then_destroy");
tracker.AddWatcher(GRPC_CHANNEL_SHUTDOWN, g_counter = 0;
OrphanablePtr<ConnectivityStateWatcherInterface>( grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_SHUTDOWN, "xxx");
New<Watcher>(&count, &state))); GPR_ASSERT(0 == grpc_connectivity_state_notify_on_state_change(
// No initial notification, since we started the watch from the &tracker, &state, closure));
// current state. grpc_core::ExecCtx::Get()->Flush();
EXPECT_EQ(count, 0); GPR_ASSERT(state == GRPC_CHANNEL_SHUTDOWN);
EXPECT_EQ(state, GRPC_CHANNEL_SHUTDOWN); GPR_ASSERT(g_counter == 0);
} grpc_connectivity_state_destroy(&tracker);
// No additional notification upon tracker destruction, since we were grpc_core::ExecCtx::Get()->Flush();
// already in state SHUTDOWN. GPR_ASSERT(state == GRPC_CHANNEL_SHUTDOWN);
EXPECT_EQ(count, 0); GPR_ASSERT(g_counter == 1);
EXPECT_EQ(state, GRPC_CHANNEL_SHUTDOWN);
} }
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv); grpc::testing::TestEnvironment env(argc, argv);
grpc_init(); grpc_init();
grpc_core::testing::grpc_tracer_enable_flag( grpc_core::testing::grpc_tracer_enable_flag(&grpc_connectivity_state_trace);
&grpc_core::grpc_connectivity_state_trace); test_connectivity_state_name();
::testing::InitGoogleTest(&argc, argv); test_check();
int ret = RUN_ALL_TESTS(); test_subscribe_then_unsubscribe();
test_subscribe_then_destroy();
test_subscribe_with_failure_then_destroy();
grpc_shutdown(); grpc_shutdown();
return ret; return 0;
} }

@ -2917,6 +2917,30 @@
], ],
"uses_polling": false "uses_polling": false
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "transport_connectivity_state_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{ {
"args": [], "args": [],
"benchmark": false, "benchmark": false,
@ -5970,30 +5994,6 @@
], ],
"uses_polling": true "uses_polling": true
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "transport_connectivity_state_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{ {
"args": [], "args": [],
"benchmark": false, "benchmark": false,

Loading…
Cancel
Save