Revert "allow connectivity state watching to work on lame channels (#27747)" (#27766)

This reverts commit a2bd7b8440.
reviewable/pr25586/r15^2
Craig Tiller 3 years ago committed by GitHub
parent d6a0c0b8b5
commit 790ce5c97f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 160
      src/core/ext/filters/client_channel/channel_connectivity.cc
  2. 8
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  3. 4
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  4. 2
      test/core/surface/lame_client_test.cc
  5. 18
      test/cpp/end2end/end2end_test.cc

@ -23,17 +23,6 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/lame_client.h"
namespace {
bool IsLameChannel(grpc_channel* channel) {
grpc_channel_element* elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
return elem->filter == &grpc_lame_filter;
}
} // namespace
grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel* channel, int try_to_connect) {
@ -46,7 +35,6 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel);
if (GPR_UNLIKELY(client_channel == nullptr)) {
if (IsLameChannel(channel)) return GRPC_CHANNEL_TRANSIENT_FAILURE;
gpr_log(GPR_ERROR,
"grpc_channel_check_connectivity_state called on something that is "
"not a client channel");
@ -59,11 +47,9 @@ int grpc_channel_num_external_connectivity_watchers(grpc_channel* channel) {
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel);
if (client_channel == nullptr) {
if (!IsLameChannel(channel)) {
gpr_log(GPR_ERROR,
"grpc_channel_num_external_connectivity_watchers called on "
"something that is not a client channel");
}
gpr_log(GPR_ERROR,
"grpc_channel_num_external_connectivity_watchers called on "
"something that is not a client channel");
return 0;
}
return client_channel->NumExternalConnectivityWatchers();
@ -76,7 +62,7 @@ int grpc_channel_support_connectivity_watcher(grpc_channel* channel) {
namespace grpc_core {
namespace {
class StateWatcher : public DualRefCounted<StateWatcher> {
class StateWatcher {
public:
StateWatcher(grpc_channel* channel, grpc_completion_queue* cq, void* tag,
grpc_connectivity_state last_observed_state,
@ -86,35 +72,16 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr);
GRPC_CLOSURE_INIT(&on_timeout_, TimeoutComplete, this, nullptr);
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel);
if (client_channel == nullptr) {
// If the target URI used to create the channel was invalid, channel
// stack initialization failed, and that caused us to create a lame
// channel. In that case, connectivity state will never change (it
// will always be TRANSIENT_FAILURE), so we don't actually start a
// watch, but we are hiding that fact from the application.
if (IsLameChannel(channel)) {
// Ref from object creation is held by timer callback.
StartTimer(grpc_timespec_to_millis_round_up(deadline));
return;
}
gpr_log(GPR_ERROR,
"grpc_channel_watch_connectivity_state called on "
"something that is not a client channel");
GPR_ASSERT(false);
}
// Take an addition ref, so we have two (the first one is from the
// creation of this object). One will be held by the timer callback,
// the other by the watcher callback.
Ref().release();
auto* watcher_timer_init_state = new WatcherTimerInitState(
this, grpc_timespec_to_millis_round_up(deadline));
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel);
GPR_ASSERT(client_channel != nullptr);
client_channel->AddExternalConnectivityWatcher(
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &state_,
&on_complete_, watcher_timer_init_state->closure());
}
~StateWatcher() override {
~StateWatcher() {
GRPC_CHANNEL_INTERNAL_UNREF(channel_, "watch_channel_connectivity");
}
@ -133,7 +100,8 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
private:
static void WatcherTimerInit(void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<WatcherTimerInitState*>(arg);
self->state_watcher_->StartTimer(self->deadline_);
grpc_timer_init(&self->state_watcher_->timer_, self->deadline_,
&self->state_watcher_->on_timeout_);
delete self;
}
@ -142,46 +110,94 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
grpc_closure closure_;
};
void StartTimer(grpc_millis deadline) {
grpc_timer_init(&timer_, deadline, &on_timeout_);
}
enum CallbackPhase { kWaiting, kReadyToCallBack, kCallingBackAndFinished };
static void WatchComplete(void* arg, grpc_error_handle error) {
// Called when the completion is returned to the CQ.
static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
auto* self = static_cast<StateWatcher*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
bool should_delete = false;
{
MutexLock lock(&self->mu_);
switch (self->phase_) {
case kWaiting:
case kReadyToCallBack:
GPR_UNREACHABLE_CODE(return );
case kCallingBackAndFinished:
should_delete = true;
}
}
grpc_timer_cancel(&self->timer_);
self->Unref();
if (should_delete) delete self;
}
static void TimeoutComplete(void* arg, grpc_error_handle error) {
auto* self = static_cast<StateWatcher*>(arg);
self->timer_fired_ = error == GRPC_ERROR_NONE;
// If this is a client channel (not a lame channel), cancel the watch.
ClientChannel* client_channel =
ClientChannel::GetFromChannel(self->channel_);
if (client_channel != nullptr) {
client_channel->CancelExternalConnectivityWatcher(&self->on_complete_);
void PartlyDone(bool due_to_completion, grpc_error_handle error) {
bool end_op = false;
void* end_op_tag = nullptr;
grpc_error_handle end_op_error = GRPC_ERROR_NONE;
grpc_completion_queue* end_op_cq = nullptr;
grpc_cq_completion* end_op_completion_storage = nullptr;
if (due_to_completion) {
grpc_timer_cancel(&timer_);
} else {
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel_);
GPR_ASSERT(client_channel != nullptr);
client_channel->CancelExternalConnectivityWatcher(&on_complete_);
}
{
MutexLock lock(&mu_);
if (due_to_completion) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
} else {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Timed out waiting for connection state change");
} else if (error == GRPC_ERROR_CANCELLED) {
error = GRPC_ERROR_NONE;
}
}
switch (phase_) {
case kWaiting:
GRPC_ERROR_REF(error);
error_ = error;
phase_ = kReadyToCallBack;
break;
case kReadyToCallBack:
if (error != GRPC_ERROR_NONE) {
GPR_ASSERT(!due_to_completion);
GRPC_ERROR_UNREF(error_);
GRPC_ERROR_REF(error);
error_ = error;
}
phase_ = kCallingBackAndFinished;
end_op = true;
end_op_cq = cq_;
end_op_tag = tag_;
end_op_error = error_;
end_op_completion_storage = &completion_storage_;
break;
case kCallingBackAndFinished:
GPR_UNREACHABLE_CODE(return );
}
}
if (end_op) {
grpc_cq_end_op(end_op_cq, end_op_tag, end_op_error, FinishedCompletion,
this, end_op_completion_storage);
}
self->Unref();
GRPC_ERROR_UNREF(error);
}
// Invoked when both strong refs are released.
void Orphan() override {
WeakRef().release(); // Take a weak ref until completion is finished.
grpc_error_handle error =
timer_fired_ ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Timed out waiting for connection state change")
: GRPC_ERROR_NONE;
grpc_cq_end_op(cq_, tag_, error, FinishedCompletion, this,
&completion_storage_);
static void WatchComplete(void* arg, grpc_error_handle error) {
auto* self = static_cast<StateWatcher*>(arg);
self->PartlyDone(/*due_to_completion=*/true, GRPC_ERROR_REF(error));
}
// Called when the completion is returned to the CQ.
static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
static void TimeoutComplete(void* arg, grpc_error_handle error) {
auto* self = static_cast<StateWatcher*>(arg);
self->WeakUnref();
self->PartlyDone(/*due_to_completion=*/false, GRPC_ERROR_REF(error));
}
grpc_channel* channel_;
@ -196,7 +212,9 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
grpc_timer timer_;
grpc_closure on_timeout_;
bool timer_fired_ = false;
Mutex mu_;
CallbackPhase phase_ ABSL_GUARDED_BY(mu_) = kWaiting;
grpc_error_handle error_ ABSL_GUARDED_BY(mu_) = GRPC_ERROR_NONE;
};
} // namespace

@ -445,13 +445,7 @@ void AresDnsResolver::StartResolvingLocked() {
class AresDnsResolverFactory : public ResolverFactory {
public:
bool IsValidUri(const URI& uri) const override {
if (absl::StripPrefix(uri.path(), "/").empty()) {
gpr_log(GPR_ERROR, "no server name supplied in dns URI");
return false;
}
return true;
}
bool IsValidUri(const URI& /*uri*/) const override { return true; }
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
return MakeOrphanable<AresDnsResolver>(std::move(args));

@ -291,10 +291,6 @@ class NativeDnsResolverFactory : public ResolverFactory {
gpr_log(GPR_ERROR, "authority based dns uri's not supported");
return false;
}
if (absl::StripPrefix(uri.path(), "/").empty()) {
gpr_log(GPR_ERROR, "no server name supplied in dns URI");
return false;
}
return true;
}

@ -85,7 +85,7 @@ int main(int argc, char** argv) {
test_transport_op(chan);
GPR_ASSERT(GRPC_CHANNEL_TRANSIENT_FAILURE ==
GPR_ASSERT(GRPC_CHANNEL_SHUTDOWN ==
grpc_channel_check_connectivity_state(chan, 0));
cq = grpc_completion_queue_create_for_next(nullptr);

@ -1383,24 +1383,6 @@ TEST_P(End2endTest, ChannelStateTimeout) {
}
}
TEST_P(End2endTest, ChannelStateOnLameChannel) {
if ((GetParam().credentials_type != kInsecureCredentialsType) ||
GetParam().inproc) {
return;
}
// Channel using invalid target URI. This creates a lame channel.
auto channel = grpc::CreateChannel("dns:///", InsecureChannelCredentials());
// Channel should immediately report TRANSIENT_FAILURE.
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel->GetState(true));
// And state will never change.
auto state = GRPC_CHANNEL_TRANSIENT_FAILURE;
for (int i = 0; i < 10; ++i) {
channel->WaitForStateChange(
state, std::chrono::system_clock::now() + std::chrono::seconds(1));
state = channel->GetState(false);
}
}
// Talking to a non-existing service.
TEST_P(End2endTest, NonExistingService) {
ResetChannel();

Loading…
Cancel
Save