Convert external connectivity watcher to C++.

reviewable/pr18476/r1
Mark D. Roth 6 years ago
parent 62ba0b89b1
commit f26ef91c98
  1. 320
      src/core/ext/filters/client_channel/client_channel.cc

@ -50,6 +50,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
@ -91,7 +92,53 @@ grpc_core::TraceFlag grpc_client_channel_routing_trace(
* CHANNEL-WIDE FUNCTIONS
*/
struct external_connectivity_watcher;
// Forward declaration.
typedef struct client_channel_channel_data channel_data;
namespace grpc_core {
namespace {
class ExternalConnectivityWatcher {
public:
class WatcherList {
public:
WatcherList() { gpr_mu_init(&mu_); }
~WatcherList() { gpr_mu_destroy(&mu_); }
int size() const;
ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
void Append(ExternalConnectivityWatcher* watcher);
void Remove(ExternalConnectivityWatcher* watcher);
private:
// head_ is guarded by its own mutex, since the size of the list needs
// to be grabbed immediately without polling on a CQ.
mutable gpr_mu mu_;
ExternalConnectivityWatcher* head_ = nullptr;
};
ExternalConnectivityWatcher(channel_data* chand, grpc_polling_entity pollent,
grpc_connectivity_state* state,
grpc_closure* on_complete,
grpc_closure* watcher_timer_init);
~ExternalConnectivityWatcher();
private:
static void OnExternalWatchCompleteLocked(void* arg, grpc_error* error);
static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
channel_data* chand_;
grpc_polling_entity pollent_;
grpc_connectivity_state* state_;
grpc_closure* on_complete_;
grpc_closure* watcher_timer_init_;
grpc_closure my_closure_;
ExternalConnectivityWatcher* next_ = nullptr;
};
} // namespace
} // namespace grpc_core
struct QueuedPick {
LoadBalancingPolicy::PickArgs pick;
@ -99,7 +146,7 @@ struct QueuedPick {
QueuedPick* next = nullptr;
};
typedef struct client_channel_channel_data {
struct client_channel_channel_data {
bool deadline_checking_enabled;
bool enable_retries;
size_t per_rpc_retry_buffer_size;
@ -139,11 +186,10 @@ typedef struct client_channel_channel_data {
grpc_connectivity_state_tracker state_tracker;
grpc_error* disconnect_error;
/* external_connectivity_watcher_list head is guarded by its own mutex, since
* counts need to be grabbed immediately without polling on a cq */
gpr_mu external_connectivity_watcher_list_mu;
struct external_connectivity_watcher* external_connectivity_watcher_list_head;
} channel_data;
grpc_core::ManualConstructor<
grpc_core::ExternalConnectivityWatcher::WatcherList>
external_connectivity_watcher_list;
};
// Forward declarations.
static void start_pick_locked(void* arg, grpc_error* ignored);
@ -191,6 +237,124 @@ static void set_connectivity_state_and_picker_locked(
namespace grpc_core {
namespace {
//
// ExternalConnectivityWatcher::WatcherList
//
int ExternalConnectivityWatcher::WatcherList::size() const {
MutexLock lock(&mu_);
int count = 0;
for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
++count;
}
return count;
}
ExternalConnectivityWatcher* ExternalConnectivityWatcher::WatcherList::Lookup(
grpc_closure* on_complete) const {
MutexLock lock(&mu_);
ExternalConnectivityWatcher* w = head_;
while (w != nullptr && w->on_complete_ != on_complete) {
w = w->next_;
}
return w;
}
void ExternalConnectivityWatcher::WatcherList::Append(
ExternalConnectivityWatcher* watcher) {
GPR_ASSERT(!Lookup(watcher->on_complete_));
MutexLock lock(&mu_);
GPR_ASSERT(watcher->next_ == nullptr);
watcher->next_ = head_;
head_ = watcher;
}
void ExternalConnectivityWatcher::WatcherList::Remove(
ExternalConnectivityWatcher* watcher) {
MutexLock lock(&mu_);
if (watcher == head_) {
head_ = watcher->next_;
return;
}
for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
if (w->next_ == watcher) {
w->next_ = w->next_->next_;
return;
}
}
GPR_UNREACHABLE_CODE(return );
}
//
// ExternalConnectivityWatcher
//
ExternalConnectivityWatcher::ExternalConnectivityWatcher(
channel_data* chand, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_closure* on_complete,
grpc_closure* watcher_timer_init)
: chand_(chand),
pollent_(pollent),
state_(state),
on_complete_(on_complete),
watcher_timer_init_(watcher_timer_init) {
grpc_polling_entity_add_to_pollset_set(&pollent_, chand_->interested_parties);
GRPC_CHANNEL_STACK_REF(chand_->owning_stack, "ExternalConnectivityWatcher");
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
grpc_combiner_scheduler(chand_->combiner)),
GRPC_ERROR_NONE);
}
ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
grpc_polling_entity_del_from_pollset_set(&pollent_,
chand_->interested_parties);
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack, "ExternalConnectivityWatcher");
}
void ExternalConnectivityWatcher::OnExternalWatchCompleteLocked(
void* arg, grpc_error* error) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
grpc_closure* on_complete = self->on_complete_;
self->chand_->external_connectivity_watcher_list->Remove(self);
Delete(self);
GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
}
void ExternalConnectivityWatcher::WatchConnectivityStateLocked(
void* arg, grpc_error* ignored) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
if (self->state_ == nullptr) {
// Handle cancellation.
GPR_ASSERT(self->watcher_timer_init_ == nullptr);
ExternalConnectivityWatcher* found =
self->chand_->external_connectivity_watcher_list->Lookup(
self->on_complete_);
if (found != nullptr) {
GPR_ASSERT(found->on_complete_ == self->on_complete_);
grpc_connectivity_state_notify_on_state_change(
&found->chand_->state_tracker, nullptr, &found->my_closure_);
}
Delete(self);
return;
}
// New watcher.
self->chand_->external_connectivity_watcher_list->Append(self);
// This assumes that the closure is scheduled on the ExecCtx scheduler
// and that GRPC_CLOSURE_RUN would run the closure immediately.
GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&self->my_closure_, OnExternalWatchCompleteLocked, self,
grpc_combiner_scheduler(self->chand_->combiner));
grpc_connectivity_state_notify_on_state_change(
&self->chand_->state_tracker, self->state_, &self->my_closure_);
}
//
// ClientChannelControlHelper
//
class ClientChannelControlHelper
: public LoadBalancingPolicy::ChannelControlHelper {
public:
@ -402,12 +566,7 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
"client_channel");
chand->disconnect_error = GRPC_ERROR_NONE;
gpr_mu_init(&chand->info_mu);
gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
chand->external_connectivity_watcher_list_head = nullptr;
gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
chand->external_connectivity_watcher_list.Init();
chand->owning_stack = args->channel_stack;
chand->deadline_checking_enabled =
grpc_deadline_checking_enabled(args->channel_args);
@ -515,7 +674,7 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
GRPC_ERROR_UNREF(chand->disconnect_error);
grpc_connectivity_state_destroy(&chand->state_tracker);
gpr_mu_destroy(&chand->info_mu);
gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
chand->external_connectivity_watcher_list.Destroy();
}
/*************************************************************************
@ -2875,6 +3034,10 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
//
// functions exported to the rest of core
//
void grpc_client_channel_set_channelz_node(
grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
@ -2914,120 +3077,10 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
return out;
}
typedef struct external_connectivity_watcher {
channel_data* chand;
grpc_polling_entity pollent;
grpc_closure* on_complete;
grpc_closure* watcher_timer_init;
grpc_connectivity_state* state;
grpc_closure my_closure;
struct external_connectivity_watcher* next;
} external_connectivity_watcher;
static external_connectivity_watcher* lookup_external_connectivity_watcher(
channel_data* chand, grpc_closure* on_complete) {
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
external_connectivity_watcher* w =
chand->external_connectivity_watcher_list_head;
while (w != nullptr && w->on_complete != on_complete) {
w = w->next;
}
gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
return w;
}
static void external_connectivity_watcher_list_append(
channel_data* chand, external_connectivity_watcher* w) {
GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
GPR_ASSERT(!w->next);
w->next = chand->external_connectivity_watcher_list_head;
chand->external_connectivity_watcher_list_head = w;
gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
}
static void external_connectivity_watcher_list_remove(
channel_data* chand, external_connectivity_watcher* to_remove) {
GPR_ASSERT(
lookup_external_connectivity_watcher(chand, to_remove->on_complete));
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
if (to_remove == chand->external_connectivity_watcher_list_head) {
chand->external_connectivity_watcher_list_head = to_remove->next;
gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
return;
}
external_connectivity_watcher* w =
chand->external_connectivity_watcher_list_head;
while (w != nullptr) {
if (w->next == to_remove) {
w->next = w->next->next;
gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
return;
}
w = w->next;
}
GPR_UNREACHABLE_CODE(return );
}
int grpc_client_channel_num_external_connectivity_watchers(
grpc_channel_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
int count = 0;
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
external_connectivity_watcher* w =
chand->external_connectivity_watcher_list_head;
while (w != nullptr) {
count++;
w = w->next;
}
gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
return count;
}
static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
external_connectivity_watcher* w =
static_cast<external_connectivity_watcher*>(arg);
grpc_closure* follow_up = w->on_complete;
grpc_polling_entity_del_from_pollset_set(&w->pollent,
w->chand->interested_parties);
GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
"external_connectivity_watcher");
external_connectivity_watcher_list_remove(w->chand, w);
gpr_free(w);
GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}
static void watch_connectivity_state_locked(void* arg,
grpc_error* error_ignored) {
external_connectivity_watcher* w =
static_cast<external_connectivity_watcher*>(arg);
external_connectivity_watcher* found = nullptr;
if (w->state != nullptr) {
external_connectivity_watcher_list_append(w->chand, w);
// An assumption is being made that the closure is scheduled on the exec ctx
// scheduler and that GRPC_CLOSURE_RUN would run the closure immediately.
GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
grpc_combiner_scheduler(w->chand->combiner));
grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
w->state, &w->my_closure);
} else {
GPR_ASSERT(w->watcher_timer_init == nullptr);
found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
if (found) {
GPR_ASSERT(found->on_complete == w->on_complete);
grpc_connectivity_state_notify_on_state_change(
&found->chand->state_tracker, nullptr, &found->my_closure);
}
grpc_polling_entity_del_from_pollset_set(&w->pollent,
w->chand->interested_parties);
GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
"external_connectivity_watcher");
gpr_free(w);
}
return chand->external_connectivity_watcher_list->size();
}
void grpc_client_channel_watch_connectivity_state(
@ -3035,21 +3088,8 @@ void grpc_client_channel_watch_connectivity_state(
grpc_connectivity_state* state, grpc_closure* closure,
grpc_closure* watcher_timer_init) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
external_connectivity_watcher* w =
static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
w->chand = chand;
w->pollent = pollent;
w->on_complete = closure;
w->state = state;
w->watcher_timer_init = watcher_timer_init;
grpc_polling_entity_add_to_pollset_set(&w->pollent,
chand->interested_parties);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
grpc_core::New<grpc_core::ExternalConnectivityWatcher>(
chand, pollent, state, closure, watcher_timer_init);
}
grpc_core::RefCountedPtr<grpc_core::SubchannelCall>

Loading…
Cancel
Save