Revert "Split data plane and control plane into their own combiners."

pull/18578/head
Mark D. Roth 6 years ago committed by GitHub
parent a10d4825c1
commit 7e3b8d38ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 204
      src/core/ext/filters/client_channel/client_channel.cc
  2. 7
      src/core/ext/filters/client_channel/lb_policy.cc
  3. 11
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@@ -100,52 +100,49 @@ struct QueuedPick {
}; };
typedef struct client_channel_channel_data { typedef struct client_channel_channel_data {
//
// Fields set at construction and never modified.
//
bool deadline_checking_enabled; bool deadline_checking_enabled;
bool enable_retries; bool enable_retries;
size_t per_rpc_retry_buffer_size; size_t per_rpc_retry_buffer_size;
/** combiner protecting all variables below in this data structure */
grpc_combiner* combiner;
/** owning stack */
grpc_channel_stack* owning_stack; grpc_channel_stack* owning_stack;
/** interested parties (owned) */
grpc_pollset_set* interested_parties;
// Client channel factory.
grpc_core::ClientChannelFactory* client_channel_factory; grpc_core::ClientChannelFactory* client_channel_factory;
// Subchannel pool.
grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
grpc_core::channelz::ClientChannelNode* channelz_node; grpc_core::channelz::ClientChannelNode* channelz_node;
// // Resolving LB policy.
// Fields used in the data plane. Protected by data_plane_combiner. grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
// // Subchannel picker from LB policy.
grpc_combiner* data_plane_combiner;
grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker; grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker;
QueuedPick* queued_picks; // Linked list of queued picks. // Linked list of queued picks.
// Data from service config. QueuedPick* queued_picks;
bool received_service_config_data;
bool have_service_config;
/** retry throttle data from service config */
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
/** per-method service config data */
grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table; grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
// /* the following properties are guarded by a mutex since APIs require them
// Fields used in the control plane. Protected by combiner. to be instantaneously available */
// gpr_mu info_mu;
grpc_combiner* combiner; grpc_core::UniquePtr<char> info_lb_policy_name;
grpc_pollset_set* interested_parties; grpc_core::UniquePtr<char> info_service_config_json;
grpc_core::RefCountedPtr<grpc_core::SubchannelPoolInterface> subchannel_pool;
grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
grpc_connectivity_state_tracker state_tracker;
// grpc_connectivity_state_tracker state_tracker;
// Fields accessed from both data plane and control plane combiners. grpc_error* disconnect_error;
//
grpc_core::Atomic<grpc_error*> disconnect_error;
// external_connectivity_watcher_list head is guarded by its own mutex, since /* external_connectivity_watcher_list head is guarded by its own mutex, since
// counts need to be grabbed immediately without polling on a CQ. * counts need to be grabbed immediately without polling on a cq */
gpr_mu external_connectivity_watcher_list_mu; gpr_mu external_connectivity_watcher_list_mu;
struct external_connectivity_watcher* external_connectivity_watcher_list_head; struct external_connectivity_watcher* external_connectivity_watcher_list_head;
// The following properties are guarded by a mutex since APIs require them
// to be instantaneously available.
gpr_mu info_mu;
grpc_core::UniquePtr<char> info_lb_policy_name;
grpc_core::UniquePtr<char> info_service_config_json;
} channel_data; } channel_data;
// Forward declarations. // Forward declarations.
@@ -169,98 +166,30 @@ static const char* get_channel_connectivity_state_change_string(
GPR_UNREACHABLE_CODE(return "UNKNOWN"); GPR_UNREACHABLE_CODE(return "UNKNOWN");
} }
namespace grpc_core { static void set_connectivity_state_and_picker_locked(
namespace { channel_data* chand, grpc_connectivity_state state, grpc_error* state_error,
const char* reason,
// A fire-and-forget class that sets the channel's connectivity state grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
// and then hops into the data plane combiner to update the picker. // Update connectivity state.
// Must be instantiated while holding the control plane combiner.
// Deletes itself when done.
class ConnectivityStateAndPickerSetter {
public:
ConnectivityStateAndPickerSetter(
channel_data* chand, grpc_connectivity_state state,
grpc_error* state_error, const char* reason,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
: chand_(chand), picker_(std::move(picker)) {
// Update connectivity state here, while holding control plane combiner.
grpc_connectivity_state_set(&chand->state_tracker, state, state_error, grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
reason); reason);
if (chand->channelz_node != nullptr) { if (chand->channelz_node != nullptr) {
chand->channelz_node->AddTraceEvent( chand->channelz_node->AddTraceEvent(
channelz::ChannelTrace::Severity::Info, grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string( grpc_slice_from_static_string(
get_channel_connectivity_state_change_string(state))); get_channel_connectivity_state_change_string(state)));
} }
// Bounce into the data plane combiner to reset the picker.
GRPC_CHANNEL_STACK_REF(chand->owning_stack,
"ConnectivityStateAndPickerSetter");
GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
grpc_combiner_scheduler(chand->data_plane_combiner));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
private:
static void SetPicker(void* arg, grpc_error* ignored) {
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
// Update picker. // Update picker.
self->chand_->picker = std::move(self->picker_); chand->picker = std::move(picker);
// Re-process queued picks. // Re-process queued picks.
for (QueuedPick* pick = self->chand_->queued_picks; pick != nullptr;
pick = pick->next) {
start_pick_locked(pick->elem, GRPC_ERROR_NONE);
}
// Clean up.
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack,
"ConnectivityStateAndPickerSetter");
Delete(self);
}
channel_data* chand_;
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
grpc_closure closure_;
};
// A fire-and-forget class that sets the channel's service config data
// in the data plane combiner. Deletes itself when done.
class ServiceConfigSetter {
public:
ServiceConfigSetter(
channel_data* chand,
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
RefCountedPtr<ClientChannelMethodParamsTable> method_params_table)
: chand_(chand),
retry_throttle_data_(std::move(retry_throttle_data)),
method_params_table_(std::move(method_params_table)) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "ServiceConfigSetter");
GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
grpc_combiner_scheduler(chand->data_plane_combiner));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
private:
static void SetServiceConfigData(void* arg, grpc_error* ignored) {
ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
channel_data* chand = self->chand_;
// Update channel state.
chand->received_service_config_data = true;
chand->retry_throttle_data = std::move(self->retry_throttle_data_);
chand->method_params_table = std::move(self->method_params_table_);
// Apply service config to queued picks.
for (QueuedPick* pick = chand->queued_picks; pick != nullptr; for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
pick = pick->next) { pick = pick->next) {
maybe_apply_service_config_to_call_locked(pick->elem); start_pick_locked(pick->elem, GRPC_ERROR_NONE);
} }
// Clean up.
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack, "ServiceConfigSetter");
Delete(self);
} }
channel_data* chand_; namespace grpc_core {
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_; namespace {
RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
grpc_closure closure_;
};
class ClientChannelControlHelper class ClientChannelControlHelper
: public LoadBalancingPolicy::ChannelControlHelper { : public LoadBalancingPolicy::ChannelControlHelper {
@@ -293,10 +222,8 @@ class ClientChannelControlHelper
void UpdateState( void UpdateState(
grpc_connectivity_state state, grpc_error* state_error, grpc_connectivity_state state, grpc_error* state_error,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override { UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
grpc_error* disconnect_error =
chand_->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
if (grpc_client_channel_routing_trace.enabled()) { if (grpc_client_channel_routing_trace.enabled()) {
const char* extra = disconnect_error == GRPC_ERROR_NONE const char* extra = chand_->disconnect_error == GRPC_ERROR_NONE
? "" ? ""
: " (ignoring -- channel shutting down)"; : " (ignoring -- channel shutting down)";
gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s", gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s",
@@ -304,9 +231,8 @@ class ClientChannelControlHelper
grpc_error_string(state_error), picker.get(), extra); grpc_error_string(state_error), picker.get(), extra);
} }
// Do update only if not shutting down. // Do update only if not shutting down.
if (disconnect_error == GRPC_ERROR_NONE) { if (chand_->disconnect_error == GRPC_ERROR_NONE) {
// Will delete itself. set_connectivity_state_and_picker_locked(chand_, state, state_error,
New<ConnectivityStateAndPickerSetter>(chand_, state, state_error,
"helper", std::move(picker)); "helper", std::move(picker));
} else { } else {
GRPC_ERROR_UNREF(state_error); GRPC_ERROR_UNREF(state_error);
@@ -329,6 +255,7 @@ static bool process_resolver_result_locked(
void* arg, grpc_core::Resolver::Result* result, const char** lb_policy_name, void* arg, grpc_core::Resolver::Result* result, const char** lb_policy_name,
grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) { grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
channel_data* chand = static_cast<channel_data*>(arg); channel_data* chand = static_cast<channel_data*>(arg);
chand->have_service_config = true;
ProcessedResolverResult resolver_result(result, chand->enable_retries); ProcessedResolverResult resolver_result(result, chand->enable_retries);
grpc_core::UniquePtr<char> service_config_json = grpc_core::UniquePtr<char> service_config_json =
resolver_result.service_config_json(); resolver_result.service_config_json();
@@ -336,11 +263,9 @@ static bool process_resolver_result_locked(
gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
chand, service_config_json.get()); chand, service_config_json.get());
} }
// Create service config setter to update channel state in the data // Update channel state.
// plane combiner. Destroys itself when done. chand->retry_throttle_data = resolver_result.retry_throttle_data();
grpc_core::New<grpc_core::ServiceConfigSetter>( chand->method_params_table = resolver_result.method_params_table();
chand, resolver_result.retry_throttle_data(),
resolver_result.method_params_table());
// Swap out the data used by cc_get_channel_info(). // Swap out the data used by cc_get_channel_info().
gpr_mu_lock(&chand->info_mu); gpr_mu_lock(&chand->info_mu);
chand->info_lb_policy_name = resolver_result.lb_policy_name(); chand->info_lb_policy_name = resolver_result.lb_policy_name();
@@ -355,6 +280,11 @@ static bool process_resolver_result_locked(
// Return results. // Return results.
*lb_policy_name = chand->info_lb_policy_name.get(); *lb_policy_name = chand->info_lb_policy_name.get();
*lb_policy_config = resolver_result.lb_policy_config(); *lb_policy_config = resolver_result.lb_policy_config();
// Apply service config to queued picks.
for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
pick = pick->next) {
maybe_apply_service_config_to_call_locked(pick->elem);
}
return service_config_changed; return service_config_changed;
} }
@@ -412,16 +342,12 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
} }
if (op->disconnect_with_error != GRPC_ERROR_NONE) { if (op->disconnect_with_error != GRPC_ERROR_NONE) {
grpc_error* error = GRPC_ERROR_NONE; chand->disconnect_error = op->disconnect_with_error;
GPR_ASSERT(chand->disconnect_error.CompareExchangeStrong(
&error, op->disconnect_with_error, grpc_core::MemoryOrder::ACQ_REL,
grpc_core::MemoryOrder::ACQUIRE));
grpc_pollset_set_del_pollset_set( grpc_pollset_set_del_pollset_set(
chand->resolving_lb_policy->interested_parties(), chand->resolving_lb_policy->interested_parties(),
chand->interested_parties); chand->interested_parties);
chand->resolving_lb_policy.reset(); chand->resolving_lb_policy.reset();
// Will delete itself. set_connectivity_state_and_picker_locked(
grpc_core::New<grpc_core::ConnectivityStateAndPickerSetter>(
chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error), chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error),
"shutdown from API", "shutdown from API",
grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>( grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
@@ -471,12 +397,10 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
GPR_ASSERT(args->is_last); GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
// Initialize data members. // Initialize data members.
chand->data_plane_combiner = grpc_combiner_create();
chand->combiner = grpc_combiner_create(); chand->combiner = grpc_combiner_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel"); "client_channel");
chand->disconnect_error.Store(GRPC_ERROR_NONE, chand->disconnect_error = GRPC_ERROR_NONE;
grpc_core::MemoryOrder::RELAXED);
gpr_mu_init(&chand->info_mu); gpr_mu_init(&chand->info_mu);
gpr_mu_init(&chand->external_connectivity_watcher_list_mu); gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
@@ -587,10 +511,8 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
chand->method_params_table.reset(); chand->method_params_table.reset();
grpc_client_channel_stop_backup_polling(chand->interested_parties); grpc_client_channel_stop_backup_polling(chand->interested_parties);
grpc_pollset_set_destroy(chand->interested_parties); grpc_pollset_set_destroy(chand->interested_parties);
GRPC_COMBINER_UNREF(chand->data_plane_combiner, "client_channel");
GRPC_COMBINER_UNREF(chand->combiner, "client_channel"); GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
GRPC_ERROR_UNREF( GRPC_ERROR_UNREF(chand->disconnect_error);
chand->disconnect_error.Load(grpc_core::MemoryOrder::RELAXED));
grpc_connectivity_state_destroy(&chand->state_tracker); grpc_connectivity_state_destroy(&chand->state_tracker);
gpr_mu_destroy(&chand->info_mu); gpr_mu_destroy(&chand->info_mu);
gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu); gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
@@ -1339,7 +1261,7 @@ static void do_retry(grpc_call_element* elem,
} }
// Schedule retry after computed delay. // Schedule retry after computed delay.
GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem, GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
grpc_combiner_scheduler(chand->data_plane_combiner)); grpc_combiner_scheduler(chand->combiner));
grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure); grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
// Update bookkeeping. // Update bookkeeping.
if (retry_state != nullptr) retry_state->retry_dispatched = true; if (retry_state != nullptr) retry_state->retry_dispatched = true;
@@ -2566,7 +2488,7 @@ class QueuedPickCanceller {
auto* chand = static_cast<channel_data*>(elem->channel_data); auto* chand = static_cast<channel_data*>(elem->channel_data);
GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller"); GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller");
GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
grpc_combiner_scheduler(chand->data_plane_combiner)); grpc_combiner_scheduler(chand->combiner));
grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_); grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_);
} }
@@ -2706,7 +2628,7 @@ static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
call_data* calld = static_cast<call_data*>(elem->call_data); call_data* calld = static_cast<call_data*>(elem->call_data);
// Apply service config data to the call only once, and only if the // Apply service config data to the call only once, and only if the
// channel has the data available. // channel has the data available.
if (GPR_LIKELY(chand->received_service_config_data && if (GPR_LIKELY(chand->have_service_config &&
!calld->service_config_applied)) { !calld->service_config_applied)) {
calld->service_config_applied = true; calld->service_config_applied = true;
apply_service_config_to_call_locked(elem); apply_service_config_to_call_locked(elem);
@@ -2754,7 +2676,7 @@ static void start_pick_locked(void* arg, grpc_error* error) {
.send_initial_metadata_flags; .send_initial_metadata_flags;
// Apply service config to call if needed. // Apply service config to call if needed.
maybe_apply_service_config_to_call_locked(elem); maybe_apply_service_config_to_call_locked(elem);
// When done, we schedule this closure to leave the data plane combiner. // When done, we schedule this closure to leave the channel combiner.
GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
// Attempt pick. // Attempt pick.
@@ -2769,14 +2691,12 @@ static void start_pick_locked(void* arg, grpc_error* error) {
grpc_error_string(error)); grpc_error_string(error));
} }
switch (pick_result) { switch (pick_result) {
case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: { case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
// If we're shutting down, fail all RPCs. // If we're shutting down, fail all RPCs.
grpc_error* disconnect_error = if (chand->disconnect_error != GRPC_ERROR_NONE) {
chand->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
if (disconnect_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
GRPC_CLOSURE_SCHED(&calld->pick_closure, GRPC_CLOSURE_SCHED(&calld->pick_closure,
GRPC_ERROR_REF(disconnect_error)); GRPC_ERROR_REF(chand->disconnect_error));
break; break;
} }
// If wait_for_ready is false, then the error indicates the RPC // If wait_for_ready is false, then the error indicates the RPC
@@ -2802,7 +2722,6 @@ static void start_pick_locked(void* arg, grpc_error* error) {
// If wait_for_ready is true, then queue to retry when we get a new // If wait_for_ready is true, then queue to retry when we get a new
// picker. // picker.
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
}
// Fallthrough // Fallthrough
case LoadBalancingPolicy::PICK_QUEUE: case LoadBalancingPolicy::PICK_QUEUE:
if (!calld->pick_queued) add_call_to_queued_picks_locked(elem); if (!calld->pick_queued) add_call_to_queued_picks_locked(elem);
@@ -2897,8 +2816,7 @@ static void cc_start_transport_stream_op_batch(
} }
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked, GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
elem, elem, grpc_combiner_scheduler(chand->combiner)),
grpc_combiner_scheduler(chand->data_plane_combiner)),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} else { } else {
// For all other batches, release the call combiner. // For all other batches, release the call combiner.

@@ -140,9 +140,10 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
// the time this function returns, the pick will already have // the time this function returns, the pick will already have
// been processed, and we'll be trying to re-process the same // been processed, and we'll be trying to re-process the same
// pick again, leading to a crash. // pick again, leading to a crash.
// 2. We are currently running in the data plane combiner, but we // 2. In a subsequent PR, we will split the data plane and control
// need to bounce into the control plane combiner to call // plane synchronization into separate combiners, at which
// ExitIdleLocked(). // point this will need to hop from the data plane combiner into
// the control plane combiner.
if (!exit_idle_called_) { if (!exit_idle_called_) {
exit_idle_called_ = true; exit_idle_called_ = true;
parent_->Ref().release(); // ref held by closure. parent_->Ref().release(); // ref held by closure.

@@ -234,19 +234,12 @@ class GrpcLb : public LoadBalancingPolicy {
// Returns the LB token to use for a drop, or null if the call // Returns the LB token to use for a drop, or null if the call
// should not be dropped. // should not be dropped.
// // Intended to be called from picker, so calls will be externally
// Note: This is called from the picker, so it will be invoked in // synchronized.
// the channel's data plane combiner, NOT the control plane
// combiner. It should not be accessed by any other part of the LB
// policy.
const char* ShouldDrop(); const char* ShouldDrop();
private: private:
grpc_grpclb_serverlist* serverlist_; grpc_grpclb_serverlist* serverlist_;
// Guarded by the channel's data plane combiner, NOT the control
// plane combiner. It should not be accessed by anything but the
// picker via the ShouldDrop() method.
size_t drop_index_ = 0; size_t drop_index_ = 0;
}; };

Loading…
Cancel
Save