diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc index a656294970b..a75b97462e1 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.cc +++ b/src/core/ext/filters/client_channel/channel_connectivity.cc @@ -23,6 +23,17 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/lame_client.h" + +namespace { + +bool IsLameChannel(grpc_channel* channel) { + grpc_channel_element* elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + return elem->filter == &grpc_lame_filter; +} + +} // namespace grpc_connectivity_state grpc_channel_check_connectivity_state( grpc_channel* channel, int try_to_connect) { @@ -35,6 +46,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( grpc_core::ClientChannel* client_channel = grpc_core::ClientChannel::GetFromChannel(channel); if (GPR_UNLIKELY(client_channel == nullptr)) { + if (IsLameChannel(channel)) return GRPC_CHANNEL_TRANSIENT_FAILURE; gpr_log(GPR_ERROR, "grpc_channel_check_connectivity_state called on something that is " "not a client channel"); @@ -47,9 +59,11 @@ int grpc_channel_num_external_connectivity_watchers(grpc_channel* channel) { grpc_core::ClientChannel* client_channel = grpc_core::ClientChannel::GetFromChannel(channel); if (client_channel == nullptr) { - gpr_log(GPR_ERROR, - "grpc_channel_num_external_connectivity_watchers called on " - "something that is not a client channel"); + if (!IsLameChannel(channel)) { + gpr_log(GPR_ERROR, + "grpc_channel_num_external_connectivity_watchers called on " + "something that is not a client channel"); + } return 0; } return client_channel->NumExternalConnectivityWatchers(); @@ -62,7 +76,7 @@ int grpc_channel_support_connectivity_watcher(grpc_channel* channel) { namespace grpc_core { namespace { -class StateWatcher { +class StateWatcher : public DualRefCounted { public: StateWatcher(grpc_channel* channel, grpc_completion_queue* cq, void* tag, grpc_connectivity_state last_observed_state, @@ -72,16 +86,35 @@ class StateWatcher { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr); GRPC_CLOSURE_INIT(&on_timeout_, TimeoutComplete, this, nullptr); + ClientChannel* client_channel = ClientChannel::GetFromChannel(channel); + if (client_channel == nullptr) { + // If the target URI used to create the channel was invalid, channel + // stack initialization failed, and that caused us to create a lame + // channel. In that case, connectivity state will never change (it + // will always be TRANSIENT_FAILURE), so we don't actually start a + // watch, but we are hiding that fact from the application. + if (IsLameChannel(channel)) { + // Ref from object creation is held by timer callback. + StartTimer(grpc_timespec_to_millis_round_up(deadline)); + return; + } + gpr_log(GPR_ERROR, + "grpc_channel_watch_connectivity_state called on " + "something that is not a client channel"); + GPR_ASSERT(false); + } + // Take an addition ref, so we have two (the first one is from the + // creation of this object). One will be held by the timer callback, + // the other by the watcher callback. + Ref().release(); auto* watcher_timer_init_state = new WatcherTimerInitState( this, grpc_timespec_to_millis_round_up(deadline)); - ClientChannel* client_channel = ClientChannel::GetFromChannel(channel); - GPR_ASSERT(client_channel != nullptr); client_channel->AddExternalConnectivityWatcher( grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &state_, &on_complete_, watcher_timer_init_state->closure()); } - ~StateWatcher() { + ~StateWatcher() override { GRPC_CHANNEL_INTERNAL_UNREF(channel_, "watch_channel_connectivity"); } @@ -100,8 +133,7 @@ class StateWatcher { private: static void WatcherTimerInit(void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); - grpc_timer_init(&self->state_watcher_->timer_, self->deadline_, - &self->state_watcher_->on_timeout_); + self->state_watcher_->StartTimer(self->deadline_); delete self; } @@ -110,94 +142,46 @@ class StateWatcher { grpc_closure closure_; }; - enum CallbackPhase { kWaiting, kReadyToCallBack, kCallingBackAndFinished }; + void StartTimer(grpc_millis deadline) { + grpc_timer_init(&timer_, deadline, &on_timeout_); + } - // Called when the completion is returned to the CQ. - static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) { + static void WatchComplete(void* arg, grpc_error_handle error) { auto* self = static_cast(arg); - bool should_delete = false; - { - MutexLock lock(&self->mu_); - switch (self->phase_) { - case kWaiting: - case kReadyToCallBack: - GPR_UNREACHABLE_CODE(return ); - case kCallingBackAndFinished: - should_delete = true; - } + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) { + GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error)); } - if (should_delete) delete self; + grpc_timer_cancel(&self->timer_); + self->Unref(); } - void PartlyDone(bool due_to_completion, grpc_error_handle error) { - bool end_op = false; - void* end_op_tag = nullptr; - grpc_error_handle end_op_error = GRPC_ERROR_NONE; - grpc_completion_queue* end_op_cq = nullptr; - grpc_cq_completion* end_op_completion_storage = nullptr; - if (due_to_completion) { - grpc_timer_cancel(&timer_); - } else { - grpc_core::ClientChannel* client_channel = - grpc_core::ClientChannel::GetFromChannel(channel_); - GPR_ASSERT(client_channel != nullptr); - client_channel->CancelExternalConnectivityWatcher(&on_complete_); - } - { - MutexLock lock(&mu_); - if (due_to_completion) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) { - GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error)); - } - GRPC_ERROR_UNREF(error); - error = GRPC_ERROR_NONE; - } else { - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Timed out waiting for connection state change"); - } else if (error == GRPC_ERROR_CANCELLED) { - error = GRPC_ERROR_NONE; - } - } - switch (phase_) { - case kWaiting: - GRPC_ERROR_REF(error); - error_ = error; - phase_ = kReadyToCallBack; - break; - case kReadyToCallBack: - if (error != GRPC_ERROR_NONE) { - GPR_ASSERT(!due_to_completion); - GRPC_ERROR_UNREF(error_); - GRPC_ERROR_REF(error); - error_ = error; - } - phase_ = kCallingBackAndFinished; - end_op = true; - end_op_cq = cq_; - end_op_tag = tag_; - end_op_error = error_; - end_op_completion_storage = &completion_storage_; - break; - case kCallingBackAndFinished: - GPR_UNREACHABLE_CODE(return ); - } - } - if (end_op) { - grpc_cq_end_op(end_op_cq, end_op_tag, end_op_error, FinishedCompletion, - this, end_op_completion_storage); + static void TimeoutComplete(void* arg, grpc_error_handle error) { + auto* self = static_cast(arg); + self->timer_fired_ = error == GRPC_ERROR_NONE; + // If this is a client channel (not a lame channel), cancel the watch. + ClientChannel* client_channel = + ClientChannel::GetFromChannel(self->channel_); + if (client_channel != nullptr) { + client_channel->CancelExternalConnectivityWatcher(&self->on_complete_); } - GRPC_ERROR_UNREF(error); + self->Unref(); } - static void WatchComplete(void* arg, grpc_error_handle error) { - auto* self = static_cast(arg); - self->PartlyDone(/*due_to_completion=*/true, GRPC_ERROR_REF(error)); + // Invoked when both strong refs are released. + void Orphan() override { + WeakRef().release(); // Take a weak ref until completion is finished. + grpc_error_handle error = + timer_fired_ ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Timed out waiting for connection state change") + : GRPC_ERROR_NONE; + grpc_cq_end_op(cq_, tag_, error, FinishedCompletion, this, + &completion_storage_); } - static void TimeoutComplete(void* arg, grpc_error_handle error) { + // Called when the completion is returned to the CQ. + static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) { auto* self = static_cast(arg); - self->PartlyDone(/*due_to_completion=*/false, GRPC_ERROR_REF(error)); + self->WeakUnref(); } grpc_channel* channel_; @@ -212,9 +196,7 @@ class StateWatcher { grpc_timer timer_; grpc_closure on_timeout_; - Mutex mu_; - CallbackPhase phase_ ABSL_GUARDED_BY(mu_) = kWaiting; - grpc_error_handle error_ ABSL_GUARDED_BY(mu_) = GRPC_ERROR_NONE; + bool timer_fired_ = false; }; } // namespace diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 75a9feff049..f6e627f07d3 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -445,7 +445,13 @@ void AresDnsResolver::StartResolvingLocked() { class AresDnsResolverFactory : public ResolverFactory { public: - bool IsValidUri(const URI& /*uri*/) const override { return true; } + bool IsValidUri(const URI& uri) const override { + if (absl::StripPrefix(uri.path(), "/").empty()) { + gpr_log(GPR_ERROR, "no server name supplied in dns URI"); + return false; + } + return true; + } OrphanablePtr CreateResolver(ResolverArgs args) const override { return MakeOrphanable(std::move(args)); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index b1ef2075dc1..9e705397539 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -291,6 +291,10 @@ class NativeDnsResolverFactory : public ResolverFactory { gpr_log(GPR_ERROR, "authority based dns uri's not supported"); return false; } + if (absl::StripPrefix(uri.path(), "/").empty()) { + gpr_log(GPR_ERROR, "no server name supplied in dns URI"); + return false; + } return true; } diff --git a/test/core/surface/lame_client_test.cc b/test/core/surface/lame_client_test.cc index 7b89ab33490..24bf65d49ac 100644 --- a/test/core/surface/lame_client_test.cc +++ b/test/core/surface/lame_client_test.cc @@ -85,7 +85,7 @@ int main(int argc, char** argv) { test_transport_op(chan); - GPR_ASSERT(GRPC_CHANNEL_SHUTDOWN == + GPR_ASSERT(GRPC_CHANNEL_TRANSIENT_FAILURE == grpc_channel_check_connectivity_state(chan, 0)); cq = grpc_completion_queue_create_for_next(nullptr); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 08980120223..0d375167e1f 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -1383,6 +1383,24 @@ TEST_P(End2endTest, ChannelStateTimeout) { } } +TEST_P(End2endTest, ChannelStateOnLameChannel) { + if ((GetParam().credentials_type != kInsecureCredentialsType) || + GetParam().inproc) { + return; + } + // Channel using invalid target URI. This creates a lame channel. + auto channel = grpc::CreateChannel("dns:///", InsecureChannelCredentials()); + // Channel should immediately report TRANSIENT_FAILURE. + EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(true)); + // And state will never change. + auto state = GRPC_CHANNEL_TRANSIENT_FAILURE; + for (int i = 0; i < 10; ++i) { + channel->WaitForStateChange( + state, std::chrono::system_clock::now() + std::chrono::seconds(1)); + state = channel->GetState(false); + } +} + // Talking to a non-existing service. TEST_P(End2endTest, NonExistingService) { ResetChannel();