Merge pull request #18580 from markdroth/data_plane_combiner

Second attempt: Split data plane and control plane into their own combiners.
reviewable/pr18489/r1^2
Mark D. Roth · 6 years ago · committed by GitHub
commit 56d8f9896b
  1. src/core/ext/filters/client_channel/client_channel.cc (196 lines changed)
  2. src/core/ext/filters/client_channel/lb_policy.cc (7 lines changed)
  3. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (15 lines changed)
  4. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc (8 lines changed)
  5. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h (14 lines changed)
  6. src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc (4 lines changed)
  7. src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h (2 lines changed)
  8. src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc (1 line changed)
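At a high level, the change moves the picker, the queued picks, and the per-call service config data under a new data_plane_combiner, keeps resolver and LB-policy plumbing under the existing control plane combiner, and turns the one field read from both sides (disconnect_error) into an atomic. Control plane code that needs to touch data plane state hops over by scheduling a closure on the other combiner. A minimal sketch of that hop, using only the combiner/closure APIs that appear in this diff; the struct and function names here are illustrative, not part of the change:

#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"

// Hypothetical two-combiner state, mirroring the split in channel_data.
struct two_plane_state {
  grpc_combiner* combiner;             // control plane: resolver, LB policy
  grpc_combiner* data_plane_combiner;  // data plane: picker, queued picks
  grpc_closure hop_closure;
  int picker_generation = 0;  // stand-in for the real picker state
};

// Runs under data_plane_combiner, so it may touch data plane fields.
static void update_picker_in_data_plane(void* arg, grpc_error* /*error*/) {
  two_plane_state* st = static_cast<two_plane_state*>(arg);
  ++st->picker_generation;
}

// Runs under the control plane combiner. Instead of touching the picker
// directly, it bounces into the data plane combiner via a closure.
static void on_control_plane_event(two_plane_state* st) {
  GRPC_CLOSURE_INIT(&st->hop_closure, update_picker_in_data_plane, st,
                    grpc_combiner_scheduler(st->data_plane_combiner));
  GRPC_CLOSURE_SCHED(&st->hop_closure, GRPC_ERROR_NONE);
}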

src/core/ext/filters/client_channel/client_channel.cc
@@ -149,45 +149,48 @@ struct QueuedPick {
};
struct client_channel_channel_data {
//
// Fields set at construction and never modified.
//
bool deadline_checking_enabled;
bool enable_retries;
size_t per_rpc_retry_buffer_size;
/** combiner protecting all variables below in this data structure */
grpc_combiner* combiner;
/** owning stack */
grpc_channel_stack* owning_stack;
/** interested parties (owned) */
grpc_pollset_set* interested_parties;
// Client channel factory.
grpc_core::ClientChannelFactory* client_channel_factory;
// Subchannel pool.
grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
grpc_core::channelz::ClientChannelNode* channelz_node;
// Resolving LB policy.
grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
// Subchannel picker from LB policy.
//
// Fields used in the data plane. Protected by data_plane_combiner.
//
grpc_combiner* data_plane_combiner;
grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker;
// Linked list of queued picks.
QueuedPick* queued_picks;
bool have_service_config;
/** retry throttle data from service config */
QueuedPick* queued_picks; // Linked list of queued picks.
// Data from service config.
bool received_service_config_data;
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
/** per-method service config data */
grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
/* the following properties are guarded by a mutex since APIs require them
to be instantaneously available */
//
// Fields used in the control plane. Protected by combiner.
//
grpc_combiner* combiner;
grpc_pollset_set* interested_parties;
grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
grpc_connectivity_state_tracker state_tracker;
//
// Fields accessed from both data plane and control plane combiners.
//
grpc_core::Atomic<grpc_error*> disconnect_error;
// The following properties are guarded by a mutex since APIs require them
// to be instantaneously available.
gpr_mu info_mu;
grpc_core::UniquePtr<char> info_lb_policy_name;
grpc_core::UniquePtr<char> info_service_config_json;
grpc_connectivity_state_tracker state_tracker;
grpc_error* disconnect_error;
grpc_core::ManualConstructor<
grpc_core::ExternalConnectivityWatcher::WatcherList>
external_connectivity_watcher_list;
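Because disconnect_error is now read under the data plane combiner but set under the control plane combiner, it changes from a plain grpc_error* to grpc_core::Atomic<grpc_error*>. The access pattern, condensed here from the start_transport_op_locked, UpdateState, and start_pick_locked hunks further down:

// Control plane (start_transport_op_locked): set exactly once via CAS,
// asserting that nothing has set it before.
grpc_error* expected = GRPC_ERROR_NONE;
GPR_ASSERT(chand->disconnect_error.CompareExchangeStrong(
    &expected, op->disconnect_with_error, grpc_core::MemoryOrder::ACQ_REL,
    grpc_core::MemoryOrder::ACQUIRE));

// Readers on either combiner take an acquire-load snapshot and branch on it.
grpc_error* disconnect_error =
    chand->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
if (disconnect_error != GRPC_ERROR_NONE) {
  // Channel is shutting down: fail the pick / drop the picker update.
}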
@@ -214,30 +217,98 @@ static const char* get_channel_connectivity_state_change_string(
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
static void set_connectivity_state_and_picker_locked(
channel_data* chand, grpc_connectivity_state state, grpc_error* state_error,
const char* reason,
grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
// Update connectivity state.
namespace grpc_core {
namespace {
// A fire-and-forget class that sets the channel's connectivity state
// and then hops into the data plane combiner to update the picker.
// Must be instantiated while holding the control plane combiner.
// Deletes itself when done.
class ConnectivityStateAndPickerSetter {
public:
ConnectivityStateAndPickerSetter(
channel_data* chand, grpc_connectivity_state state,
grpc_error* state_error, const char* reason,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
: chand_(chand), picker_(std::move(picker)) {
// Update connectivity state here, while holding control plane combiner.
grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
reason);
if (chand->channelz_node != nullptr) {
chand->channelz_node->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string(
get_channel_connectivity_state_change_string(state)));
}
// Bounce into the data plane combiner to reset the picker.
GRPC_CHANNEL_STACK_REF(chand->owning_stack,
"ConnectivityStateAndPickerSetter");
GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
grpc_combiner_scheduler(chand->data_plane_combiner));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
private:
static void SetPicker(void* arg, grpc_error* ignored) {
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
// Update picker.
chand->picker = std::move(picker);
self->chand_->picker = std::move(self->picker_);
// Re-process queued picks.
for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
for (QueuedPick* pick = self->chand_->queued_picks; pick != nullptr;
pick = pick->next) {
start_pick_locked(pick->elem, GRPC_ERROR_NONE);
}
// Clean up.
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack,
"ConnectivityStateAndPickerSetter");
Delete(self);
}
channel_data* chand_;
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
grpc_closure closure_;
};
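Call sites never hold on to the setter; as the UpdateState() and shutdown hunks below show, the caller just allocates it while holding the control plane combiner and the object cleans itself up after swapping the picker:

// From ClientChannelControlHelper::UpdateState() below; the object frees
// itself once SetPicker has run on the data plane combiner.
New<ConnectivityStateAndPickerSetter>(chand_, state, state_error,
                                      "helper", std::move(picker));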
// A fire-and-forget class that sets the channel's service config data
// in the data plane combiner. Deletes itself when done.
class ServiceConfigSetter {
public:
ServiceConfigSetter(
channel_data* chand,
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
RefCountedPtr<ClientChannelMethodParamsTable> method_params_table)
: chand_(chand),
retry_throttle_data_(std::move(retry_throttle_data)),
method_params_table_(std::move(method_params_table)) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "ServiceConfigSetter");
GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
grpc_combiner_scheduler(chand->data_plane_combiner));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
namespace grpc_core {
namespace {
private:
static void SetServiceConfigData(void* arg, grpc_error* ignored) {
ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
channel_data* chand = self->chand_;
// Update channel state.
chand->received_service_config_data = true;
chand->retry_throttle_data = std::move(self->retry_throttle_data_);
chand->method_params_table = std::move(self->method_params_table_);
// Apply service config to queued picks.
for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
pick = pick->next) {
maybe_apply_service_config_to_call_locked(pick->elem);
}
// Clean up.
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack, "ServiceConfigSetter");
Delete(self);
}
channel_data* chand_;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
grpc_closure closure_;
};
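ServiceConfigSetter follows the same shape as ConnectivityStateAndPickerSetter: ref the channel stack, schedule a closure on the data plane combiner, mutate data plane fields, then unref and delete itself. A stripped-down sketch of that shared pattern; DataPlaneUpdater is a hypothetical name used only for illustration:

// Hypothetical distillation of the fire-and-forget pattern shared by the
// two setter classes above.
class DataPlaneUpdater {
 public:
  explicit DataPlaneUpdater(channel_data* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "DataPlaneUpdater");
    GRPC_CLOSURE_INIT(&closure_, RunInDataPlane, this,
                      grpc_combiner_scheduler(chand->data_plane_combiner));
    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
  }

 private:
  static void RunInDataPlane(void* arg, grpc_error* /*ignored*/) {
    auto* self = static_cast<DataPlaneUpdater*>(arg);
    // ...mutate fields guarded by data_plane_combiner here...
    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack, "DataPlaneUpdater");
    Delete(self);
  }

  channel_data* chand_;
  grpc_closure closure_;
};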
//
// ExternalConnectivityWatcher::WatcherList
@@ -387,8 +458,10 @@ class ClientChannelControlHelper
void UpdateState(
grpc_connectivity_state state, grpc_error* state_error,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
grpc_error* disconnect_error =
chand_->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
if (grpc_client_channel_routing_trace.enabled()) {
const char* extra = chand_->disconnect_error == GRPC_ERROR_NONE
const char* extra = disconnect_error == GRPC_ERROR_NONE
? ""
: " (ignoring -- channel shutting down)";
gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s",
@@ -396,8 +469,9 @@ class ClientChannelControlHelper
grpc_error_string(state_error), picker.get(), extra);
}
// Do update only if not shutting down.
if (chand_->disconnect_error == GRPC_ERROR_NONE) {
set_connectivity_state_and_picker_locked(chand_, state, state_error,
if (disconnect_error == GRPC_ERROR_NONE) {
// Will delete itself.
New<ConnectivityStateAndPickerSetter>(chand_, state, state_error,
"helper", std::move(picker));
} else {
GRPC_ERROR_UNREF(state_error);
@@ -420,7 +494,6 @@ static bool process_resolver_result_locked(
void* arg, grpc_core::Resolver::Result* result, const char** lb_policy_name,
grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
channel_data* chand = static_cast<channel_data*>(arg);
chand->have_service_config = true;
ProcessedResolverResult resolver_result(result, chand->enable_retries);
grpc_core::UniquePtr<char> service_config_json =
resolver_result.service_config_json();
@@ -428,9 +501,11 @@ static bool process_resolver_result_locked(
gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
chand, service_config_json.get());
}
// Update channel state.
chand->retry_throttle_data = resolver_result.retry_throttle_data();
chand->method_params_table = resolver_result.method_params_table();
// Create service config setter to update channel state in the data
// plane combiner. Destroys itself when done.
grpc_core::New<grpc_core::ServiceConfigSetter>(
chand, resolver_result.retry_throttle_data(),
resolver_result.method_params_table());
// Swap out the data used by cc_get_channel_info().
gpr_mu_lock(&chand->info_mu);
chand->info_lb_policy_name = resolver_result.lb_policy_name();
@@ -445,11 +520,6 @@ static bool process_resolver_result_locked(
// Return results.
*lb_policy_name = chand->info_lb_policy_name.get();
*lb_policy_config = resolver_result.lb_policy_config();
// Apply service config to queued picks.
for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
pick = pick->next) {
maybe_apply_service_config_to_call_locked(pick->elem);
}
return service_config_changed;
}
@@ -507,12 +577,16 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
}
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
chand->disconnect_error = op->disconnect_with_error;
grpc_error* error = GRPC_ERROR_NONE;
GPR_ASSERT(chand->disconnect_error.CompareExchangeStrong(
&error, op->disconnect_with_error, grpc_core::MemoryOrder::ACQ_REL,
grpc_core::MemoryOrder::ACQUIRE));
grpc_pollset_set_del_pollset_set(
chand->resolving_lb_policy->interested_parties(),
chand->interested_parties);
chand->resolving_lb_policy.reset();
set_connectivity_state_and_picker_locked(
// Will delete itself.
grpc_core::New<grpc_core::ConnectivityStateAndPickerSetter>(
chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error),
"shutdown from API",
grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
@@ -562,10 +636,12 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
// Initialize data members.
chand->data_plane_combiner = grpc_combiner_create();
chand->combiner = grpc_combiner_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
chand->disconnect_error = GRPC_ERROR_NONE;
chand->disconnect_error.Store(GRPC_ERROR_NONE,
grpc_core::MemoryOrder::RELAXED);
gpr_mu_init(&chand->info_mu);
chand->external_connectivity_watcher_list.Init();
chand->owning_stack = args->channel_stack;
@@ -671,8 +747,10 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
chand->method_params_table.reset();
grpc_client_channel_stop_backup_polling(chand->interested_parties);
grpc_pollset_set_destroy(chand->interested_parties);
GRPC_COMBINER_UNREF(chand->data_plane_combiner, "client_channel");
GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
GRPC_ERROR_UNREF(chand->disconnect_error);
GRPC_ERROR_UNREF(
chand->disconnect_error.Load(grpc_core::MemoryOrder::RELAXED));
grpc_connectivity_state_destroy(&chand->state_tracker);
gpr_mu_destroy(&chand->info_mu);
chand->external_connectivity_watcher_list.Destroy();
@@ -1421,7 +1499,7 @@ static void do_retry(grpc_call_element* elem,
}
// Schedule retry after computed delay.
GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
grpc_combiner_scheduler(chand->combiner));
grpc_combiner_scheduler(chand->data_plane_combiner));
grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
// Update bookkeeping.
if (retry_state != nullptr) retry_state->retry_dispatched = true;
@@ -2648,7 +2726,7 @@ class QueuedPickCanceller {
auto* chand = static_cast<channel_data*>(elem->channel_data);
GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller");
GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
grpc_combiner_scheduler(chand->combiner));
grpc_combiner_scheduler(chand->data_plane_combiner));
grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_);
}
@@ -2788,7 +2866,7 @@ static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
call_data* calld = static_cast<call_data*>(elem->call_data);
// Apply service config data to the call only once, and only if the
// channel has the data available.
if (GPR_LIKELY(chand->have_service_config &&
if (GPR_LIKELY(chand->received_service_config_data &&
!calld->service_config_applied)) {
calld->service_config_applied = true;
apply_service_config_to_call_locked(elem);
@@ -2836,7 +2914,7 @@ static void start_pick_locked(void* arg, grpc_error* error) {
.send_initial_metadata_flags;
// Apply service config to call if needed.
maybe_apply_service_config_to_call_locked(elem);
// When done, we schedule this closure to leave the channel combiner.
// When done, we schedule this closure to leave the data plane combiner.
GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
grpc_schedule_on_exec_ctx);
// Attempt pick.
@@ -2851,12 +2929,14 @@ static void start_pick_locked(void* arg, grpc_error* error) {
grpc_error_string(error));
}
switch (pick_result) {
case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: {
// If we're shutting down, fail all RPCs.
if (chand->disconnect_error != GRPC_ERROR_NONE) {
grpc_error* disconnect_error =
chand->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
if (disconnect_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(error);
GRPC_CLOSURE_SCHED(&calld->pick_closure,
GRPC_ERROR_REF(chand->disconnect_error));
GRPC_ERROR_REF(disconnect_error));
break;
}
// If wait_for_ready is false, then the error indicates the RPC
@@ -2882,6 +2962,7 @@ static void start_pick_locked(void* arg, grpc_error* error) {
// If wait_for_ready is true, then queue to retry when we get a new
// picker.
GRPC_ERROR_UNREF(error);
}
// Fallthrough
case LoadBalancingPolicy::PICK_QUEUE:
if (!calld->pick_queued) add_call_to_queued_picks_locked(elem);
@@ -2976,7 +3057,8 @@ static void cc_start_transport_stream_op_batch(
}
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
elem, grpc_combiner_scheduler(chand->combiner)),
elem,
grpc_combiner_scheduler(chand->data_plane_combiner)),
GRPC_ERROR_NONE);
} else {
// For all other batches, release the call combiner.

src/core/ext/filters/client_channel/lb_policy.cc
@@ -140,10 +140,9 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
// the time this function returns, the pick will already have
// been processed, and we'll be trying to re-process the same
// pick again, leading to a crash.
// 2. In a subsequent PR, we will split the data plane and control
// plane synchronization into separate combiners, at which
// point this will need to hop from the data plane combiner into
// the control plane combiner.
// 2. We are currently running in the data plane combiner, but we
// need to bounce into the control plane combiner to call
// ExitIdleLocked().
if (!exit_idle_called_) {
exit_idle_called_ = true;
parent_->Ref().release(); // ref held by closure.
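The rewritten comment describes hopping back from the data plane combiner into the control plane combiner before ExitIdleLocked() may run. A hedged sketch of what that closure could look like, assuming the parent policy exposes its control plane combiner via combiner(); this illustrates the hop only and is not the exact closure body, which this hunk does not show:

// Runs under the LB policy's control plane combiner.
static void CallExitIdle(void* arg, grpc_error* /*error*/) {
  LoadBalancingPolicy* parent = static_cast<LoadBalancingPolicy*>(arg);
  parent->ExitIdleLocked();
  parent->Unref();  // balances the Ref().release() taken above
}

// Back in QueuePicker::Pick(), running in the data plane combiner:
GRPC_CLOSURE_SCHED(
    GRPC_CLOSURE_CREATE(&CallExitIdle, parent_.get(),
                        grpc_combiner_scheduler(parent_->combiner())),
    GRPC_ERROR_NONE);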

src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -234,12 +234,19 @@ class GrpcLb : public LoadBalancingPolicy {
// Returns the LB token to use for a drop, or null if the call
// should not be dropped.
// Intended to be called from picker, so calls will be externally
// synchronized.
//
// Note: This is called from the picker, so it will be invoked in
// the channel's data plane combiner, NOT the control plane
// combiner. It should not be accessed by any other part of the LB
// policy.
const char* ShouldDrop();
private:
grpc_grpclb_serverlist* serverlist_;
// Guarded by the channel's data plane combiner, NOT the control
// plane combiner. It should not be accessed by anything but the
// picker via the ShouldDrop() method.
size_t drop_index_ = 0;
};
@@ -551,7 +558,7 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
if (client_stats_ != nullptr) {
client_stats_->AddCallDroppedLocked(drop_token);
client_stats_->AddCallDropped(drop_token);
}
return PICK_COMPLETE;
}
@@ -910,7 +917,7 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
// Construct message payload.
GPR_ASSERT(send_message_payload_ == nullptr);
grpc_grpclb_request* request =
grpc_grpclb_load_report_request_create_locked(client_stats_.get());
grpc_grpclb_load_report_request_create(client_stats_.get());
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (LoadReportCountersAreZero(request)) {

src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
@@ -25,6 +25,8 @@
#include <grpc/support/atm.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/gprpp/mutex_lock.h"
namespace grpc_core {
void GrpcLbClientStats::AddCallStarted() {
@@ -43,11 +45,12 @@ void GrpcLbClientStats::AddCallFinished(
}
}
void GrpcLbClientStats::AddCallDroppedLocked(const char* token) {
void GrpcLbClientStats::AddCallDropped(const char* token) {
// Increment num_calls_started and num_calls_finished.
gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1);
gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1);
// Record the drop.
MutexLock lock(&drop_count_mu_);
if (drop_token_counts_ == nullptr) {
drop_token_counts_.reset(New<DroppedCallCounts>());
}
@@ -69,7 +72,7 @@ void AtomicGetAndResetCounter(int64_t* value, gpr_atm* counter) {
} // namespace
void GrpcLbClientStats::GetLocked(
void GrpcLbClientStats::Get(
int64_t* num_calls_started, int64_t* num_calls_finished,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received,
@@ -80,6 +83,7 @@ void GrpcLbClientStats::GetLocked(
&num_calls_finished_with_client_failed_to_send_);
AtomicGetAndResetCounter(num_calls_finished_known_received,
&num_calls_finished_known_received_);
MutexLock lock(&drop_count_mu_);
*drop_token_counts = std::move(drop_token_counts_);
}

src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
@@ -41,17 +41,16 @@ class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts;
GrpcLbClientStats() {}
GrpcLbClientStats() { gpr_mu_init(&drop_count_mu_); }
~GrpcLbClientStats() { gpr_mu_destroy(&drop_count_mu_); }
void AddCallStarted();
void AddCallFinished(bool finished_with_client_failed_to_send,
bool finished_known_received);
// This method is not thread-safe; caller must synchronize.
void AddCallDroppedLocked(const char* token);
void AddCallDropped(const char* token);
// This method is not thread-safe; caller must synchronize.
void GetLocked(int64_t* num_calls_started, int64_t* num_calls_finished,
void Get(int64_t* num_calls_started, int64_t* num_calls_finished,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received,
UniquePtr<DroppedCallCounts>* drop_token_counts);
@@ -63,13 +62,12 @@ class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
}
private:
// This field must only be accessed via *_locked() methods.
UniquePtr<DroppedCallCounts> drop_token_counts_;
// These fields may be accessed from multiple threads at a time.
gpr_atm num_calls_started_ = 0;
gpr_atm num_calls_finished_ = 0;
gpr_atm num_calls_finished_with_client_failed_to_send_ = 0;
gpr_atm num_calls_finished_known_received_ = 0;
gpr_mu drop_count_mu_; // Guards drop_token_counts_.
UniquePtr<DroppedCallCounts> drop_token_counts_;
};
} // namespace grpc_core
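With the *Locked suffixes dropped, GrpcLbClientStats is now internally synchronized: the call counters remain gpr_atm and drop_token_counts_ gains its own drop_count_mu_, so the picker (running in the channel's data plane combiner) and the balancer call (running in the control plane combiner) can share one instance without external locking. A short usage sketch, assuming a shared RefCountedPtr<GrpcLbClientStats> named client_stats:

// Data plane side (GrpcLb::Picker): record a drop with no external lock.
client_stats->AddCallDropped(drop_token);

// Control plane side (balancer call, when building a load report): drain
// the counters and take ownership of the accumulated drop counts.
int64_t started, finished, failed_to_send, known_received;
grpc_core::UniquePtr<grpc_core::GrpcLbClientStats::DroppedCallCounts> drops;
client_stats->Get(&started, &finished, &failed_to_send, &known_received,
                  &drops);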

src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
@@ -107,7 +107,7 @@ static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
return true;
}
grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_request* grpc_grpclb_load_report_request_create(
grpc_core::GrpcLbClientStats* client_stats) {
grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>(
gpr_zalloc(sizeof(grpc_grpclb_request)));
@@ -122,7 +122,7 @@ grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
grpc_core::UniquePtr<grpc_core::GrpcLbClientStats::DroppedCallCounts>
drop_counts;
client_stats->GetLocked(
client_stats->Get(
&req->client_stats.num_calls_started,
&req->client_stats.num_calls_finished,
&req->client_stats.num_calls_finished_with_client_failed_to_send,

src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -43,7 +43,7 @@ typedef struct {
/** Create a request for a gRPC LB service under \a lb_service_name */
grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name);
grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_request* grpc_grpclb_load_report_request_create(
grpc_core::GrpcLbClientStats* client_stats);
/** Protocol Buffers v3-encode \a request */

src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -186,6 +186,7 @@ void PickFirst::ShutdownLocked() {
}
void PickFirst::ExitIdleLocked() {
if (shutdown_) return;
if (idle_) {
idle_ = false;
if (subchannel_list_ == nullptr ||
