Use more normal API for client channel connectivity watches from internal code

pull/20406/head
Mark D. Roth 5 years ago
parent 9fc4adae63
commit 490be929b3
  1. 100
      src/core/ext/filters/client_channel/client_channel.cc
  2. 30
      src/core/ext/filters/client_channel/client_channel.h
  3. 94
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  4. 7
      src/core/lib/transport/connectivity_state.cc
  5. 8
      src/core/lib/transport/connectivity_state.h

@ -179,9 +179,17 @@ class ChannelData {
return static_cast<int>(external_watchers_.size()); return static_cast<int>(external_watchers_.size());
} }
void AddConnectivityWatcher(
grpc_connectivity_state initial_state,
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher);
void RemoveConnectivityWatcher(
AsyncConnectivityStateWatcherInterface* watcher);
private: private:
class SubchannelWrapper; class SubchannelWrapper;
class ClientChannelControlHelper; class ClientChannelControlHelper;
class ConnectivityWatcherAdder;
class ConnectivityWatcherRemover;
// Represents a pending connectivity callback from an external caller // Represents a pending connectivity callback from an external caller
// via grpc_client_channel_watch_connectivity_state(). // via grpc_client_channel_watch_connectivity_state().
@ -1201,6 +1209,72 @@ void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked(
self->chand_->state_tracker_.RemoveWatcher(self); self->chand_->state_tracker_.RemoveWatcher(self);
} }
//
// ChannelData::ConnectivityWatcherAdder
//
class ChannelData::ConnectivityWatcherAdder {
public:
ConnectivityWatcherAdder(
ChannelData* chand, grpc_connectivity_state initial_state,
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
: chand_(chand),
initial_state_(initial_state),
watcher_(std::move(watcher)) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
GRPC_CLOSURE_INIT(&closure_, &ConnectivityWatcherAdder::AddWatcherLocked,
this, grpc_combiner_scheduler(chand_->combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
private:
static void AddWatcherLocked(void* arg, grpc_error* error) {
ConnectivityWatcherAdder* self =
static_cast<ConnectivityWatcherAdder*>(arg);
self->chand_->state_tracker_.AddWatcher(self->initial_state_,
std::move(self->watcher_));
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
"ConnectivityWatcherAdder");
Delete(self);
}
ChannelData* chand_;
grpc_connectivity_state initial_state_;
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
grpc_closure closure_;
};
//
// ChannelData::ConnectivityWatcherRemover
//
class ChannelData::ConnectivityWatcherRemover {
public:
ConnectivityWatcherRemover(ChannelData* chand,
AsyncConnectivityStateWatcherInterface* watcher)
: chand_(chand), watcher_(watcher) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
GRPC_CLOSURE_INIT(&closure_,
&ConnectivityWatcherRemover::RemoveWatcherLocked, this,
grpc_combiner_scheduler(chand_->combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
private:
static void RemoveWatcherLocked(void* arg, grpc_error* error) {
ConnectivityWatcherRemover* self =
static_cast<ConnectivityWatcherRemover*>(arg);
self->chand_->state_tracker_.RemoveWatcher(self->watcher_);
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
"ConnectivityWatcherRemover");
Delete(self);
}
ChannelData* chand_;
AsyncConnectivityStateWatcherInterface* watcher_;
grpc_closure closure_;
};
// //
// ChannelData::ClientChannelControlHelper // ChannelData::ClientChannelControlHelper
// //
@ -1885,6 +1959,17 @@ grpc_connectivity_state ChannelData::CheckConnectivityState(
return out; return out;
} }
void ChannelData::AddConnectivityWatcher(
grpc_connectivity_state initial_state,
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
New<ConnectivityWatcherAdder>(this, initial_state, std::move(watcher));
}
void ChannelData::RemoveConnectivityWatcher(
AsyncConnectivityStateWatcherInterface* watcher) {
New<ConnectivityWatcherRemover>(this, watcher);
}
// //
// CallData implementation // CallData implementation
// //
@ -3936,6 +4021,21 @@ void grpc_client_channel_watch_connectivity_state(
watcher_timer_init); watcher_timer_init);
} }
void grpc_client_channel_start_connectivity_watch(
grpc_channel_element* elem, grpc_connectivity_state initial_state,
grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface>
watcher) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
chand->AddConnectivityWatcher(initial_state, std::move(watcher));
}
void grpc_client_channel_stop_connectivity_watch(
grpc_channel_element* elem,
grpc_core::AsyncConnectivityStateWatcherInterface* watcher) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
chand->RemoveConnectivityWatcher(watcher);
}
grpc_core::RefCountedPtr<grpc_core::SubchannelCall> grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem) { grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
auto* calld = static_cast<CallData*>(elem->call_data); auto* calld = static_cast<CallData*>(elem->call_data);

@ -46,17 +46,35 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
int grpc_client_channel_num_external_connectivity_watchers( int grpc_client_channel_num_external_connectivity_watchers(
grpc_channel_element* elem); grpc_channel_element* elem);
// TODO(roth): This function is used both when handling external // Starts a one-time connectivity state watch. When the channel's state
// connectivity watchers and for LB policies like grpclb and xds that // becomes different from *state, sets *state to the new state and
// contain nested channels. In the latter case, we ideally want // schedules on_complete. The watcher_timer_init callback is invoked as
// something closer to the normal connectivity state tracker API. // soon as the watch is actually started (i.e., after hopping into the
// When we have time, consider refactoring this somehow to allow each // client channel combiner). I/O will be serviced via pollent.
// use-case to be handled more cleanly. //
// This is intended to be used when starting a watch from outside of C-core
// via grpc_channel_watch_connectivity_state(). It should not be used
// by other callers.
void grpc_client_channel_watch_connectivity_state( void grpc_client_channel_watch_connectivity_state(
grpc_channel_element* elem, grpc_polling_entity pollent, grpc_channel_element* elem, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_closure* on_complete, grpc_connectivity_state* state, grpc_closure* on_complete,
grpc_closure* watcher_timer_init); grpc_closure* watcher_timer_init);
// Starts and stops a connectivity watch. The watcher will be initially
// notified as soon as the state changes from initial_state and then on
// every subsequent state change until either the watch is stopped or
// it is notified that the state has changed to SHUTDOWN.
//
// This is intended to be used when starting watches from code inside of
// C-core (e.g., for a nested control plane channel for things like xds).
void grpc_client_channel_start_connectivity_watch(
grpc_channel_element* elem, grpc_connectivity_state initial_state,
grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface>
watcher);
void grpc_client_channel_stop_connectivity_watch(
grpc_channel_element* elem,
grpc_core::AsyncConnectivityStateWatcherInterface* watcher);
/* Debug helper: pull the subchannel call from a call stack element */ /* Debug helper: pull the subchannel call from a call stack element */
grpc_core::RefCountedPtr<grpc_core::SubchannelCall> grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem); grpc_client_channel_get_subchannel_call(grpc_call_element* elem);

