diff --git a/CMakeLists.txt b/CMakeLists.txt index 518ca81c664..6ce00ecacdf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -427,6 +427,7 @@ add_dependencies(buildtests_c time_averaged_stats_test) add_dependencies(buildtests_c timeout_encoding_test) add_dependencies(buildtests_c timer_heap_test) add_dependencies(buildtests_c timer_list_test) +add_dependencies(buildtests_c transport_connectivity_state_test) add_dependencies(buildtests_c transport_metadata_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_c transport_security_test) @@ -725,7 +726,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx time_change_test) endif() add_dependencies(buildtests_cxx timer_test) -add_dependencies(buildtests_cxx transport_connectivity_state_test) add_dependencies(buildtests_cxx transport_pid_controller_test) add_dependencies(buildtests_cxx transport_security_common_api_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) @@ -9847,6 +9847,37 @@ target_link_libraries(timer_list_test ) +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + +add_executable(transport_connectivity_state_test + test/core/transport/connectivity_state_test.cc +) + + +target_include_directories(transport_connectivity_state_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_UPB_GENERATED_DIR} + PRIVATE ${_gRPC_UPB_GRPC_GENERATED_DIR} + PRIVATE ${_gRPC_UPB_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} +) + +target_link_libraries(transport_connectivity_state_test + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc + gpr +) + + endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) @@ -16661,46 +16692,6 @@ target_link_libraries(timer_test ) -endif (gRPC_BUILD_TESTS) -if (gRPC_BUILD_TESTS) - -add_executable(transport_connectivity_state_test - test/core/transport/connectivity_state_test.cc - third_party/googletest/googletest/src/gtest-all.cc - third_party/googletest/googlemock/src/gmock-all.cc -) - - -target_include_directories(transport_connectivity_state_test - PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} - PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include - PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} - PRIVATE ${_gRPC_CARES_INCLUDE_DIR} - PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} - PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} - PRIVATE ${_gRPC_SSL_INCLUDE_DIR} - PRIVATE ${_gRPC_UPB_GENERATED_DIR} - PRIVATE ${_gRPC_UPB_GRPC_GENERATED_DIR} - PRIVATE ${_gRPC_UPB_INCLUDE_DIR} - PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} - PRIVATE third_party/googletest/googletest/include - PRIVATE third_party/googletest/googletest - PRIVATE third_party/googletest/googlemock/include - PRIVATE third_party/googletest/googlemock - PRIVATE ${_gRPC_PROTO_GENS_DIR} -) - -target_link_libraries(transport_connectivity_state_test - ${_gRPC_PROTOBUF_LIBRARIES} - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util - grpc - gpr - ${_gRPC_GFLAGS_LIBRARIES} -) - - endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index 20e01d6c10c..728402c8bc0 100644 --- a/Makefile +++ b/Makefile @@ -1137,6 +1137,7 @@ time_averaged_stats_test: $(BINDIR)/$(CONFIG)/time_averaged_stats_test timeout_encoding_test: $(BINDIR)/$(CONFIG)/timeout_encoding_test timer_heap_test: $(BINDIR)/$(CONFIG)/timer_heap_test timer_list_test: $(BINDIR)/$(CONFIG)/timer_list_test +transport_connectivity_state_test: $(BINDIR)/$(CONFIG)/transport_connectivity_state_test transport_metadata_test: $(BINDIR)/$(CONFIG)/transport_metadata_test transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test @@ -1293,7 +1294,6 @@ thread_manager_test: $(BINDIR)/$(CONFIG)/thread_manager_test thread_stress_test: $(BINDIR)/$(CONFIG)/thread_stress_test time_change_test: $(BINDIR)/$(CONFIG)/time_change_test timer_test: $(BINDIR)/$(CONFIG)/timer_test -transport_connectivity_state_test: $(BINDIR)/$(CONFIG)/transport_connectivity_state_test transport_pid_controller_test: $(BINDIR)/$(CONFIG)/transport_pid_controller_test transport_security_common_api_test: $(BINDIR)/$(CONFIG)/transport_security_common_api_test writes_per_rpc_test: $(BINDIR)/$(CONFIG)/writes_per_rpc_test @@ -1563,6 +1563,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/timeout_encoding_test \ $(BINDIR)/$(CONFIG)/timer_heap_test \ $(BINDIR)/$(CONFIG)/timer_list_test \ + $(BINDIR)/$(CONFIG)/transport_connectivity_state_test \ $(BINDIR)/$(CONFIG)/transport_metadata_test \ $(BINDIR)/$(CONFIG)/transport_security_test \ $(BINDIR)/$(CONFIG)/udp_server_test \ @@ -1765,7 +1766,6 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/thread_stress_test \ $(BINDIR)/$(CONFIG)/time_change_test \ $(BINDIR)/$(CONFIG)/timer_test \ - $(BINDIR)/$(CONFIG)/transport_connectivity_state_test \ $(BINDIR)/$(CONFIG)/transport_pid_controller_test \ $(BINDIR)/$(CONFIG)/transport_security_common_api_test \ $(BINDIR)/$(CONFIG)/writes_per_rpc_test \ @@ -1936,7 +1936,6 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/thread_stress_test \ $(BINDIR)/$(CONFIG)/time_change_test \ $(BINDIR)/$(CONFIG)/timer_test \ - $(BINDIR)/$(CONFIG)/transport_connectivity_state_test \ $(BINDIR)/$(CONFIG)/transport_pid_controller_test \ $(BINDIR)/$(CONFIG)/transport_security_common_api_test \ $(BINDIR)/$(CONFIG)/writes_per_rpc_test \ @@ -2214,6 +2213,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/timer_heap_test || ( echo test timer_heap_test failed ; exit 1 ) $(E) "[RUN] Testing timer_list_test" $(Q) $(BINDIR)/$(CONFIG)/timer_list_test || ( echo test timer_list_test failed ; exit 1 ) + $(E) "[RUN] Testing transport_connectivity_state_test" + $(Q) $(BINDIR)/$(CONFIG)/transport_connectivity_state_test || ( echo test transport_connectivity_state_test failed ; exit 1 ) $(E) "[RUN] Testing transport_metadata_test" $(Q) $(BINDIR)/$(CONFIG)/transport_metadata_test || ( echo test transport_metadata_test failed ; exit 1 ) $(E) "[RUN] Testing transport_security_test" @@ -2480,8 +2481,6 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/time_change_test || ( echo test time_change_test failed ; exit 1 ) $(E) "[RUN] Testing timer_test" $(Q) $(BINDIR)/$(CONFIG)/timer_test || ( echo test timer_test failed ; exit 1 ) - $(E) "[RUN] Testing transport_connectivity_state_test" - $(Q) $(BINDIR)/$(CONFIG)/transport_connectivity_state_test || ( echo test transport_connectivity_state_test failed ; exit 1 ) $(E) "[RUN] Testing transport_pid_controller_test" $(Q) $(BINDIR)/$(CONFIG)/transport_pid_controller_test || ( echo test transport_pid_controller_test failed ; exit 1 ) $(E) "[RUN] Testing transport_security_common_api_test" @@ -13180,6 +13179,38 @@ endif endif +TRANSPORT_CONNECTIVITY_STATE_TEST_SRC = \ + test/core/transport/connectivity_state_test.cc \ + +TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(TRANSPORT_CONNECTIVITY_STATE_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/transport_connectivity_state_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/transport/connectivity_state_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_transport_connectivity_state_test: $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep) +endif +endif + + TRANSPORT_METADATA_TEST_SRC = \ test/core/transport/metadata_test.cc \ @@ -19921,49 +19952,6 @@ endif endif -TRANSPORT_CONNECTIVITY_STATE_TEST_SRC = \ - test/core/transport/connectivity_state_test.cc \ - -TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(TRANSPORT_CONNECTIVITY_STATE_TEST_SRC)))) -ifeq ($(NO_SECURE),true) - -# You can't build secure targets if you don't have OpenSSL. - -$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: openssl_dep_error - -else - - - - -ifeq ($(NO_PROTOBUF),true) - -# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+. - -$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: protobuf_dep_error - -else - -$(BINDIR)/$(CONFIG)/transport_connectivity_state_test: $(PROTOBUF_DEP) $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a - $(E) "[LD] Linking $@" - $(Q) mkdir -p `dirname $@` - $(Q) $(LDXX) $(LDFLAGS) $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/transport_connectivity_state_test - -endif - -endif - -$(OBJDIR)/$(CONFIG)/test/core/transport/connectivity_state_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a - -deps_transport_connectivity_state_test: $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep) - -ifneq ($(NO_SECURE),true) -ifneq ($(NO_DEPS),true) --include $(TRANSPORT_CONNECTIVITY_STATE_TEST_OBJS:.o=.dep) -endif -endif - - TRANSPORT_PID_CONTROLLER_TEST_SRC = \ test/core/transport/pid_controller_test.cc \ diff --git a/build.yaml b/build.yaml index 81058a6d1c9..f37c59d7ccf 100644 --- a/build.yaml +++ b/build.yaml @@ -3847,6 +3847,15 @@ targets: exclude_iomgrs: - uv uses_polling: false +- name: transport_connectivity_state_test + build: test + language: c + src: + - test/core/transport/connectivity_state_test.cc + deps: + - grpc_test_util + - grpc + - gpr - name: transport_metadata_test build: test language: c @@ -5980,16 +5989,6 @@ targets: - grpc++ - grpc - gpr -- name: transport_connectivity_state_test - gtest: true - build: test - language: c++ - src: - - test/core/transport/connectivity_state_test.cc - deps: - - grpc_test_util - - grpc - - gpr - name: transport_pid_controller_test build: test language: c++ diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 5aed2e113e0..0a9b5ac43fb 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -152,41 +152,43 @@ class ChannelData { SubchannelInterface* subchannel) const; grpc_connectivity_state CheckConnectivityState(bool try_to_connect); - void AddExternalConnectivityWatcher(grpc_polling_entity pollent, grpc_connectivity_state* state, grpc_closure* on_complete, grpc_closure* watcher_timer_init) { - MutexLock lock(&external_watchers_mu_); - // Will be deleted when the watch is complete. - GPR_ASSERT(external_watchers_[on_complete] == nullptr); - external_watchers_[on_complete] = New( - this, pollent, state, on_complete, watcher_timer_init); - } - - void RemoveExternalConnectivityWatcher(grpc_closure* on_complete, - bool cancel) { - MutexLock lock(&external_watchers_mu_); - auto it = external_watchers_.find(on_complete); - if (it != external_watchers_.end()) { - if (cancel) it->second->Cancel(); - external_watchers_.erase(it); - } + // Will delete itself. + New(this, pollent, state, on_complete, + watcher_timer_init); } - int NumExternalConnectivityWatchers() const { - MutexLock lock(&external_watchers_mu_); - return static_cast(external_watchers_.size()); + return external_connectivity_watcher_list_.size(); } private: class SubchannelWrapper; class ClientChannelControlHelper; - // Represents a pending connectivity callback from an external caller - // via grpc_client_channel_watch_connectivity_state(). - class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface { + class ExternalConnectivityWatcher { public: + class WatcherList { + public: + WatcherList() { gpr_mu_init(&mu_); } + ~WatcherList() { gpr_mu_destroy(&mu_); } + + int size() const; + ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const; + void Add(ExternalConnectivityWatcher* watcher); + void Remove(const ExternalConnectivityWatcher* watcher); + + private: + // head_ is guarded by a mutex, since the size() method needs to + // iterate over the list, and it's called from the C-core API + // function grpc_channel_num_external_connectivity_watchers(), which + // is synchronous and therefore cannot run in the combiner. + mutable gpr_mu mu_; + ExternalConnectivityWatcher* head_ = nullptr; + }; + ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent, grpc_connectivity_state* state, grpc_closure* on_complete, @@ -194,23 +196,17 @@ class ChannelData { ~ExternalConnectivityWatcher(); - void Notify(grpc_connectivity_state state) override; - - void Cancel(); - private: - static void AddWatcherLocked(void* arg, grpc_error* ignored); - static void RemoveWatcherLocked(void* arg, grpc_error* ignored); + static void OnWatchCompleteLocked(void* arg, grpc_error* error); + static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored); ChannelData* chand_; grpc_polling_entity pollent_; - grpc_connectivity_state initial_state_; grpc_connectivity_state* state_; grpc_closure* on_complete_; grpc_closure* watcher_timer_init_; - grpc_closure add_closure_; - grpc_closure remove_closure_; - Atomic done_{false}; + grpc_closure my_closure_; + ExternalConnectivityWatcher* next_ = nullptr; }; ChannelData(grpc_channel_element_args* args, grpc_error** error); @@ -277,7 +273,8 @@ class ChannelData { grpc_pollset_set* interested_parties_; RefCountedPtr subchannel_pool_; OrphanablePtr resolving_lb_policy_; - ConnectivityStateTracker state_tracker_; + grpc_connectivity_state_tracker state_tracker_; + ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_; UniquePtr health_check_service_name_; RefCountedPtr saved_service_config_; bool received_first_resolver_result_ = false; @@ -308,13 +305,6 @@ class ChannelData { gpr_mu info_mu_; UniquePtr info_lb_policy_name_; UniquePtr info_service_config_json_; - - // - // Fields guarded by a mutex, since they need to be accessed - // synchronously via grpc_channel_num_external_connectivity_watchers(). - // - mutable Mutex external_watchers_mu_; - Map external_watchers_; }; // @@ -1004,7 +994,8 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { "subchannel %p (connected_subchannel=%p state=%s); " "hopping into combiner", parent_->chand_, parent_.get(), parent_->subchannel_, - connected_subchannel.get(), ConnectivityStateName(new_state)); + connected_subchannel.get(), + grpc_connectivity_state_name(new_state)); } // Will delete itself. New(Ref(), new_state, std::move(connected_subchannel)); @@ -1053,7 +1044,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { self->parent_->parent_->chand_, self->parent_->parent_.get(), self->parent_->parent_->subchannel_, self->connected_subchannel_.get(), - ConnectivityStateName(self->state_), + grpc_connectivity_state_name(self->state_), self->parent_->watcher_.get()); } // Ignore update if the parent WatcherWrapper has been replaced @@ -1114,6 +1105,55 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { RefCountedPtr connected_subchannel_in_data_plane_; }; +// +// ChannelData::ExternalConnectivityWatcher::WatcherList +// + +int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const { + MutexLock lock(&mu_); + int count = 0; + for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) { + ++count; + } + return count; +} + +ChannelData::ExternalConnectivityWatcher* +ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup( + grpc_closure* on_complete) const { + MutexLock lock(&mu_); + ExternalConnectivityWatcher* w = head_; + while (w != nullptr && w->on_complete_ != on_complete) { + w = w->next_; + } + return w; +} + +void ChannelData::ExternalConnectivityWatcher::WatcherList::Add( + ExternalConnectivityWatcher* watcher) { + GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr); + MutexLock lock(&mu_); + GPR_ASSERT(watcher->next_ == nullptr); + watcher->next_ = head_; + head_ = watcher; +} + +void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove( + const ExternalConnectivityWatcher* watcher) { + MutexLock lock(&mu_); + if (watcher == head_) { + head_ = watcher->next_; + return; + } + for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) { + if (w->next_ == watcher) { + w->next_ = w->next_->next_; + return; + } + } + GPR_UNREACHABLE_CODE(return ); +} + // // ChannelData::ExternalConnectivityWatcher // @@ -1124,7 +1164,6 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher( grpc_closure* watcher_timer_init) : chand_(chand), pollent_(pollent), - initial_state_(*state), state_(state), on_complete_(on_complete), watcher_timer_init_(watcher_timer_init) { @@ -1132,7 +1171,7 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher( chand_->interested_parties_); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher"); GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&add_closure_, AddWatcherLocked, this, + GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this, grpc_combiner_scheduler(chand_->combiner_)), GRPC_ERROR_NONE); } @@ -1144,61 +1183,42 @@ ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() { "ExternalConnectivityWatcher"); } -void ChannelData::ExternalConnectivityWatcher::Notify( - grpc_connectivity_state state) { - bool done = false; - if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED, - MemoryOrder::RELAXED)) { - return; // Already done. - } - // Report new state to the user. - *state_ = state; - GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_NONE); - // Remove external watcher. - chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false); - // Hop back into the combiner to clean up. - // Not needed in state SHUTDOWN, because the tracker will - // automatically remove all watchers in that case. - if (state != GRPC_CHANNEL_SHUTDOWN) { - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, - grpc_combiner_scheduler(chand_->combiner_)), - GRPC_ERROR_NONE); - } -} - -void ChannelData::ExternalConnectivityWatcher::Cancel() { - bool done = false; - if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED, - MemoryOrder::RELAXED)) { - return; // Already done. - } - GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED); - // Hop back into the combiner to clean up. - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, - grpc_combiner_scheduler(chand_->combiner_)), - GRPC_ERROR_NONE); -} - -void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked( - void* arg, grpc_error* ignored) { +void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked( + void* arg, grpc_error* error) { ExternalConnectivityWatcher* self = static_cast(arg); - // This assumes that the closure is scheduled on the ExecCtx scheduler - // and that GRPC_CLOSURE_RUN() will run the closure immediately. - GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE); - // Add new watcher. - self->chand_->state_tracker_.AddWatcher( - self->initial_state_, - OrphanablePtr(self)); + grpc_closure* on_complete = self->on_complete_; + self->chand_->external_connectivity_watcher_list_.Remove(self); + Delete(self); + GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error)); } -void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked( +void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked( void* arg, grpc_error* ignored) { ExternalConnectivityWatcher* self = static_cast(arg); - self->chand_->state_tracker_.RemoveWatcher(self); + if (self->state_ == nullptr) { + // Handle cancellation. + GPR_ASSERT(self->watcher_timer_init_ == nullptr); + ExternalConnectivityWatcher* found = + self->chand_->external_connectivity_watcher_list_.Lookup( + self->on_complete_); + if (found != nullptr) { + grpc_connectivity_state_notify_on_state_change( + &found->chand_->state_tracker_, nullptr, &found->my_closure_); + } + Delete(self); + return; + } + // New watcher. + self->chand_->external_connectivity_watcher_list_.Add(self); + // This assumes that the closure is scheduled on the ExecCtx scheduler + // and that GRPC_CLOSURE_RUN would run the closure immediately. + GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self, + grpc_combiner_scheduler(self->chand_->combiner_)); + grpc_connectivity_state_notify_on_state_change( + &self->chand_->state_tracker_, self->state_, &self->my_closure_); } // @@ -1251,7 +1271,7 @@ class ChannelData::ClientChannelControlHelper ? "" : " (ignoring -- channel shutting down)"; gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_, - ConnectivityStateName(state), picker.get(), extra); + grpc_connectivity_state_name(state), picker.get(), extra); } // Do update only if not shutting down. if (disconnect_error == GRPC_ERROR_NONE) { @@ -1342,13 +1362,14 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error) combiner_(grpc_combiner_create()), interested_parties_(grpc_pollset_set_create()), subchannel_pool_(GetSubchannelPool(args->channel_args)), - state_tracker_("client_channel", GRPC_CHANNEL_IDLE), disconnect_error_(GRPC_ERROR_NONE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p", this, owning_stack_); } // Initialize data members. + grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, + "client_channel"); gpr_mu_init(&info_mu_); // Start backup polling. grpc_client_channel_start_backup_polling(interested_parties_); @@ -1412,6 +1433,7 @@ ChannelData::~ChannelData() { grpc_pollset_set_destroy(interested_parties_); GRPC_COMBINER_UNREF(combiner_, "client_channel"); GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED)); + grpc_connectivity_state_destroy(&state_tracker_); gpr_mu_destroy(&info_mu_); } @@ -1425,7 +1447,7 @@ void ChannelData::UpdateStateAndPickerLocked( received_first_resolver_result_ = false; } // Update connectivity state. - state_tracker_.SetState(state, reason); + grpc_connectivity_state_set(&state_tracker_, state, reason); if (channelz_node_ != nullptr) { channelz_node_->SetConnectivityState(state); channelz_node_->AddTraceEvent( @@ -1714,7 +1736,7 @@ bool ChannelData::ProcessResolverResultLocked( } grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { - if (state_tracker_.state() != GRPC_CHANNEL_READY) { + if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected"); } LoadBalancingPolicy::PickResult result = @@ -1742,12 +1764,12 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { static_cast(op->handler_private.extra_arg); ChannelData* chand = static_cast(elem->channel_data); // Connectivity watch. - if (op->start_connectivity_watch != nullptr) { - chand->state_tracker_.AddWatcher(op->start_connectivity_watch_state, - std::move(op->start_connectivity_watch)); - } - if (op->stop_connectivity_watch != nullptr) { - chand->state_tracker_.RemoveWatcher(op->stop_connectivity_watch); + if (op->on_connectivity_state_change != nullptr) { + grpc_connectivity_state_notify_on_state_change( + &chand->state_tracker_, op->connectivity_state, + op->on_connectivity_state_change); + op->on_connectivity_state_change = nullptr; + op->connectivity_state = nullptr; } // Ping. if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { @@ -1878,7 +1900,7 @@ void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) { grpc_connectivity_state ChannelData::CheckConnectivityState( bool try_to_connect) { - grpc_connectivity_state out = state_tracker_.state(); + grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect"); GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this, @@ -3928,13 +3950,6 @@ void grpc_client_channel_watch_connectivity_state( grpc_connectivity_state* state, grpc_closure* closure, grpc_closure* watcher_timer_init) { auto* chand = static_cast(elem->channel_data); - if (state == nullptr) { - // Handle cancellation. - GPR_ASSERT(watcher_timer_init == nullptr); - chand->RemoveExternalConnectivityWatcher(closure, /*cancel=*/true); - return; - } - // Handle addition. return chand->AddExternalConnectivityWatcher(pollent, state, closure, watcher_timer_init); } diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 72bcb404ce0..caaa079dd9b 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -46,12 +46,6 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( int grpc_client_channel_num_external_connectivity_watchers( grpc_channel_element* elem); -// TODO(roth): This function is used both when handling external -// connectivity watchers and for LB policies like grpclb and xds that -// contain nested channels. In the latter case, we ideally want -// something closer to the normal connectivity state tracker API. -// When we have time, consider refactoring this somehow to allow each -// use-case to be handled more cleanly. void grpc_client_channel_watch_connectivity_state( grpc_channel_element* elem, grpc_polling_entity pollent, grpc_connectivity_state* state, grpc_closure* on_complete, diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index a47766031a3..87a76601f02 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -53,8 +53,9 @@ void SubchannelNode::PopulateConnectivityState(grpc_json* json) { connectivity_state_.Load(MemoryOrder::RELAXED); json = grpc_json_create_child(nullptr, json, "state", nullptr, GRPC_JSON_OBJECT, false); - grpc_json_create_child(nullptr, json, "state", ConnectivityStateName(state), - GRPC_JSON_STRING, false); + grpc_json_create_child(nullptr, json, "state", + grpc_connectivity_state_name(state), GRPC_JSON_STRING, + false); } grpc_json* SubchannelNode::RenderJson() { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index af7632c1728..7f3c2b20f21 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -660,7 +660,7 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, gpr_log(GPR_INFO, "[grpclb %p helper %p] pending child policy %p reports state=%s", parent_.get(), this, parent_->pending_child_policy_.get(), - ConnectivityStateName(state)); + grpc_connectivity_state_name(state)); } if (state != GRPC_CHANNEL_READY) return; grpc_pollset_set_del_pollset_set( @@ -700,7 +700,8 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s passing child picker %p as-is", - parent_.get(), this, ConnectivityStateName(state), picker.get()); + parent_.get(), this, grpc_connectivity_state_name(state), + picker.get()); } parent_->channel_control_helper()->UpdateState(state, std::move(picker)); return; @@ -708,7 +709,8 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, // Cases 2 and 3a: wrap picker from the child in our own picker. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p", - parent_.get(), this, ConnectivityStateName(state), picker.get()); + parent_.get(), this, grpc_connectivity_state_name(state), + picker.get()); } RefCountedPtr client_stats; if (parent_->lb_calld_ != nullptr && diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 000462092ac..b40b0325421 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -294,7 +294,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p selected subchannel connectivity changed to %s", p, - ConnectivityStateName(connectivity_state)); + grpc_connectivity_state_name(connectivity_state)); } // If the new state is anything other than READY and there is a // pending update, switch to the pending update. diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 69b3d6746e0..5f69a657b61 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -379,8 +379,8 @@ void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", p, subchannel(), subchannel_list(), Index(), subchannel_list()->num_subchannels(), - ConnectivityStateName(last_connectivity_state_), - ConnectivityStateName(connectivity_state)); + grpc_connectivity_state_name(last_connectivity_state_), + grpc_connectivity_state_name(connectivity_state)); } // Decide what state to report for aggregation purposes. // If we haven't seen a failure since the last time we were in state diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index b4d3fab91be..34cd0f549fe 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -254,7 +254,8 @@ void SubchannelData::Watcher:: subchannel_list_.get(), subchannel_data_->Index(), subchannel_list_->num_subchannels(), subchannel_data_->subchannel_.get(), - ConnectivityStateName(new_state), subchannel_list_->shutting_down(), + grpc_connectivity_state_name(new_state), + subchannel_list_->shutting_down(), subchannel_data_->pending_watcher_); } if (!subchannel_list_->shutting_down() && @@ -317,7 +318,8 @@ void SubchannelDatatracer()->name(), subchannel_list_->policy(), subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_.get(), ConnectivityStateName(connectivity_state_)); + subchannel_.get(), + grpc_connectivity_state_name(connectivity_state_)); } GPR_ASSERT(pending_watcher_ == nullptr); pending_watcher_ = diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index c5b9b3dc437..7c4b1faffe2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -823,7 +823,7 @@ void XdsLb::FallbackHelper::UpdateState(grpc_connectivity_state state, GPR_INFO, "[xdslb %p helper %p] pending fallback policy %p reports state=%s", parent_.get(), this, parent_->pending_fallback_policy_.get(), - ConnectivityStateName(state)); + grpc_connectivity_state_name(state)); } if (state != GRPC_CHANNEL_READY) return; grpc_pollset_set_del_pollset_set( @@ -2502,7 +2502,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() { gpr_log(GPR_INFO, "[xdslb %p] Priority %" PRIu32 " (%p) connectivity changed to %s", xds_policy(), priority_, this, - ConnectivityStateName(connectivity_state_)); + grpc_connectivity_state_name(connectivity_state_)); } } @@ -2834,7 +2834,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState( "[xdslb %p helper %p] pending child policy %p reports state=%s", locality_->xds_policy(), this, locality_->pending_child_policy_.get(), - ConnectivityStateName(state)); + grpc_connectivity_state_name(state)); } if (state != GRPC_CHANNEL_READY) return; grpc_pollset_set_del_pollset_set( diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index d997e26f979..f4c0f92b7b6 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -123,7 +123,8 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper gpr_log(GPR_INFO, "resolving_lb=%p helper=%p: pending child policy %p reports " "state=%s", - parent_.get(), this, child_, ConnectivityStateName(state)); + parent_.get(), this, child_, + grpc_connectivity_state_name(state)); } if (state != GRPC_CHANNEL_READY) return; grpc_pollset_set_del_pollset_set( diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 2c3f899d2a7..e30d915d03c 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -95,14 +95,15 @@ ConnectedSubchannel::~ConnectedSubchannel() { GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); } -void ConnectedSubchannel::StartWatch( - grpc_pollset_set* interested_parties, - OrphanablePtr watcher) { +void ConnectedSubchannel::NotifyOnStateChange( + grpc_pollset_set* interested_parties, grpc_connectivity_state* state, + grpc_closure* closure) { grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->start_connectivity_watch = std::move(watcher); - op->start_connectivity_watch_state = GRPC_CHANNEL_READY; + grpc_channel_element* elem; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; op->bind_pollset_set = interested_parties; - grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0); + elem = grpc_channel_stack_element(channel_stack_, 0); elem->filter->start_transport_op(elem, op); } @@ -309,14 +310,19 @@ void SubchannelCall::IncrementRefCount(const grpc_core::DebugLocation& location, // Subchannel::ConnectedSubchannelStateWatcher // -class Subchannel::ConnectedSubchannelStateWatcher - : public AsyncConnectivityStateWatcherInterface { +class Subchannel::ConnectedSubchannelStateWatcher { public: // Must be instantiated while holding c->mu. explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) { // Steal subchannel ref for connecting. GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting"); + // Start watching for connectivity state changes. + GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this, + grpc_schedule_on_exec_ctx); + c->connected_subchannel_->NotifyOnStateChange(c->pollset_set_, + &pending_connectivity_state_, + &on_connectivity_changed_); } ~ConnectedSubchannelStateWatcher() { @@ -324,41 +330,54 @@ class Subchannel::ConnectedSubchannelStateWatcher } private: - void OnConnectivityStateChange(grpc_connectivity_state new_state) override { - Subchannel* c = subchannel_; - MutexLock lock(&c->mu_); - switch (new_state) { - case GRPC_CHANNEL_TRANSIENT_FAILURE: - case GRPC_CHANNEL_SHUTDOWN: { - if (!c->disconnected_ && c->connected_subchannel_ != nullptr) { - if (grpc_trace_subchannel.enabled()) { - gpr_log(GPR_INFO, - "Connected subchannel %p of subchannel %p has gone into " - "%s. Attempting to reconnect.", - c->connected_subchannel_.get(), c, - ConnectivityStateName(new_state)); - } - c->connected_subchannel_.reset(); - if (c->channelz_node() != nullptr) { - c->channelz_node()->SetChildSocket(nullptr); + static void OnConnectivityChanged(void* arg, grpc_error* error) { + auto* self = static_cast(arg); + Subchannel* c = self->subchannel_; + { + MutexLock lock(&c->mu_); + switch (self->pending_connectivity_state_) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: { + if (!c->disconnected_ && c->connected_subchannel_ != nullptr) { + if (grpc_trace_subchannel.enabled()) { + gpr_log(GPR_INFO, + "Connected subchannel %p of subchannel %p has gone into " + "%s. Attempting to reconnect.", + c->connected_subchannel_.get(), c, + grpc_connectivity_state_name( + self->pending_connectivity_state_)); + } + c->connected_subchannel_.reset(); + if (c->channelz_node() != nullptr) { + c->channelz_node()->SetChildSocket(nullptr); + } + c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE); + c->backoff_begun_ = false; + c->backoff_.Reset(); } - c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE); - c->backoff_begun_ = false; - c->backoff_.Reset(); + break; + } + default: { + // In principle, this should never happen. We should not get + // a callback for READY, because that was the state we started + // this watch from. And a connected subchannel should never go + // from READY to CONNECTING or IDLE. + c->SetConnectivityStateLocked(self->pending_connectivity_state_); + c->connected_subchannel_->NotifyOnStateChange( + nullptr, &self->pending_connectivity_state_, + &self->on_connectivity_changed_); + return; // So we don't delete ourself below. } - break; - } - default: { - // In principle, this should never happen. We should not get - // a callback for READY, because that was the state we started - // this watch from. And a connected subchannel should never go - // from READY to CONNECTING or IDLE. - c->SetConnectivityStateLocked(new_state); } } + // Don't delete until we've released the lock, because this might + // cause the subchannel (which contains the lock) to be destroyed. + Delete(self); } Subchannel* subchannel_; + grpc_closure on_connectivity_changed_; + grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY; }; // @@ -1069,10 +1088,8 @@ bool Subchannel::PublishTransportLocked() { if (channelz_node_ != nullptr) { channelz_node_->SetChildSocket(std::move(socket)); } - // Start watching connected subchannel. - connected_subchannel_->StartWatch( - pollset_set_, OrphanablePtr( - New(this))); + // Instantiate state watcher. Will clean itself up. + New(this); // Report initial state. SetConnectivityStateLocked(GRPC_CHANNEL_READY); return true; diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index d6fb65814b7..c178401ca8a 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -77,9 +77,9 @@ class ConnectedSubchannel : public RefCounted { RefCountedPtr channelz_subchannel); ~ConnectedSubchannel(); - void StartWatch(grpc_pollset_set* interested_parties, - OrphanablePtr watcher); - + void NotifyOnStateChange(grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, + grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); grpc_channel_stack* channel_stack() const { return channel_stack_; } diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index 982fabca833..7ab506429af 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -90,6 +90,10 @@ struct channel_data { grpc_closure start_max_age_timer_after_init; /* Closure to run when the goaway op is finished and the max_age_timer */ grpc_closure start_max_age_grace_timer_after_goaway_op; + /* Closure to run when the channel connectivity state changes */ + grpc_closure channel_connectivity_changed; + /* Records the current connectivity state */ + grpc_connectivity_state connectivity_state; /* Number of active calls */ gpr_atm call_count; /* TODO(zyc): C++lize this state machine */ @@ -216,47 +220,6 @@ static void start_max_idle_timer_after_init(void* arg, grpc_error* error) { "max_age start_max_idle_timer_after_init"); } -namespace grpc_core { - -class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface { - public: - explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) { - GRPC_CHANNEL_STACK_REF(chand_->channel_stack, "max_age conn_watch"); - } - - ~ConnectivityWatcher() { - GRPC_CHANNEL_STACK_UNREF(chand_->channel_stack, "max_age conn_watch"); - } - - private: - void OnConnectivityStateChange(grpc_connectivity_state new_state) override { - if (new_state != GRPC_CHANNEL_SHUTDOWN) return; - { - MutexLock lock(&chand_->max_age_timer_mu); - if (chand_->max_age_timer_pending) { - grpc_timer_cancel(&chand_->max_age_timer); - chand_->max_age_timer_pending = false; - } - if (chand_->max_age_grace_timer_pending) { - grpc_timer_cancel(&chand_->max_age_grace_timer); - chand_->max_age_grace_timer_pending = false; - } - } - /* If there are no active calls, this increasement will cancel - max_idle_timer, and prevent max_idle_timer from being started in the - future. */ - increase_call_count(chand_); - if (gpr_atm_acq_load(&chand_->idle_state) == - MAX_IDLE_STATE_SEEN_EXIT_IDLE) { - grpc_timer_cancel(&chand_->max_idle_timer); - } - } - - channel_data* chand_; -}; - -} // namespace grpc_core - static void start_max_age_timer_after_init(void* arg, grpc_error* error) { channel_data* chand = static_cast(arg); gpr_mu_lock(&chand->max_age_timer_mu); @@ -267,9 +230,8 @@ static void start_max_age_timer_after_init(void* arg, grpc_error* error) { &chand->close_max_age_channel); gpr_mu_unlock(&chand->max_age_timer_mu); grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->start_connectivity_watch.reset( - grpc_core::New(chand)); - op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE; + op->on_connectivity_state_change = &chand->channel_connectivity_changed; + op->connectivity_state = &chand->connectivity_state; grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0), op); GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age start_max_age_timer_after_init"); @@ -388,6 +350,35 @@ static void force_close_max_age_channel(void* arg, grpc_error* error) { GRPC_CHANNEL_STACK_UNREF(chand->channel_stack, "max_age max_age_grace_timer"); } +static void channel_connectivity_changed(void* arg, grpc_error* error) { + channel_data* chand = static_cast(arg); + if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->on_connectivity_state_change = &chand->channel_connectivity_changed; + op->connectivity_state = &chand->connectivity_state; + grpc_channel_next_op(grpc_channel_stack_element(chand->channel_stack, 0), + op); + } else { + gpr_mu_lock(&chand->max_age_timer_mu); + if (chand->max_age_timer_pending) { + grpc_timer_cancel(&chand->max_age_timer); + chand->max_age_timer_pending = false; + } + if (chand->max_age_grace_timer_pending) { + grpc_timer_cancel(&chand->max_age_grace_timer); + chand->max_age_grace_timer_pending = false; + } + gpr_mu_unlock(&chand->max_age_timer_mu); + /* If there are no active calls, this increasement will cancel + max_idle_timer, and prevent max_idle_timer from being started in the + future. */ + increase_call_count(chand); + if (gpr_atm_acq_load(&chand->idle_state) == MAX_IDLE_STATE_SEEN_EXIT_IDLE) { + grpc_timer_cancel(&chand->max_idle_timer); + } + } +} + /* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out connection storms. Note that the MAX_CONNECTION_AGE option without jitter would not create connection storms by itself, but if there happened to be a @@ -481,6 +472,9 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem, GRPC_CLOSURE_INIT(&chand->start_max_age_grace_timer_after_goaway_op, start_max_age_grace_timer_after_goaway_op, chand, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed, + channel_connectivity_changed, chand, + grpc_schedule_on_exec_ctx); if (chand->max_connection_age != GRPC_MILLIS_INF_FUTURE) { /* When the channel reaches its max age, we send down an op with diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index acb3b4c2ddd..647442eb290 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -196,6 +196,7 @@ grpc_chttp2_transport::~grpc_chttp2_transport() { GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0); grpc_chttp2_stream_map_destroy(&stream_map); + grpc_connectivity_state_destroy(&channel_callback.state_tracker); cancel_pings(this, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); @@ -465,8 +466,6 @@ grpc_chttp2_transport::grpc_chttp2_transport( ep(ep), peer_string(grpc_endpoint_get_peer(ep)), resource_user(resource_user), - state_tracker(is_client ? "client_transport" : "server_transport", - GRPC_CHANNEL_READY), is_client(is_client), next_stream_id(is_client ? 1 : 2), deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) { @@ -481,6 +480,9 @@ grpc_chttp2_transport::grpc_chttp2_transport( grpc_chttp2_stream_map_init(&stream_map, 8); grpc_slice_buffer_init(&read_buffer); + grpc_connectivity_state_init( + &channel_callback.state_tracker, GRPC_CHANNEL_READY, + is_client ? "client_transport" : "server_transport"); grpc_slice_buffer_init(&outbuf); if (is_client) { grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string( @@ -768,7 +770,7 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, uint32_t id) { - if (t->accept_stream_cb == nullptr) { + if (t->channel_callback.accept_stream == nullptr) { return nullptr; } // Don't accept the stream if memory quota doesn't allow. Note that we should @@ -786,8 +788,9 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* accepting = nullptr; GPR_ASSERT(t->accepting_stream == nullptr); t->accepting_stream = &accepting; - t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base, - (void*)static_cast(id)); + t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data, + &t->base, + (void*)static_cast(id)); t->accepting_stream = nullptr; return accepting; } @@ -1840,8 +1843,9 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { } if (op->set_accept_stream) { - t->accept_stream_cb = op->set_accept_stream_fn; - t->accept_stream_cb_user_data = op->set_accept_stream_user_data; + t->channel_callback.accept_stream = op->set_accept_stream_fn; + t->channel_callback.accept_stream_user_data = + op->set_accept_stream_user_data; } if (op->bind_pollset) { @@ -1857,12 +1861,10 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } - if (op->start_connectivity_watch != nullptr) { - t->state_tracker.AddWatcher(op->start_connectivity_watch_state, - std::move(op->start_connectivity_watch)); - } - if (op->stop_connectivity_watch != nullptr) { - t->state_tracker.RemoveWatcher(op->stop_connectivity_watch); + if (op->on_connectivity_state_change != nullptr) { + grpc_connectivity_state_notify_on_state_change( + &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change); } if (op->disconnect_with_error != GRPC_ERROR_NONE) { @@ -2848,7 +2850,8 @@ static void connectivity_state_set(grpc_chttp2_transport* t, const char* reason) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state)); - t->state_tracker.SetState(state, reason); + grpc_connectivity_state_set(&t->channel_callback.state_tracker, state, + reason); } /******************************************************************************* diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6d13d368be7..314e5fdf650 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -339,13 +339,15 @@ struct grpc_chttp2_transport { publish the accepted server stream */ grpc_chttp2_stream** accepting_stream = nullptr; - /* accept stream callback */ - void (*accept_stream_cb)(void* user_data, grpc_transport* transport, - const void* server_data); - void* accept_stream_cb_user_data; - - /** connectivity tracking */ - grpc_core::ConnectivityStateTracker state_tracker; + struct { + /* accept stream callback */ + void (*accept_stream)(void* user_data, grpc_transport* transport, + const void* server_data); + void* accept_stream_user_data; + + /** connectivity tracking */ + grpc_connectivity_state_tracker state_tracker; + } channel_callback; /** data to write now */ grpc_slice_buffer outbuf; diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index a6d91ef1e05..b1dcbbba29b 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -75,17 +75,17 @@ struct shared_mu { struct inproc_transport { inproc_transport(const grpc_transport_vtable* vtable, shared_mu* mu, bool is_client) - : mu(mu), - is_client(is_client), - state_tracker(is_client ? "inproc_client" : "inproc_server", - GRPC_CHANNEL_READY) { + : mu(mu), is_client(is_client) { base.vtable = vtable; // Start each side of transport with 2 refs since they each have a ref // to the other gpr_ref_init(&refs, 2); + grpc_connectivity_state_init(&connectivity, GRPC_CHANNEL_READY, + is_client ? "inproc_client" : "inproc_server"); } ~inproc_transport() { + grpc_connectivity_state_destroy(&connectivity); if (gpr_unref(&mu->refs)) { mu->~shared_mu(); gpr_free(mu); @@ -111,7 +111,7 @@ struct inproc_transport { shared_mu* mu; gpr_refcount refs; bool is_client; - grpc_core::ConnectivityStateTracker state_tracker; + grpc_connectivity_state_tracker connectivity; void (*accept_stream_cb)(void* user_data, grpc_transport* transport, const void* server_data); void* accept_stream_data; @@ -1090,7 +1090,8 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, void close_transport_locked(inproc_transport* t) { INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed); - t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, "close transport"); + grpc_connectivity_state_set(&t->connectivity, GRPC_CHANNEL_SHUTDOWN, + "close transport"); if (!t->is_closed) { t->is_closed = true; /* Also end all streams on this transport */ @@ -1109,12 +1110,10 @@ void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { inproc_transport* t = reinterpret_cast(gt); INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", t, op); gpr_mu_lock(&t->mu->mu); - if (op->start_connectivity_watch != nullptr) { - t->state_tracker.AddWatcher(op->start_connectivity_watch_state, - std::move(op->start_connectivity_watch)); - } - if (op->stop_connectivity_watch != nullptr) { - t->state_tracker.RemoveWatcher(op->stop_connectivity_watch); + if (op->on_connectivity_state_change) { + grpc_connectivity_state_notify_on_state_change( + &t->connectivity, op->connectivity_state, + op->on_connectivity_state_change); } if (op->set_accept_stream) { t->accept_stream_cb = op->set_accept_stream_fn; diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 24746e70d0b..fb913beb0f0 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -234,7 +234,8 @@ grpc_json* ChannelNode::RenderJson() { static_cast(state_field >> 1); json = grpc_json_create_child(nullptr, json, "state", nullptr, GRPC_JSON_OBJECT, false); - grpc_json_create_child(nullptr, json, "state", ConnectivityStateName(state), + grpc_json_create_child(nullptr, json, "state", + grpc_connectivity_state_name(state), GRPC_JSON_STRING, false); json = data; } diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index 565f386ba74..9208160938e 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -32,7 +32,6 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/lame_client.h" -#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" namespace grpc_core { @@ -40,19 +39,15 @@ namespace grpc_core { namespace { struct CallData { - CallCombiner* call_combiner; + grpc_core::CallCombiner* call_combiner; grpc_linked_mdelem status; grpc_linked_mdelem details; - Atomic filled_metadata; + grpc_core::Atomic filled_metadata; }; struct ChannelData { - ChannelData() : state_tracker("lame_channel", GRPC_CHANNEL_SHUTDOWN) {} - grpc_status_code error_code; const char* error_message; - Mutex mu; - ConnectivityStateTracker state_tracker; }; static void fill_metadata(grpc_call_element* elem, grpc_metadata_batch* mdb) { @@ -99,16 +94,10 @@ static void lame_get_channel_info(grpc_channel_element* elem, static void lame_start_transport_op(grpc_channel_element* elem, grpc_transport_op* op) { - ChannelData* chand = static_cast(elem->channel_data); - { - MutexLock lock(&chand->mu); - if (op->start_connectivity_watch != nullptr) { - chand->state_tracker.AddWatcher(op->start_connectivity_watch_state, - std::move(op->start_connectivity_watch)); - } - if (op->stop_connectivity_watch != nullptr) { - chand->state_tracker.RemoveWatcher(op->stop_connectivity_watch); - } + if (op->on_connectivity_state_change) { + GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN); + *op->connectivity_state = GRPC_CHANNEL_SHUTDOWN; + GRPC_CLOSURE_SCHED(op->on_connectivity_state_change, GRPC_ERROR_NONE); } if (op->send_ping.on_initiate != nullptr) { GRPC_CLOSURE_SCHED( @@ -143,14 +132,10 @@ static grpc_error* lame_init_channel_elem(grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(args->is_first); GPR_ASSERT(args->is_last); - new (elem->channel_data) ChannelData; return GRPC_ERROR_NONE; } -static void lame_destroy_channel_elem(grpc_channel_element* elem) { - ChannelData* chand = static_cast(elem->channel_data); - chand->~ChannelData(); -} +static void lame_destroy_channel_elem(grpc_channel_element* elem) {} } // namespace diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 4fd3b0e99f1..c14b7ba62b3 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -105,6 +105,7 @@ struct channel_registered_method { struct channel_data { grpc_server* server; + grpc_connectivity_state connectivity_state; grpc_channel* channel; size_t cq_idx; /* linked list of all channels on a server */ @@ -114,6 +115,7 @@ struct channel_data { uint32_t registered_method_slots; uint32_t registered_method_max_probes; grpc_closure finish_destroy_channel_closure; + grpc_closure channel_connectivity_changed; intptr_t channelz_socket_uuid; }; @@ -456,7 +458,7 @@ static void finish_destroy_channel(void* cd, grpc_error* error) { server_unref(server); } -static void destroy_channel(channel_data* chand) { +static void destroy_channel(channel_data* chand, grpc_error* error) { if (is_channel_orphaned(chand)) return; GPR_ASSERT(chand->server != nullptr); orphan_channel(chand); @@ -465,9 +467,12 @@ static void destroy_channel(channel_data* chand) { GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure, finish_destroy_channel, chand, grpc_schedule_on_exec_ctx); - if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) { - gpr_log(GPR_INFO, "Disconnected client"); + if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace) && + error != GRPC_ERROR_NONE) { + const char* msg = grpc_error_string(error); + gpr_log(GPR_INFO, "Disconnected client: %s", msg); } + GRPC_ERROR_UNREF(error); grpc_transport_op* op = grpc_make_transport_op(&chand->finish_destroy_channel_closure); @@ -886,6 +891,24 @@ static void accept_stream(void* cd, grpc_transport* transport, grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata); } +static void channel_connectivity_changed(void* cd, grpc_error* error) { + channel_data* chand = static_cast(cd); + grpc_server* server = chand->server; + if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->on_connectivity_state_change = &chand->channel_connectivity_changed; + op->connectivity_state = &chand->connectivity_state; + grpc_channel_next_op(grpc_channel_stack_element( + grpc_channel_get_channel_stack(chand->channel), 0), + op); + } else { + gpr_mu_lock(&server->mu_global); + destroy_channel(chand, GRPC_ERROR_REF(error)); + gpr_mu_unlock(&server->mu_global); + GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity"); + } +} + static grpc_error* server_init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { channel_data* chand = static_cast(elem->channel_data); @@ -912,6 +935,10 @@ static grpc_error* server_init_channel_elem(grpc_channel_element* elem, chand->channel = nullptr; chand->next = chand->prev = chand; chand->registered_methods = nullptr; + chand->connectivity_state = GRPC_CHANNEL_IDLE; + GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed, + channel_connectivity_changed, chand, + grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } @@ -1122,31 +1149,6 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets, *pollsets = server->pollsets; } -class ConnectivityWatcher - : public grpc_core::AsyncConnectivityStateWatcherInterface { - public: - explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) { - GRPC_CHANNEL_INTERNAL_REF(chand_->channel, "connectivity"); - } - - ~ConnectivityWatcher() { - GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel, "connectivity"); - } - - private: - void OnConnectivityStateChange(grpc_connectivity_state new_state) override { - // Don't do anything until we are being shut down. - if (new_state != GRPC_CHANNEL_SHUTDOWN) return; - // Shut down channel. - grpc_server* server = chand_->server; - gpr_mu_lock(&server->mu_global); - destroy_channel(chand_); - gpr_mu_unlock(&server->mu_global); - } - - channel_data* chand_; -}; - void grpc_server_setup_transport( grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args, @@ -1239,12 +1241,13 @@ void grpc_server_setup_transport( chand->next->prev = chand->prev->next = chand; gpr_mu_unlock(&s->mu_global); + GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); op = grpc_make_transport_op(nullptr); op->set_accept_stream = true; op->set_accept_stream_fn = accept_stream; op->set_accept_stream_user_data = chand; - op->start_connectivity_watch.reset( - grpc_core::New(chand)); + op->on_connectivity_state_change = &chand->channel_connectivity_changed; + op->connectivity_state = &chand->connectivity_state; if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"); diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index 45ebdca4f0d..bf35fd09def 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -26,13 +26,9 @@ #include #include -#include "src/core/lib/iomgr/exec_ctx.h" +grpc_core::TraceFlag grpc_connectivity_state_trace(false, "connectivity_state"); -namespace grpc_core { - -TraceFlag grpc_connectivity_state_trace(false, "connectivity_state"); - -const char* ConnectivityStateName(grpc_connectivity_state state) { +const char* grpc_connectivity_state_name(grpc_connectivity_state state) { switch (state) { case GRPC_CHANNEL_IDLE: return "IDLE"; @@ -48,121 +44,122 @@ const char* ConnectivityStateName(grpc_connectivity_state state) { GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -// -// AsyncConnectivityStateWatcherInterface -// - -// A fire-and-forget class to asynchronously deliver a connectivity -// state notification to a watcher. -class AsyncConnectivityStateWatcherInterface::Notifier { - public: - Notifier(RefCountedPtr watcher, - grpc_connectivity_state state) - : watcher_(std::move(watcher)), state_(state) { - GRPC_CLOSURE_INIT(&closure_, SendNotification, this, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); - } - - private: - static void SendNotification(void* arg, grpc_error* ignored) { - Notifier* self = static_cast(arg); - if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { - gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s", - self->watcher_.get(), ConnectivityStateName(self->state_)); - } - self->watcher_->OnConnectivityStateChange(self->state_); - Delete(self); - } - - RefCountedPtr watcher_; - const grpc_connectivity_state state_; - grpc_closure closure_; -}; - -void AsyncConnectivityStateWatcherInterface::Notify( - grpc_connectivity_state state) { - New(Ref(), state); // Deletes itself when done. +void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker, + grpc_connectivity_state init_state, + const char* name) { + gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state); + tracker->watchers = nullptr; + tracker->name = gpr_strdup(name); } -// -// ConnectivityStateTracker -// - -ConnectivityStateTracker::~ConnectivityStateTracker() { - grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED); - if (current_state == GRPC_CHANNEL_SHUTDOWN) return; - for (const auto& p : watchers_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { - gpr_log(GPR_INFO, - "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s", - name_, this, p.first, ConnectivityStateName(current_state), - ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN)); +void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) { + grpc_error* error; + grpc_connectivity_state_watcher* w; + while ((w = tracker->watchers)) { + tracker->watchers = w->next; + + if (GRPC_CHANNEL_SHUTDOWN != *w->current) { + *w->current = GRPC_CHANNEL_SHUTDOWN; + error = GRPC_ERROR_NONE; + } else { + error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner"); } - p.second->Notify(GRPC_CHANNEL_SHUTDOWN); + GRPC_CLOSURE_SCHED(w->notify, error); + gpr_free(w); } + gpr_free(tracker->name); } -void ConnectivityStateTracker::AddWatcher( - grpc_connectivity_state initial_state, - OrphanablePtr watcher) { +grpc_connectivity_state grpc_connectivity_state_check( + grpc_connectivity_state_tracker* tracker) { + grpc_connectivity_state cur = static_cast( + gpr_atm_no_barrier_load(&tracker->current_state_atm)); if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { - gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_, - this, watcher.get()); - } - grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED); - if (initial_state != current_state) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { - gpr_log(GPR_INFO, - "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s", - name_, this, watcher.get(), ConnectivityStateName(initial_state), - ConnectivityStateName(current_state)); - } - watcher->Notify(current_state); + gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name, + grpc_connectivity_state_name(cur)); } - watchers_.insert(MakePair(watcher.get(), std::move(watcher))); + return cur; } -void ConnectivityStateTracker::RemoveWatcher( - ConnectivityStateWatcherInterface* watcher) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { - gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: remove watcher %p", - name_, this, watcher); - } - watchers_.erase(watcher); +bool grpc_connectivity_state_has_watchers( + grpc_connectivity_state_tracker* connectivity_state) { + return connectivity_state->watchers != nullptr; } -void ConnectivityStateTracker::SetState(grpc_connectivity_state state, - const char* reason) { - grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED); - if (state == current_state) return; +bool grpc_connectivity_state_notify_on_state_change( + grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current, + grpc_closure* notify) { + grpc_connectivity_state cur = static_cast( + gpr_atm_no_barrier_load(&tracker->current_state_atm)); if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { - gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s)", name_, - this, ConnectivityStateName(current_state), - ConnectivityStateName(state), reason); + if (current == nullptr) { + gpr_log(GPR_INFO, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, + tracker->name, notify); + } else { + gpr_log(GPR_INFO, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, + tracker->name, grpc_connectivity_state_name(*current), + grpc_connectivity_state_name(cur), notify); + } } - state_.Store(state, MemoryOrder::RELAXED); - for (const auto& p : watchers_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { - gpr_log(GPR_INFO, - "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s", - name_, this, p.first, ConnectivityStateName(current_state), - ConnectivityStateName(state)); + if (current == nullptr) { + grpc_connectivity_state_watcher* w = tracker->watchers; + if (w != nullptr && w->notify == notify) { + GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED); + tracker->watchers = w->next; + gpr_free(w); + return false; + } + while (w != nullptr) { + grpc_connectivity_state_watcher* rm_candidate = w->next; + if (rm_candidate != nullptr && rm_candidate->notify == notify) { + GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED); + w->next = w->next->next; + gpr_free(rm_candidate); + return false; + } + w = w->next; } - p.second->Notify(state); + return false; + } else { + if (cur != *current) { + *current = cur; + GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_NONE); + } else { + grpc_connectivity_state_watcher* w = + static_cast(gpr_malloc(sizeof(*w))); + w->current = current; + w->notify = notify; + w->next = tracker->watchers; + tracker->watchers = w; + } + return cur == GRPC_CHANNEL_IDLE; } - // If the new state is SHUTDOWN, orphan all of the watchers. This - // avoids the need for the callers to explicitly cancel them. - if (state == GRPC_CHANNEL_SHUTDOWN) watchers_.clear(); } -grpc_connectivity_state ConnectivityStateTracker::state() const { - grpc_connectivity_state state = state_.Load(MemoryOrder::RELAXED); +void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker, + grpc_connectivity_state state, + const char* reason) { + grpc_connectivity_state cur = static_cast( + gpr_atm_no_barrier_load(&tracker->current_state_atm)); + grpc_connectivity_state_watcher* w; if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { - gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s", - name_, this, ConnectivityStateName(state)); + gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s]", tracker, tracker->name, + grpc_connectivity_state_name(cur), + grpc_connectivity_state_name(state), reason); + } + if (cur == state) { + return; + } + GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN); + gpr_atm_no_barrier_store(&tracker->current_state_atm, state); + while ((w = tracker->watchers) != nullptr) { + *w->current = state; + tracker->watchers = w->next; + if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) { + gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify); + } + GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_NONE); + gpr_free(w); } - return state; } - -} // namespace grpc_core diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 41f7bf08e41..0ff1432cb9d 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -22,98 +22,58 @@ #include #include - #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gprpp/atomic.h" -#include "src/core/lib/gprpp/map.h" -#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/closure.h" -namespace grpc_core { - -extern TraceFlag grpc_connectivity_state_trace; - -// Enum to string conversion. -const char* ConnectivityStateName(grpc_connectivity_state state); - -// Interface for watching connectivity state. -// Subclasses must implement the Notify() method. -// -// Note: Most callers will want to use -// AsyncConnectivityStateWatcherInterface instead. -class ConnectivityStateWatcherInterface - : public InternallyRefCounted { - public: - virtual ~ConnectivityStateWatcherInterface() = default; - - // Notifies the watcher that the state has changed to new_state. - virtual void Notify(grpc_connectivity_state new_state) GRPC_ABSTRACT; - - void Orphan() override { Unref(); } - - GRPC_ABSTRACT_BASE_CLASS -}; - -// An alternative watcher interface that performs notifications via an -// asynchronous callback scheduled on the ExecCtx. -// Subclasses must implement the OnConnectivityStateChange() method. -class AsyncConnectivityStateWatcherInterface - : public ConnectivityStateWatcherInterface { - public: - virtual ~AsyncConnectivityStateWatcherInterface() = default; - - // Schedules a closure on the ExecCtx to invoke - // OnConnectivityStateChange() asynchronously. - void Notify(grpc_connectivity_state new_state) override final; - - protected: - class Notifier; - - // Invoked asynchronously when Notify() is called. - virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) - GRPC_ABSTRACT; -}; - -// Tracks connectivity state. Maintains a list of watchers that are -// notified whenever the state changes. -class ConnectivityStateTracker { - public: - ConnectivityStateTracker(const char* name, - grpc_connectivity_state state = GRPC_CHANNEL_IDLE) - : name_(name), state_(state) {} - - ~ConnectivityStateTracker(); - - // Adds a watcher. - // If the current state is different than initial_state, the watcher - // will be notified immediately. Otherwise, it will be notified - // whenever the state changes. - // Not thread safe; access must be serialized with an external lock. - void AddWatcher(grpc_connectivity_state initial_state, - OrphanablePtr watcher); - - // Removes a watcher. The watcher will be orphaned. - // Not thread safe; access must be serialized with an external lock. - void RemoveWatcher(ConnectivityStateWatcherInterface* watcher); - - // Sets connectivity state. - // Not thread safe; access must be serialized with an external lock. - void SetState(grpc_connectivity_state state, const char* reason); - - // Gets the current state. - // Thread safe; no need to use an external lock. - grpc_connectivity_state state() const; - - private: - const char* name_; - Atomic state_; - // TODO(roth): This could be a set instead of a map if we had a set - // implementation. - Map> - watchers_; -}; - -} // namespace grpc_core +typedef struct grpc_connectivity_state_watcher { + /** we keep watchers in a linked list */ + struct grpc_connectivity_state_watcher* next; + /** closure to notify on change */ + grpc_closure* notify; + /** the current state as believed by the watcher */ + grpc_connectivity_state* current; +} grpc_connectivity_state_watcher; + +typedef struct { + /** current grpc_connectivity_state */ + gpr_atm current_state_atm; + /** all our watchers */ + grpc_connectivity_state_watcher* watchers; + /** a name to help debugging */ + char* name; +} grpc_connectivity_state_tracker; + +extern grpc_core::TraceFlag grpc_connectivity_state_trace; + +/** enum --> string conversion */ +const char* grpc_connectivity_state_name(grpc_connectivity_state state); + +void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker, + grpc_connectivity_state init_state, + const char* name); +void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker); + +/** Set connectivity state; not thread safe; access must be serialized with an + * external lock */ +void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker, + grpc_connectivity_state state, + const char* reason); + +/** Return true if this connectivity state has watchers. + Access must be serialized with an external lock. */ +bool grpc_connectivity_state_has_watchers( + grpc_connectivity_state_tracker* tracker); + +/** Return the last seen connectivity state. No need to synchronize access. */ +grpc_connectivity_state grpc_connectivity_state_check( + grpc_connectivity_state_tracker* tracker); + +/** Return 1 if the channel should start connecting, 0 otherwise. + If current==NULL cancel notify if it is already queued (success==0 in that + case). + Access must be serialized with an external lock. */ +bool grpc_connectivity_state_notify_on_state_change( + grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current, + grpc_closure* notify); #endif /* GRPC_CORE_LIB_TRANSPORT_CONNECTIVITY_STATE_H */ diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index a46338310ae..c40b290d535 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -25,7 +25,6 @@ #include "src/core/lib/channel/context.h" #include "src/core/lib/gprpp/arena.h" -#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/polling_entity.h" @@ -33,7 +32,6 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/byte_stream.h" -#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata_batch.h" /* Minimum and maximum protocol accepted versions. */ @@ -322,11 +320,8 @@ typedef struct grpc_transport_op { /** Called when processing of this op is done. */ grpc_closure* on_consumed = nullptr; /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ - grpc_core::OrphanablePtr - start_connectivity_watch; - grpc_connectivity_state start_connectivity_watch_state = GRPC_CHANNEL_IDLE; - grpc_core::ConnectivityStateWatcherInterface* stop_connectivity_watch = - nullptr; + grpc_closure* on_connectivity_state_change = nullptr; + grpc_connectivity_state* connectivity_state = nullptr; /** should the transport be disconnected * Error contract: the transport that gets this op must cause * disconnect_with_error to be unref'ed after processing it */ diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc index 34b36c3aec9..8c7db642a54 100644 --- a/src/core/lib/transport/transport_op_string.cc +++ b/src/core/lib/transport/transport_op_string.cc @@ -134,22 +134,19 @@ char* grpc_transport_op_string(grpc_transport_op* op) { gpr_strvec b; gpr_strvec_init(&b); - if (op->start_connectivity_watch != nullptr) { + if (op->on_connectivity_state_change != nullptr) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = false; - gpr_asprintf( - &tmp, "START_CONNECTIVITY_WATCH:watcher=%p:from=%s", - op->start_connectivity_watch.get(), - grpc_core::ConnectivityStateName(op->start_connectivity_watch_state)); - gpr_strvec_add(&b, tmp); - } - - if (op->stop_connectivity_watch != nullptr) { - if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); - first = false; - gpr_asprintf(&tmp, "STOP_CONNECTIVITY_WATCH:watcher=%p", - op->stop_connectivity_watch); - gpr_strvec_add(&b, tmp); + if (op->connectivity_state != nullptr) { + gpr_asprintf(&tmp, "ON_CONNECTIVITY_STATE_CHANGE:p=%p:from=%s", + op->on_connectivity_state_change, + grpc_connectivity_state_name(*op->connectivity_state)); + gpr_strvec_add(&b, tmp); + } else { + gpr_asprintf(&tmp, "ON_CONNECTIVITY_STATE_CHANGE:p=%p:unsubscribe", + op->on_connectivity_state_change); + gpr_strvec_add(&b, tmp); + } } if (op->disconnect_with_error != GRPC_ERROR_NONE) { diff --git a/test/core/surface/lame_client_test.cc b/test/core/surface/lame_client_test.cc index 34cafbbd5b7..09c3d431977 100644 --- a/test/core/surface/lame_client_test.cc +++ b/test/core/surface/lame_client_test.cc @@ -28,27 +28,31 @@ #include "test/core/end2end/cq_verifier.h" #include "test/core/util/test_config.h" -class Watcher : public grpc_core::ConnectivityStateWatcherInterface { - public: - void Notify(grpc_connectivity_state new_state) override { - GPR_ASSERT(new_state == GRPC_CHANNEL_SHUTDOWN); - } -}; +grpc_closure transport_op_cb; static void* tag(intptr_t x) { return (void*)x; } -static grpc_closure transport_op_cb; +void verify_connectivity(void* arg, grpc_error* error) { + grpc_connectivity_state* state = static_cast(arg); + GPR_ASSERT(GRPC_CHANNEL_SHUTDOWN == *state); + GPR_ASSERT(error == GRPC_ERROR_NONE); +} -static void do_nothing(void* arg, grpc_error* error) {} +void do_nothing(void* arg, grpc_error* error) {} void test_transport_op(grpc_channel* channel) { + grpc_transport_op* op; + grpc_channel_element* elem; + grpc_connectivity_state state = GRPC_CHANNEL_IDLE; grpc_core::ExecCtx exec_ctx; - grpc_transport_op* op = grpc_make_transport_op(nullptr); - op->start_connectivity_watch = - grpc_core::OrphanablePtr( - grpc_core::New()); - grpc_channel_element* elem = - grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); + + GRPC_CLOSURE_INIT(&transport_op_cb, verify_connectivity, &state, + grpc_schedule_on_exec_ctx); + + op = grpc_make_transport_op(nullptr); + op->on_connectivity_state_change = &transport_op_cb; + op->connectivity_state = &state; + elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); elem->filter->start_transport_op(elem, op); GRPC_CLOSURE_INIT(&transport_op_cb, do_nothing, nullptr, diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD index 12e748f7d10..94ebb602d80 100644 --- a/test/core/transport/BUILD +++ b/test/core/transport/BUILD @@ -51,9 +51,6 @@ grpc_cc_test( grpc_cc_test( name = "connectivity_state_test", srcs = ["connectivity_state_test.cc"], - external_deps = [ - "gtest", - ], language = "C++", deps = [ "//:gpr", diff --git a/test/core/transport/connectivity_state_test.cc b/test/core/transport/connectivity_state_test.cc index e0141cd02d0..26c09a76039 100644 --- a/test/core/transport/connectivity_state_test.cc +++ b/test/core/transport/connectivity_state_test.cc @@ -20,146 +20,124 @@ #include -#include - #include #include "src/core/lib/iomgr/exec_ctx.h" #include "test/core/util/test_config.h" #include "test/core/util/tracer_util.h" -namespace grpc_core { -namespace { +#define THE_ARG ((void*)(size_t)0xcafebabe) + +int g_counter; -TEST(ConnectivityStateName, Basic) { - EXPECT_STREQ("IDLE", ConnectivityStateName(GRPC_CHANNEL_IDLE)); - EXPECT_STREQ("CONNECTING", ConnectivityStateName(GRPC_CHANNEL_CONNECTING)); - EXPECT_STREQ("READY", ConnectivityStateName(GRPC_CHANNEL_READY)); - EXPECT_STREQ("TRANSIENT_FAILURE", - ConnectivityStateName(GRPC_CHANNEL_TRANSIENT_FAILURE)); - EXPECT_STREQ("SHUTDOWN", ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN)); +static void must_succeed(void* arg, grpc_error* error) { + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(arg == THE_ARG); + g_counter++; } -class Watcher : public ConnectivityStateWatcherInterface { - public: - Watcher(int* count, grpc_connectivity_state* output, - bool* destroyed = nullptr) - : count_(count), output_(output), destroyed_(destroyed) {} - - ~Watcher() { - if (destroyed_ != nullptr) *destroyed_ = true; - } - - void Notify(grpc_connectivity_state new_state) override { - ++*count_; - *output_ = new_state; - } - - private: - int* count_; - grpc_connectivity_state* output_; - bool* destroyed_; -}; - -TEST(StateTracker, SetAndGetState) { - ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_CONNECTING); - EXPECT_EQ(tracker.state(), GRPC_CHANNEL_CONNECTING); - tracker.SetState(GRPC_CHANNEL_READY, "whee"); - EXPECT_EQ(tracker.state(), GRPC_CHANNEL_READY); +static void must_fail(void* arg, grpc_error* error) { + GPR_ASSERT(error != GRPC_ERROR_NONE); + GPR_ASSERT(arg == THE_ARG); + g_counter++; } -TEST(StateTracker, NotificationUponAddingWatcher) { - int count = 0; - grpc_connectivity_state state = GRPC_CHANNEL_IDLE; - ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_CONNECTING); - tracker.AddWatcher(GRPC_CHANNEL_IDLE, - OrphanablePtr( - New(&count, &state))); - EXPECT_EQ(count, 1); - EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING); +static void test_connectivity_state_name(void) { + gpr_log(GPR_DEBUG, "test_connectivity_state_name"); + GPR_ASSERT(0 == + strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_IDLE), "IDLE")); + GPR_ASSERT(0 == strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_CONNECTING), + "CONNECTING")); + GPR_ASSERT(0 == + strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_READY), "READY")); + GPR_ASSERT( + 0 == strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_TRANSIENT_FAILURE), + "TRANSIENT_FAILURE")); + GPR_ASSERT(0 == strcmp(grpc_connectivity_state_name(GRPC_CHANNEL_SHUTDOWN), + "SHUTDOWN")); } -TEST(StateTracker, NotificationUponStateChange) { - int count = 0; - grpc_connectivity_state state = GRPC_CHANNEL_IDLE; - ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_IDLE); - tracker.AddWatcher(GRPC_CHANNEL_IDLE, - OrphanablePtr( - New(&count, &state))); - EXPECT_EQ(count, 0); - EXPECT_EQ(state, GRPC_CHANNEL_IDLE); - tracker.SetState(GRPC_CHANNEL_CONNECTING, "whee"); - EXPECT_EQ(count, 1); - EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING); +static void test_check(void) { + grpc_connectivity_state_tracker tracker; + grpc_core::ExecCtx exec_ctx; + gpr_log(GPR_DEBUG, "test_check"); + grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx"); + GPR_ASSERT(grpc_connectivity_state_check(&tracker) == GRPC_CHANNEL_IDLE); + grpc_connectivity_state_destroy(&tracker); } -TEST(StateTracker, SubscribeThenUnsubscribe) { - int count = 0; +static void test_subscribe_then_unsubscribe(void) { + grpc_connectivity_state_tracker tracker; + grpc_closure* closure = + GRPC_CLOSURE_CREATE(must_fail, THE_ARG, grpc_schedule_on_exec_ctx); grpc_connectivity_state state = GRPC_CHANNEL_IDLE; - bool destroyed = false; - ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_IDLE); - ConnectivityStateWatcherInterface* watcher = - New(&count, &state, &destroyed); - tracker.AddWatcher(GRPC_CHANNEL_IDLE, - OrphanablePtr(watcher)); - // No initial notification, since we started the watch from the - // current state. - EXPECT_EQ(count, 0); - EXPECT_EQ(state, GRPC_CHANNEL_IDLE); - // Cancel watch. This should not generate another notification. - tracker.RemoveWatcher(watcher); - EXPECT_TRUE(destroyed); - EXPECT_EQ(count, 0); - EXPECT_EQ(state, GRPC_CHANNEL_IDLE); + grpc_core::ExecCtx exec_ctx; + gpr_log(GPR_DEBUG, "test_subscribe_then_unsubscribe"); + g_counter = 0; + grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx"); + GPR_ASSERT(grpc_connectivity_state_notify_on_state_change(&tracker, &state, + closure)); + grpc_core::ExecCtx::Get()->Flush(); + GPR_ASSERT(state == GRPC_CHANNEL_IDLE); + GPR_ASSERT(g_counter == 0); + grpc_connectivity_state_notify_on_state_change(&tracker, nullptr, closure); + grpc_core::ExecCtx::Get()->Flush(); + GPR_ASSERT(state == GRPC_CHANNEL_IDLE); + GPR_ASSERT(g_counter == 1); + + grpc_connectivity_state_destroy(&tracker); } -TEST(StateTracker, NotifyShutdownAtDestruction) { - int count = 0; +static void test_subscribe_then_destroy(void) { + grpc_connectivity_state_tracker tracker; + grpc_closure* closure = + GRPC_CLOSURE_CREATE(must_succeed, THE_ARG, grpc_schedule_on_exec_ctx); grpc_connectivity_state state = GRPC_CHANNEL_IDLE; - { - ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_IDLE); - tracker.AddWatcher(GRPC_CHANNEL_IDLE, - OrphanablePtr( - New(&count, &state))); - // No initial notification, since we started the watch from the - // current state. - EXPECT_EQ(count, 0); - EXPECT_EQ(state, GRPC_CHANNEL_IDLE); - } - // Upon tracker destruction, we get a notification for SHUTDOWN. - EXPECT_EQ(count, 1); - EXPECT_EQ(state, GRPC_CHANNEL_SHUTDOWN); + grpc_core::ExecCtx exec_ctx; + gpr_log(GPR_DEBUG, "test_subscribe_then_destroy"); + g_counter = 0; + grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx"); + GPR_ASSERT(grpc_connectivity_state_notify_on_state_change(&tracker, &state, + closure)); + grpc_core::ExecCtx::Get()->Flush(); + GPR_ASSERT(state == GRPC_CHANNEL_IDLE); + GPR_ASSERT(g_counter == 0); + grpc_connectivity_state_destroy(&tracker); + + grpc_core::ExecCtx::Get()->Flush(); + GPR_ASSERT(state == GRPC_CHANNEL_SHUTDOWN); + GPR_ASSERT(g_counter == 1); } -TEST(StateTracker, DoNotNotifyShutdownAtDestructionIfAlreadyInShutdown) { - int count = 0; +static void test_subscribe_with_failure_then_destroy(void) { + grpc_connectivity_state_tracker tracker; + grpc_closure* closure = + GRPC_CLOSURE_CREATE(must_fail, THE_ARG, grpc_schedule_on_exec_ctx); grpc_connectivity_state state = GRPC_CHANNEL_SHUTDOWN; - { - ConnectivityStateTracker tracker("xxx", GRPC_CHANNEL_SHUTDOWN); - tracker.AddWatcher(GRPC_CHANNEL_SHUTDOWN, - OrphanablePtr( - New(&count, &state))); - // No initial notification, since we started the watch from the - // current state. - EXPECT_EQ(count, 0); - EXPECT_EQ(state, GRPC_CHANNEL_SHUTDOWN); - } - // No additional notification upon tracker destruction, since we were - // already in state SHUTDOWN. - EXPECT_EQ(count, 0); - EXPECT_EQ(state, GRPC_CHANNEL_SHUTDOWN); + grpc_core::ExecCtx exec_ctx; + gpr_log(GPR_DEBUG, "test_subscribe_with_failure_then_destroy"); + g_counter = 0; + grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_SHUTDOWN, "xxx"); + GPR_ASSERT(0 == grpc_connectivity_state_notify_on_state_change( + &tracker, &state, closure)); + grpc_core::ExecCtx::Get()->Flush(); + GPR_ASSERT(state == GRPC_CHANNEL_SHUTDOWN); + GPR_ASSERT(g_counter == 0); + grpc_connectivity_state_destroy(&tracker); + grpc_core::ExecCtx::Get()->Flush(); + GPR_ASSERT(state == GRPC_CHANNEL_SHUTDOWN); + GPR_ASSERT(g_counter == 1); } -} // namespace -} // namespace grpc_core - int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init(); - grpc_core::testing::grpc_tracer_enable_flag( - &grpc_core::grpc_connectivity_state_trace); - ::testing::InitGoogleTest(&argc, argv); - int ret = RUN_ALL_TESTS(); + grpc_core::testing::grpc_tracer_enable_flag(&grpc_connectivity_state_trace); + test_connectivity_state_name(); + test_check(); + test_subscribe_then_unsubscribe(); + test_subscribe_then_destroy(); + test_subscribe_with_failure_then_destroy(); grpc_shutdown(); - return ret; + return 0; } diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index b4b3822bde5..1f9634e34e3 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2917,6 +2917,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "transport_connectivity_state_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false, @@ -5970,30 +5994,6 @@ ], "uses_polling": true }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": true, - "language": "c++", - "name": "transport_connectivity_state_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": true - }, { "args": [], "benchmark": false,