Revert "Merge pull request #18578 from grpc/revert-18437-data_plane_combiner"

This reverts commit a8477b421f, reversing
changes made to 65b25ce24d.
pull/18580/head
Mark D. Roth 6 years ago
parent a8477b421f
commit 0b5ec20ce8
  1. 230
      src/core/ext/filters/client_channel/client_channel.cc
  2. 7
      src/core/ext/filters/client_channel/lb_policy.cc
  3. 11
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@ -100,49 +100,52 @@ struct QueuedPick {
};
typedef struct client_channel_channel_data {
//
// Fields set at construction and never modified.
//
bool deadline_checking_enabled;
bool enable_retries;
size_t per_rpc_retry_buffer_size;
/** combiner protecting all variables below in this data structure */
grpc_combiner* combiner;
/** owning stack */
grpc_channel_stack* owning_stack;
/** interested parties (owned) */
grpc_pollset_set* interested_parties;
// Client channel factory.
grpc_core::ClientChannelFactory* client_channel_factory;
// Subchannel pool.
grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
grpc_core::channelz::ClientChannelNode* channelz_node;
// Resolving LB policy.
grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
// Subchannel picker from LB policy.
//
// Fields used in the data plane. Protected by data_plane_combiner.
//
grpc_combiner* data_plane_combiner;
grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker;
// Linked list of queued picks.
QueuedPick* queued_picks;
bool have_service_config;
/** retry throttle data from service config */
QueuedPick* queued_picks; // Linked list of queued picks.
// Data from service config.
bool received_service_config_data;
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
/** per-method service config data */
grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
/* the following properties are guarded by a mutex since APIs require them
to be instantaneously available */
gpr_mu info_mu;
grpc_core::UniquePtr<char> info_lb_policy_name;
grpc_core::UniquePtr<char> info_service_config_json;
//
// Fields used in the control plane. Protected by combiner.
//
grpc_combiner* combiner;
grpc_pollset_set* interested_parties;
grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
grpc_connectivity_state_tracker state_tracker;
grpc_error* disconnect_error;
/* external_connectivity_watcher_list head is guarded by its own mutex, since
* counts need to be grabbed immediately without polling on a cq */
//
// Fields accessed from both data plane and control plane combiners.
//
grpc_core::Atomic<grpc_error*> disconnect_error;
// external_connectivity_watcher_list head is guarded by its own mutex, since
// counts need to be grabbed immediately without polling on a CQ.
gpr_mu external_connectivity_watcher_list_mu;
struct external_connectivity_watcher* external_connectivity_watcher_list_head;
// The following properties are guarded by a mutex since APIs require them
// to be instantaneously available.
gpr_mu info_mu;
grpc_core::UniquePtr<char> info_lb_policy_name;
grpc_core::UniquePtr<char> info_service_config_json;
} channel_data;
// Forward declarations.
@ -166,30 +169,98 @@ static const char* get_channel_connectivity_state_change_string(
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
static void set_connectivity_state_and_picker_locked(
channel_data* chand, grpc_connectivity_state state, grpc_error* state_error,
const char* reason,
grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
// Update connectivity state.
grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
reason);
if (chand->channelz_node != nullptr) {
chand->channelz_node->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string(
get_channel_connectivity_state_change_string(state)));
namespace grpc_core {
namespace {
// A fire-and-forget class that sets the channel's connectivity state
// and then hops into the data plane combiner to update the picker.
// Must be instantiated while holding the control plane combiner.
// Deletes itself when done.
class ConnectivityStateAndPickerSetter {
public:
ConnectivityStateAndPickerSetter(
channel_data* chand, grpc_connectivity_state state,
grpc_error* state_error, const char* reason,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
: chand_(chand), picker_(std::move(picker)) {
// Update connectivity state here, while holding control plane combiner.
grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
reason);
if (chand->channelz_node != nullptr) {
chand->channelz_node->AddTraceEvent(
channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string(
get_channel_connectivity_state_change_string(state)));
}
// Bounce into the data plane combiner to reset the picker.
GRPC_CHANNEL_STACK_REF(chand->owning_stack,
"ConnectivityStateAndPickerSetter");
GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
grpc_combiner_scheduler(chand->data_plane_combiner));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
// Update picker.
chand->picker = std::move(picker);
// Re-process queued picks.
for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
pick = pick->next) {
start_pick_locked(pick->elem, GRPC_ERROR_NONE);
private:
static void SetPicker(void* arg, grpc_error* ignored) {
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
// Update picker.
self->chand_->picker = std::move(self->picker_);
// Re-process queued picks.
for (QueuedPick* pick = self->chand_->queued_picks; pick != nullptr;
pick = pick->next) {
start_pick_locked(pick->elem, GRPC_ERROR_NONE);
}
// Clean up.
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack,
"ConnectivityStateAndPickerSetter");
Delete(self);
}
}
namespace grpc_core {
namespace {
channel_data* chand_;
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
grpc_closure closure_;
};
// A fire-and-forget class that sets the channel's service config data
// in the data plane combiner. Deletes itself when done.
class ServiceConfigSetter {
public:
ServiceConfigSetter(
channel_data* chand,
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
RefCountedPtr<ClientChannelMethodParamsTable> method_params_table)
: chand_(chand),
retry_throttle_data_(std::move(retry_throttle_data)),
method_params_table_(std::move(method_params_table)) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "ServiceConfigSetter");
GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
grpc_combiner_scheduler(chand->data_plane_combiner));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
private:
static void SetServiceConfigData(void* arg, grpc_error* ignored) {
ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
channel_data* chand = self->chand_;
// Update channel state.
chand->received_service_config_data = true;
chand->retry_throttle_data = std::move(self->retry_throttle_data_);
chand->method_params_table = std::move(self->method_params_table_);
// Apply service config to queued picks.
for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
pick = pick->next) {
maybe_apply_service_config_to_call_locked(pick->elem);
}
// Clean up.
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack, "ServiceConfigSetter");
Delete(self);
}
channel_data* chand_;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
grpc_closure closure_;
};
class ClientChannelControlHelper
: public LoadBalancingPolicy::ChannelControlHelper {
@ -222,8 +293,10 @@ class ClientChannelControlHelper
void UpdateState(
grpc_connectivity_state state, grpc_error* state_error,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
grpc_error* disconnect_error =
chand_->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
if (grpc_client_channel_routing_trace.enabled()) {
const char* extra = chand_->disconnect_error == GRPC_ERROR_NONE
const char* extra = disconnect_error == GRPC_ERROR_NONE
? ""
: " (ignoring -- channel shutting down)";
gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s",
@ -231,9 +304,10 @@ class ClientChannelControlHelper
grpc_error_string(state_error), picker.get(), extra);
}
// Do update only if not shutting down.
if (chand_->disconnect_error == GRPC_ERROR_NONE) {
set_connectivity_state_and_picker_locked(chand_, state, state_error,
"helper", std::move(picker));
if (disconnect_error == GRPC_ERROR_NONE) {
// Will delete itself.
New<ConnectivityStateAndPickerSetter>(chand_, state, state_error,
"helper", std::move(picker));
} else {
GRPC_ERROR_UNREF(state_error);
}
@ -255,7 +329,6 @@ static bool process_resolver_result_locked(
void* arg, grpc_core::Resolver::Result* result, const char** lb_policy_name,
grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
channel_data* chand = static_cast<channel_data*>(arg);
chand->have_service_config = true;
ProcessedResolverResult resolver_result(result, chand->enable_retries);
grpc_core::UniquePtr<char> service_config_json =
resolver_result.service_config_json();
@ -263,9 +336,11 @@ static bool process_resolver_result_locked(
gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
chand, service_config_json.get());
}
// Update channel state.
chand->retry_throttle_data = resolver_result.retry_throttle_data();
chand->method_params_table = resolver_result.method_params_table();
// Create service config setter to update channel state in the data
// plane combiner. Destroys itself when done.
grpc_core::New<grpc_core::ServiceConfigSetter>(
chand, resolver_result.retry_throttle_data(),
resolver_result.method_params_table());
// Swap out the data used by cc_get_channel_info().
gpr_mu_lock(&chand->info_mu);
chand->info_lb_policy_name = resolver_result.lb_policy_name();
@ -280,11 +355,6 @@ static bool process_resolver_result_locked(
// Return results.
*lb_policy_name = chand->info_lb_policy_name.get();
*lb_policy_config = resolver_result.lb_policy_config();
// Apply service config to queued picks.
for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
pick = pick->next) {
maybe_apply_service_config_to_call_locked(pick->elem);
}
return service_config_changed;
}
@ -342,12 +412,16 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
}
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
chand->disconnect_error = op->disconnect_with_error;
grpc_error* error = GRPC_ERROR_NONE;
GPR_ASSERT(chand->disconnect_error.CompareExchangeStrong(
&error, op->disconnect_with_error, grpc_core::MemoryOrder::ACQ_REL,
grpc_core::MemoryOrder::ACQUIRE));
grpc_pollset_set_del_pollset_set(
chand->resolving_lb_policy->interested_parties(),
chand->interested_parties);
chand->resolving_lb_policy.reset();
set_connectivity_state_and_picker_locked(
// Will delete itself.
grpc_core::New<grpc_core::ConnectivityStateAndPickerSetter>(
chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error),
"shutdown from API",
grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
@ -397,10 +471,12 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
// Initialize data members.
chand->data_plane_combiner = grpc_combiner_create();
chand->combiner = grpc_combiner_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
chand->disconnect_error = GRPC_ERROR_NONE;
chand->disconnect_error.Store(GRPC_ERROR_NONE,
grpc_core::MemoryOrder::RELAXED);
gpr_mu_init(&chand->info_mu);
gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
@ -511,8 +587,10 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
chand->method_params_table.reset();
grpc_client_channel_stop_backup_polling(chand->interested_parties);
grpc_pollset_set_destroy(chand->interested_parties);
GRPC_COMBINER_UNREF(chand->data_plane_combiner, "client_channel");
GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
GRPC_ERROR_UNREF(chand->disconnect_error);
GRPC_ERROR_UNREF(
chand->disconnect_error.Load(grpc_core::MemoryOrder::RELAXED));
grpc_connectivity_state_destroy(&chand->state_tracker);
gpr_mu_destroy(&chand->info_mu);
gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
@ -1261,7 +1339,7 @@ static void do_retry(grpc_call_element* elem,
}
// Schedule retry after computed delay.
GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
grpc_combiner_scheduler(chand->combiner));
grpc_combiner_scheduler(chand->data_plane_combiner));
grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
// Update bookkeeping.
if (retry_state != nullptr) retry_state->retry_dispatched = true;
@ -2488,7 +2566,7 @@ class QueuedPickCanceller {
auto* chand = static_cast<channel_data*>(elem->channel_data);
GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller");
GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
grpc_combiner_scheduler(chand->combiner));
grpc_combiner_scheduler(chand->data_plane_combiner));
grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_);
}
@ -2628,7 +2706,7 @@ static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
call_data* calld = static_cast<call_data*>(elem->call_data);
// Apply service config data to the call only once, and only if the
// channel has the data available.
if (GPR_LIKELY(chand->have_service_config &&
if (GPR_LIKELY(chand->received_service_config_data &&
!calld->service_config_applied)) {
calld->service_config_applied = true;
apply_service_config_to_call_locked(elem);
@ -2676,7 +2754,7 @@ static void start_pick_locked(void* arg, grpc_error* error) {
.send_initial_metadata_flags;
// Apply service config to call if needed.
maybe_apply_service_config_to_call_locked(elem);
// When done, we schedule this closure to leave the channel combiner.
// When done, we schedule this closure to leave the data plane combiner.
GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
grpc_schedule_on_exec_ctx);
// Attempt pick.
@ -2691,12 +2769,14 @@ static void start_pick_locked(void* arg, grpc_error* error) {
grpc_error_string(error));
}
switch (pick_result) {
case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: {
// If we're shutting down, fail all RPCs.
if (chand->disconnect_error != GRPC_ERROR_NONE) {
grpc_error* disconnect_error =
chand->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
if (disconnect_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(error);
GRPC_CLOSURE_SCHED(&calld->pick_closure,
GRPC_ERROR_REF(chand->disconnect_error));
GRPC_ERROR_REF(disconnect_error));
break;
}
// If wait_for_ready is false, then the error indicates the RPC
@ -2722,7 +2802,8 @@ static void start_pick_locked(void* arg, grpc_error* error) {
// If wait_for_ready is true, then queue to retry when we get a new
// picker.
GRPC_ERROR_UNREF(error);
// Fallthrough
}
// Fallthrough
case LoadBalancingPolicy::PICK_QUEUE:
if (!calld->pick_queued) add_call_to_queued_picks_locked(elem);
break;
@ -2816,7 +2897,8 @@ static void cc_start_transport_stream_op_batch(
}
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
elem, grpc_combiner_scheduler(chand->combiner)),
elem,
grpc_combiner_scheduler(chand->data_plane_combiner)),
GRPC_ERROR_NONE);
} else {
// For all other batches, release the call combiner.

@ -140,10 +140,9 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
// the time this function returns, the pick will already have
// been processed, and we'll be trying to re-process the same
// pick again, leading to a crash.
// 2. In a subsequent PR, we will split the data plane and control
// plane synchronization into separate combiners, at which
// point this will need to hop from the data plane combiner into
// the control plane combiner.
// 2. We are currently running in the data plane combiner, but we
// need to bounce into the control plane combiner to call
// ExitIdleLocked().
if (!exit_idle_called_) {
exit_idle_called_ = true;
parent_->Ref().release(); // ref held by closure.

@ -234,12 +234,19 @@ class GrpcLb : public LoadBalancingPolicy {
// Returns the LB token to use for a drop, or null if the call
// should not be dropped.
// Intended to be called from picker, so calls will be externally
// synchronized.
//
// Note: This is called from the picker, so it will be invoked in
// the channel's data plane combiner, NOT the control plane
// combiner. It should not be accessed by any other part of the LB
// policy.
const char* ShouldDrop();
private:
grpc_grpclb_serverlist* serverlist_;
// Guarded by the channel's data plane combiner, NOT the control
// plane combiner. It should not be accessed by anything but the
// picker via the ShouldDrop() method.
size_t drop_index_ = 0;
};

Loading…
Cancel
Save