@ -355,17 +355,17 @@ class XdsLb : public LoadBalancingPolicy {
void StartConnectivityWatchLocked(); void StartConnectivityWatchLocked();
void CancelConnectivityWatchLocked(); void CancelConnectivityWatchLocked();
static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
private: private:
class StateWatcher;
// The owning LB policy. // The owning LB policy.
RefCountedPtr<XdsLb> xdslb_policy_; RefCountedPtr<XdsLb> xdslb_policy_;
// The channel and its status. // The channel and its status.
grpc_channel* channel_; grpc_channel* channel_;
bool shutting_down_ = false; bool shutting_down_ = false;
grpc_connectivity_state connectivity_ = GRPC_CHANNEL_IDLE; StateWatcher* watcher_ = nullptr;
grpc_closure on_connectivity_changed_;
// The retryable XDS calls to the LB server. // The retryable XDS calls to the LB server.
OrphanablePtr<RetryableLbCall<EdsCallState>> eds_calld_; OrphanablePtr<RetryableLbCall<EdsCallState>> eds_calld_;
@ -862,6 +862,39 @@ void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity,
parent_->channel_control_helper()->AddTraceEvent(severity, message); parent_->channel_control_helper()->AddTraceEvent(severity, message);
} }
//
// XdsLb::LbChannelState::StateWatcher
//
class XdsLb::LbChannelState::StateWatcher
: public AsyncConnectivityStateWatcherInterface {
public:
explicit StateWatcher(RefCountedPtr<LbChannelState> parent)
: AsyncConnectivityStateWatcherInterface(
grpc_combiner_scheduler(parent->xdslb_policy_->combiner())),
parent_(std::move(parent)) {}
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
if (!parent_->shutting_down_ &&
parent_->xdslb_policy_->fallback_at_startup_checks_pending_ &&
new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into
// fallback mode immediately.
gpr_log(GPR_INFO,
"[xdslb %p] Balancer channel in state TRANSIENT_FAILURE; "
"entering fallback mode",
parent_->xdslb_policy_.get());
parent_->xdslb_policy_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&parent_->xdslb_policy_->lb_fallback_timer_);
parent_->xdslb_policy_->UpdateFallbackPolicyLocked();
parent_->CancelConnectivityWatchLocked();
}
}
RefCountedPtr<LbChannelState> parent_;
};
// //
// XdsLb::LbChannelState // XdsLb::LbChannelState
// //
@ -871,8 +904,6 @@ XdsLb::LbChannelState::LbChannelState(RefCountedPtr<XdsLb> xdslb_policy,
const grpc_channel_args& args) const grpc_channel_args& args)
: InternallyRefCounted<LbChannelState>(&grpc_lb_xds_trace), : InternallyRefCounted<LbChannelState>(&grpc_lb_xds_trace),
xdslb_policy_(std::move(xdslb_policy)) { xdslb_policy_(std::move(xdslb_policy)) {
GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChangedLocked,
this, grpc_combiner_scheduler(xdslb_policy_->combiner()));
channel_ = CreateXdsBalancerChannel(balancer_name, args); channel_ = CreateXdsBalancerChannel(balancer_name, args);
GPR_ASSERT(channel_ != nullptr); GPR_ASSERT(channel_ != nullptr);
eds_calld_.reset(New<RetryableLbCall<EdsCallState>>( eds_calld_.reset(New<RetryableLbCall<EdsCallState>>(
@ -900,56 +931,17 @@ void XdsLb::LbChannelState::StartConnectivityWatchLocked() {
grpc_channel_element* client_channel_elem = grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_)); grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
// Ref held by callback. auto watcher = MakeOrphanable<StateWatcher>(Ref());
Ref(DEBUG_LOCATION, "LbChannelState+start_watch").release(); watcher_ = watcher.get();
grpc_client_channel_watch_connectivity_state( grpc_client_channel_start_connectivity_watch(
client_channel_elem, client_channel_elem, GRPC_CHANNEL_IDLE, std::move(watcher));
grpc_polling_entity_create_from_pollset_set(
xdslb_policy_->interested_parties()),
&connectivity_, &on_connectivity_changed_, nullptr);
} }
void XdsLb::LbChannelState::CancelConnectivityWatchLocked() { void XdsLb::LbChannelState::CancelConnectivityWatchLocked() {
grpc_channel_element* client_channel_elem = grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_)); grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state( grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
xdslb_policy_->interested_parties()),
nullptr, &on_connectivity_changed_, nullptr);
}
void XdsLb::LbChannelState::OnConnectivityChangedLocked(void* arg,
grpc_error* error) {
LbChannelState* self = static_cast<LbChannelState*>(arg);
if (!self->shutting_down_ &&
self->xdslb_policy_->fallback_at_startup_checks_pending_) {
if (self->connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Not in TRANSIENT_FAILURE. Renew connectivity watch.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(self->channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
self->xdslb_policy_->interested_parties()),
&self->connectivity_, &self->on_connectivity_changed_, nullptr);
return; // Early out so we don't drop the ref below.
}
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into
// fallback mode immediately.
gpr_log(GPR_INFO,
"[xdslb %p] Balancer channel in state TRANSIENT_FAILURE; "
"entering fallback mode",
self);
self->xdslb_policy_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&self->xdslb_policy_->lb_fallback_timer_);
self->xdslb_policy_->UpdateFallbackPolicyLocked();
}
// Done watching connectivity state, so drop ref.
self->Unref(DEBUG_LOCATION, "LbChannelState+watch_done");
} }
// //
@ -1843,9 +1835,7 @@ void XdsLb::ShutdownLocked() {
gpr_log(GPR_INFO, "[xdslb %p] shutting down", this); gpr_log(GPR_INFO, "[xdslb %p] shutting down", this);
} }
shutting_down_ = true; shutting_down_ = true;
if (fallback_at_startup_checks_pending_) { MaybeCancelFallbackAtStartupChecks();
grpc_timer_cancel(&lb_fallback_timer_);
}
priority_list_.ShutdownLocked(); priority_list_.ShutdownLocked();
if (fallback_policy_ != nullptr) { if (fallback_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(), grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(),

@ -57,10 +57,9 @@ const char* ConnectivityStateName(grpc_connectivity_state state) {
class AsyncConnectivityStateWatcherInterface::Notifier { class AsyncConnectivityStateWatcherInterface::Notifier {
public: public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher, Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
grpc_connectivity_state state) grpc_connectivity_state state, grpc_closure_scheduler* scheduler)
: watcher_(std::move(watcher)), state_(state) { : watcher_(std::move(watcher)), state_(state) {
GRPC_CLOSURE_INIT(&closure_, SendNotification, this, GRPC_CLOSURE_INIT(&closure_, SendNotification, this, scheduler);
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
} }
@ -82,7 +81,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier {
void AsyncConnectivityStateWatcherInterface::Notify( void AsyncConnectivityStateWatcherInterface::Notify(
grpc_connectivity_state state) { grpc_connectivity_state state) {
New<Notifier>(Ref(), state); // Deletes itself when done. New<Notifier>(Ref(), state, scheduler_); // Deletes itself when done.
} }
// //

@ -28,6 +28,7 @@
#include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core { namespace grpc_core {
@ -67,8 +68,15 @@ class AsyncConnectivityStateWatcherInterface
protected: protected:
class Notifier; class Notifier;
explicit AsyncConnectivityStateWatcherInterface(
grpc_closure_scheduler* scheduler = grpc_schedule_on_exec_ctx)
: scheduler_(scheduler) {}
// Invoked asynchronously when Notify() is called. // Invoked asynchronously when Notify() is called.
virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0; virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0;
private:
grpc_closure_scheduler* scheduler_;
}; };
// Tracks connectivity state. Maintains a list of watchers that are // Tracks connectivity state. Maintains a list of watchers that are

Loading…
Cancel
Save