From 0b5ec20ce8961e55a5d9bfa882c7c2449854f58a Mon Sep 17 00:00:00 2001
From: "Mark D. Roth" <roth@google.com>
Date: Fri, 29 Mar 2019 13:55:51 -0700
Subject: [PATCH] Revert "Merge pull request #18578 from grpc/revert-18437-data_plane_combiner"

This reverts commit a8477b421fc7192d7e968b3ca9f011cb4d003de5, reversing
changes made to 65b25ce24db7a5b1648d8c1808f35d30c345ed2a.
---
 .../filters/client_channel/client_channel.cc  | 230 ++++++++++++------
 .../ext/filters/client_channel/lb_policy.cc   |   7 +-
 .../client_channel/lb_policy/grpclb/grpclb.cc |  11 +-
 3 files changed, 168 insertions(+), 80 deletions(-)

diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 82ce253c83c..dea6e059693 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -100,49 +100,52 @@ struct QueuedPick {
 };
 
 typedef struct client_channel_channel_data {
+  //
+  // Fields set at construction and never modified.
+  //
   bool deadline_checking_enabled;
   bool enable_retries;
   size_t per_rpc_retry_buffer_size;
-
-  /** combiner protecting all variables below in this data structure */
-  grpc_combiner* combiner;
-  /** owning stack */
   grpc_channel_stack* owning_stack;
-  /** interested parties (owned) */
-  grpc_pollset_set* interested_parties;
-  // Client channel factory.
   grpc_core::ClientChannelFactory* client_channel_factory;
-  // Subchannel pool.
-  grpc_core::RefCountedPtr<SubchannelPoolInterface> subchannel_pool;
   grpc_core::channelz::ClientChannelNode* channelz_node;
-  // Resolving LB policy.
-  grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
-  // Subchannel picker from LB policy.
+
+  //
+  // Fields used in the data plane. Protected by data_plane_combiner.
+  //
+  grpc_combiner* data_plane_combiner;
   grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker;
-  // Linked list of queued picks.
-  QueuedPick* queued_picks;
-
-  bool have_service_config;
-  /** retry throttle data from service config */
+  QueuedPick* queued_picks;  // Linked list of queued picks.
+  // Data from service config.
+  bool received_service_config_data;
   grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
-  /** per-method service config data */
   grpc_core::RefCountedPtr<ClientChannelMethodParamsTable> method_params_table;
-  /* the following properties are guarded by a mutex since APIs require them
-     to be instantaneously available */
-  gpr_mu info_mu;
-  grpc_core::UniquePtr<char> info_lb_policy_name;
-  grpc_core::UniquePtr<char> info_service_config_json;
-
+
+  //
+  // Fields used in the control plane. Protected by combiner.
+  //
+  grpc_combiner* combiner;
+  grpc_pollset_set* interested_parties;
+  grpc_core::RefCountedPtr<SubchannelPoolInterface> subchannel_pool;
+  grpc_core::OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy;
   grpc_connectivity_state_tracker state_tracker;
-  grpc_error* disconnect_error;
-  /* external_connectivity_watcher_list head is guarded by its own mutex, since
-   * counts need to be grabbed immediately without polling on a cq */
+
+  //
+  // Fields accessed from both data plane and control plane combiners.
+  //
+  grpc_core::Atomic<grpc_error*> disconnect_error;
+
+  // external_connectivity_watcher_list head is guarded by its own mutex, since
+  // counts need to be grabbed immediately without polling on a CQ.
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;
+
+  // The following properties are guarded by a mutex since APIs require them
+  // to be instantaneously available.
+  gpr_mu info_mu;
+  grpc_core::UniquePtr<char> info_lb_policy_name;
+  grpc_core::UniquePtr<char> info_service_config_json;
 } channel_data;
 
 // Forward declarations.
@@ -166,30 +169,98 @@ static const char* get_channel_connectivity_state_change_string(
   GPR_UNREACHABLE_CODE(return "UNKNOWN");
 }
 
-static void set_connectivity_state_and_picker_locked(
-    channel_data* chand, grpc_connectivity_state state, grpc_error* state_error,
-    const char* reason,
-    grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) {
-  // Update connectivity state.
-  grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
-                              reason);
-  if (chand->channelz_node != nullptr) {
-    chand->channelz_node->AddTraceEvent(
-        grpc_core::channelz::ChannelTrace::Severity::Info,
-        grpc_slice_from_static_string(
-            get_channel_connectivity_state_change_string(state)));
+namespace grpc_core {
+namespace {
+
+// A fire-and-forget class that sets the channel's connectivity state
+// and then hops into the data plane combiner to update the picker.
+// Must be instantiated while holding the control plane combiner.
+// Deletes itself when done.
+class ConnectivityStateAndPickerSetter {
+ public:
+  ConnectivityStateAndPickerSetter(
+      channel_data* chand, grpc_connectivity_state state,
+      grpc_error* state_error, const char* reason,
+      UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
+      : chand_(chand), picker_(std::move(picker)) {
+    // Update connectivity state here, while holding control plane combiner.
+    grpc_connectivity_state_set(&chand->state_tracker, state, state_error,
+                                reason);
+    if (chand->channelz_node != nullptr) {
+      chand->channelz_node->AddTraceEvent(
+          channelz::ChannelTrace::Severity::Info,
+          grpc_slice_from_static_string(
+              get_channel_connectivity_state_change_string(state)));
+    }
+    // Bounce into the data plane combiner to reset the picker.
+    GRPC_CHANNEL_STACK_REF(chand->owning_stack,
+                           "ConnectivityStateAndPickerSetter");
+    GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
+                      grpc_combiner_scheduler(chand->data_plane_combiner));
+    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
   }
-  // Update picker.
-  chand->picker = std::move(picker);
-  // Re-process queued picks.
-  for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
-       pick = pick->next) {
-    start_pick_locked(pick->elem, GRPC_ERROR_NONE);
+
+ private:
+  static void SetPicker(void* arg, grpc_error* ignored) {
+    auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
+    // Update picker.
+    self->chand_->picker = std::move(self->picker_);
+    // Re-process queued picks.
+    for (QueuedPick* pick = self->chand_->queued_picks; pick != nullptr;
+         pick = pick->next) {
+      start_pick_locked(pick->elem, GRPC_ERROR_NONE);
+    }
+    // Clean up.
+    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack,
+                             "ConnectivityStateAndPickerSetter");
+    Delete(self);
   }
-}
 
-namespace grpc_core {
-namespace {
+  channel_data* chand_;
+  UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
+  grpc_closure closure_;
+};
+
+// A fire-and-forget class that sets the channel's service config data
+// in the data plane combiner. Deletes itself when done.
+class ServiceConfigSetter {
+ public:
+  ServiceConfigSetter(
+      channel_data* chand,
+      RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
+      RefCountedPtr<ClientChannelMethodParamsTable> method_params_table)
+      : chand_(chand),
+        retry_throttle_data_(std::move(retry_throttle_data)),
+        method_params_table_(std::move(method_params_table)) {
+    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "ServiceConfigSetter");
+    GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
+                      grpc_combiner_scheduler(chand->data_plane_combiner));
+    GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
+  }
+
+ private:
+  static void SetServiceConfigData(void* arg, grpc_error* ignored) {
+    ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
+    channel_data* chand = self->chand_;
+    // Update channel state.
+    chand->received_service_config_data = true;
+    chand->retry_throttle_data = std::move(self->retry_throttle_data_);
+    chand->method_params_table = std::move(self->method_params_table_);
+    // Apply service config to queued picks.
+    for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
+         pick = pick->next) {
+      maybe_apply_service_config_to_call_locked(pick->elem);
+    }
+    // Clean up.
+    GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack, "ServiceConfigSetter");
+    Delete(self);
+  }
+
+  channel_data* chand_;
+  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
+  RefCountedPtr<ClientChannelMethodParamsTable> method_params_table_;
+  grpc_closure closure_;
+};
 
 class ClientChannelControlHelper
     : public LoadBalancingPolicy::ChannelControlHelper {
@@ -222,8 +293,10 @@ class ClientChannelControlHelper
   void UpdateState(
       grpc_connectivity_state state, grpc_error* state_error,
       UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
+    grpc_error* disconnect_error =
+        chand_->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
     if (grpc_client_channel_routing_trace.enabled()) {
-      const char* extra = chand_->disconnect_error == GRPC_ERROR_NONE
+      const char* extra = disconnect_error == GRPC_ERROR_NONE
                               ? ""
                               : " (ignoring -- channel shutting down)";
       gpr_log(GPR_INFO, "chand=%p: update: state=%s error=%s picker=%p%s",
@@ -231,9 +304,10 @@
               grpc_error_string(state_error), picker.get(), extra);
     }
     // Do update only if not shutting down.
-    if (chand_->disconnect_error == GRPC_ERROR_NONE) {
-      set_connectivity_state_and_picker_locked(chand_, state, state_error,
-                                               "helper", std::move(picker));
+    if (disconnect_error == GRPC_ERROR_NONE) {
+      // Will delete itself.
+      New<ConnectivityStateAndPickerSetter>(chand_, state, state_error,
+                                            "helper", std::move(picker));
     } else {
       GRPC_ERROR_UNREF(state_error);
     }
@@ -255,7 +329,6 @@ static bool process_resolver_result_locked(
     void* arg, grpc_core::Resolver::Result* result, const char** lb_policy_name,
    grpc_core::RefCountedPtr<grpc_core::LoadBalancingPolicy::Config>* lb_policy_config) {
   channel_data* chand = static_cast<channel_data*>(arg);
-  chand->have_service_config = true;
   ProcessedResolverResult resolver_result(result, chand->enable_retries);
   grpc_core::UniquePtr<char> service_config_json =
       resolver_result.service_config_json();
@@ -263,9 +336,11 @@
     gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
             chand, service_config_json.get());
   }
-  // Update channel state.
-  chand->retry_throttle_data = resolver_result.retry_throttle_data();
-  chand->method_params_table = resolver_result.method_params_table();
+  // Create service config setter to update channel state in the data
+  // plane combiner. Destroys itself when done.
+  grpc_core::New<ServiceConfigSetter>(
+      chand, resolver_result.retry_throttle_data(),
+      resolver_result.method_params_table());
   // Swap out the data used by cc_get_channel_info().
   gpr_mu_lock(&chand->info_mu);
   chand->info_lb_policy_name = resolver_result.lb_policy_name();
@@ -280,11 +355,6 @@
   // Return results.
   *lb_policy_name = chand->info_lb_policy_name.get();
   *lb_policy_config = resolver_result.lb_policy_config();
-  // Apply service config to queued picks.
-  for (QueuedPick* pick = chand->queued_picks; pick != nullptr;
-       pick = pick->next) {
-    maybe_apply_service_config_to_call_locked(pick->elem);
-  }
   return service_config_changed;
 }
 
@@ -342,12 +412,16 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
   }
 
   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
-    chand->disconnect_error = op->disconnect_with_error;
+    grpc_error* error = GRPC_ERROR_NONE;
+    GPR_ASSERT(chand->disconnect_error.CompareExchangeStrong(
+        &error, op->disconnect_with_error, grpc_core::MemoryOrder::ACQ_REL,
+        grpc_core::MemoryOrder::ACQUIRE));
     grpc_pollset_set_del_pollset_set(
         chand->resolving_lb_policy->interested_parties(),
        chand->interested_parties);
     chand->resolving_lb_policy.reset();
-    set_connectivity_state_and_picker_locked(
+    // Will delete itself.
+    grpc_core::New<ConnectivityStateAndPickerSetter>(
         chand, GRPC_CHANNEL_SHUTDOWN,
         GRPC_ERROR_REF(op->disconnect_with_error), "shutdown from API",
         grpc_core::UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
@@ -397,10 +471,12 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
   GPR_ASSERT(args->is_last);
   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
   // Initialize data members.
+  chand->data_plane_combiner = grpc_combiner_create();
   chand->combiner = grpc_combiner_create();
   grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                                "client_channel");
-  chand->disconnect_error = GRPC_ERROR_NONE;
+  chand->disconnect_error.Store(GRPC_ERROR_NONE,
+                                grpc_core::MemoryOrder::RELAXED);
   gpr_mu_init(&chand->info_mu);
   gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
@@ -511,8 +587,10 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
   chand->method_params_table.reset();
   grpc_client_channel_stop_backup_polling(chand->interested_parties);
   grpc_pollset_set_destroy(chand->interested_parties);
+  GRPC_COMBINER_UNREF(chand->data_plane_combiner, "client_channel");
   GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
-  GRPC_ERROR_UNREF(chand->disconnect_error);
+  GRPC_ERROR_UNREF(
+      chand->disconnect_error.Load(grpc_core::MemoryOrder::RELAXED));
   grpc_connectivity_state_destroy(&chand->state_tracker);
   gpr_mu_destroy(&chand->info_mu);
   gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
@@ -1261,7 +1339,7 @@ static void do_retry(grpc_call_element* elem,
   }
   // Schedule retry after computed delay.
   GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
-                    grpc_combiner_scheduler(chand->combiner));
+                    grpc_combiner_scheduler(chand->data_plane_combiner));
   grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
   // Update bookkeeping.
   if (retry_state != nullptr) retry_state->retry_dispatched = true;
@@ -2488,7 +2566,7 @@ class QueuedPickCanceller {
     auto* chand = static_cast<channel_data*>(elem->channel_data);
     GRPC_CALL_STACK_REF(calld->owning_call, "QueuedPickCanceller");
     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
-                      grpc_combiner_scheduler(chand->combiner));
+                      grpc_combiner_scheduler(chand->data_plane_combiner));
     grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, &closure_);
   }
 
@@ -2628,7 +2706,7 @@ static void maybe_apply_service_config_to_call_locked(grpc_call_element* elem) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   // Apply service config data to the call only once, and only if the
   // channel has the data available.
-  if (GPR_LIKELY(chand->have_service_config &&
+  if (GPR_LIKELY(chand->received_service_config_data &&
                  !calld->service_config_applied)) {
     calld->service_config_applied = true;
     apply_service_config_to_call_locked(elem);
@@ -2676,7 +2754,7 @@ static void start_pick_locked(void* arg, grpc_error* error) {
                                .send_initial_metadata_flags;
   // Apply service config to call if needed.
   maybe_apply_service_config_to_call_locked(elem);
-  // When done, we schedule this closure to leave the channel combiner.
+  // When done, we schedule this closure to leave the data plane combiner.
   GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
                     grpc_schedule_on_exec_ctx);
   // Attempt pick.
@@ -2691,12 +2769,14 @@ static void start_pick_locked(void* arg, grpc_error* error) {
             grpc_error_string(error));
   }
   switch (pick_result) {
-    case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
+    case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: {
       // If we're shutting down, fail all RPCs.
-      if (chand->disconnect_error != GRPC_ERROR_NONE) {
+      grpc_error* disconnect_error =
+          chand->disconnect_error.Load(grpc_core::MemoryOrder::ACQUIRE);
+      if (disconnect_error != GRPC_ERROR_NONE) {
        GRPC_ERROR_UNREF(error);
        GRPC_CLOSURE_SCHED(&calld->pick_closure,
-                           GRPC_ERROR_REF(chand->disconnect_error));
+                           GRPC_ERROR_REF(disconnect_error));
        break;
      }
      // If wait_for_ready is false, then the error indicates the RPC
@@ -2722,7 +2802,8 @@
      // If wait_for_ready is true, then queue to retry when we get a new
      // picker.
      GRPC_ERROR_UNREF(error);
-      // Fallthrough
+    }
+    // Fallthrough
     case LoadBalancingPolicy::PICK_QUEUE:
       if (!calld->pick_queued) add_call_to_queued_picks_locked(elem);
       break;
@@ -2816,7 +2897,8 @@ static void cc_start_transport_stream_op_batch(
     }
     GRPC_CLOSURE_SCHED(
         GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
-                          elem, grpc_combiner_scheduler(chand->combiner)),
+                          elem,
+                          grpc_combiner_scheduler(chand->data_plane_combiner)),
         GRPC_ERROR_NONE);
   } else {
     // For all other batches, release the call combiner.
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index c8f8e82e5d7..6b657465891 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -140,10 +140,9 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
   //    the time this function returns, the pick will already have
   //    been processed, and we'll be trying to re-process the same
   //    pick again, leading to a crash.
-  // 2. In a subsequent PR, we will split the data plane and control
-  //    plane synchronization into separate combiners, at which
-  //    point this will need to hop from the data plane combiner into
-  //    the control plane combiner.
+  // 2. We are currently running in the data plane combiner, but we
+  //    need to bounce into the control plane combiner to call
+  //    ExitIdleLocked().
   if (!exit_idle_called_) {
     exit_idle_called_ = true;
     parent_->Ref().release();  // ref held by closure.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 02fe06c4557..5bf15aa8f7f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -234,12 +234,19 @@ class GrpcLb : public LoadBalancingPolicy {
 
     // Returns the LB token to use for a drop, or null if the call
     // should not be dropped.
-    // Intended to be called from picker, so calls will be externally
-    // synchronized.
+    //
+    // Note: This is called from the picker, so it will be invoked in
+    // the channel's data plane combiner, NOT the control plane
+    // combiner. It should not be accessed by any other part of the LB
+    // policy.
     const char* ShouldDrop();
 
    private:
     grpc_grpclb_serverlist* serverlist_;
+
+    // Guarded by the channel's data plane combiner, NOT the control
+    // plane combiner. It should not be accessed by anything but the
+    // picker via the ShouldDrop() method.
     size_t drop_index_ = 0;
   };
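
The core pattern this patch introduces is the fire-and-forget helper (ConnectivityStateAndPickerSetter, ServiceConfigSetter): it does its control-plane work inline, then schedules a closure on the data plane combiner to publish the new picker or service config and re-process queued picks, deleting itself once that closure runs. A minimal standalone sketch of that shape, using a toy serializing queue in place of grpc_combiner and hypothetical names (Combiner, Channel, Picker, PickerSetter) rather than the gRPC types:

// Illustrative sketch only -- "Combiner" is a stand-in for grpc_combiner, and
// Channel/Picker/PickerSetter are hypothetical analogues of channel_data,
// SubchannelPicker, and ConnectivityStateAndPickerSetter.
#include <cstdio>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <vector>

class Combiner {  // Runs scheduled closures one at a time, in FIFO order.
 public:
  void Schedule(std::function<void()> fn) { queue_.push_back(std::move(fn)); }
  void Drain() {
    while (!queue_.empty()) {
      std::function<void()> fn = std::move(queue_.front());
      queue_.pop_front();
      fn();
    }
  }

 private:
  std::deque<std::function<void()>> queue_;
};

struct Picker {
  std::string name;
};

struct Channel {
  Combiner control_plane;
  Combiner data_plane;
  std::string connectivity_state;         // control-plane field
  std::unique_ptr<Picker> picker;         // data-plane field
  std::vector<std::string> queued_picks;  // data-plane field
};

// Fire-and-forget: constructed while running in the control plane combiner,
// finishes in the data plane combiner, then deletes itself.
class PickerSetter {
 public:
  PickerSetter(Channel* chan, std::string state, std::unique_ptr<Picker> picker)
      : chan_(chan), picker_(std::move(picker)) {
    chan_->connectivity_state = std::move(state);         // control-plane update
    chan_->data_plane.Schedule([this] { SetPicker(); });  // hop combiners
  }

 private:
  void SetPicker() {
    chan_->picker = std::move(picker_);  // data-plane update
    // Re-process picks that were queued while there was no usable picker.
    for (const std::string& call : chan_->queued_picks) {
      std::printf("re-picking %s with %s\n", call.c_str(),
                  chan_->picker->name.c_str());
    }
    chan_->queued_picks.clear();
    delete this;  // fire-and-forget cleanup
  }

  Channel* chan_;
  std::unique_ptr<Picker> picker_;
};

int main() {
  Channel chan;
  chan.queued_picks = {"call-1", "call-2"};
  // The LB policy would trigger this from inside the control plane combiner.
  chan.control_plane.Schedule([&chan] {
    new PickerSetter(&chan, "READY",
                     std::unique_ptr<Picker>(new Picker{"round_robin"}));
  });
  chan.control_plane.Drain();
  chan.data_plane.Drain();  // the hop lands here and swaps the picker
}

In the real patch, the helper also takes a ref on the owning channel stack before scheduling the hop and releases it afterward, so the channel cannot be destroyed while the closure is in flight between combiners.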
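disconnect_error is the one field read on the data plane but written from the control plane, so the patch makes it an Atomic<grpc_error*>: the control plane publishes it exactly once via CompareExchangeStrong, and the pick path only ever does acquire loads. A rough standalone analogue with std::atomic (Error, DisconnectWithError, and PickFailsBecauseShutdown are illustrative stand-ins, not gRPC APIs):

// Illustrative sketch only, assuming a write-once error handoff.
#include <atomic>
#include <cassert>
#include <cstdio>

struct Error {
  const char* msg;
};

std::atomic<Error*> disconnect_error{nullptr};

// Control plane: set the error exactly once; the CAS asserts there was no
// prior value and release-publishes it to data plane readers.
void DisconnectWithError(Error* err) {
  Error* expected = nullptr;
  bool ok = disconnect_error.compare_exchange_strong(
      expected, err, std::memory_order_acq_rel, std::memory_order_acquire);
  assert(ok && "disconnect may only happen once");
  (void)ok;
}

// Data plane: an acquire load is enough; once non-null, the value never
// changes again, so it can be used without further synchronization.
bool PickFailsBecauseShutdown() {
  Error* err = disconnect_error.load(std::memory_order_acquire);
  if (err != nullptr) {
    std::printf("failing pick: %s\n", err->msg);
    return true;
  }
  return false;
}

int main() {
  PickFailsBecauseShutdown();  // no error yet, pick proceeds
  static Error shutdown{"shutdown from API"};
  DisconnectWithError(&shutdown);
  PickFailsBecauseShutdown();  // now every pick fails with the stored error
}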
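The lb_policy.cc and grpclb.cc comments capture the other side of the split: pickers run under the data plane combiner, so per-pick state such as grpclb's drop_index_ needs no extra locking, while anything that touches LB policy state (for example ExitIdleLocked()) has to be bounced back into the control plane combiner rather than called inline. A toy sketch of that discipline (SerialQueue, DropPicker, and Pick are hypothetical stand-ins, not the grpclb code):

// Illustrative sketch only, assuming two serializing queues as combiners.
#include <cstdio>
#include <deque>
#include <functional>
#include <vector>

class SerialQueue {
 public:
  void Schedule(std::function<void()> fn) { q_.push_back(std::move(fn)); }
  void Drain() {
    while (!q_.empty()) {
      std::function<void()> fn = std::move(q_.front());
      q_.pop_front();
      fn();
    }
  }

 private:
  std::deque<std::function<void()>> q_;
};

SerialQueue data_plane;     // all picks run here
SerialQueue control_plane;  // LB policy updates run here

class DropPicker {
 public:
  explicit DropPicker(std::vector<bool> drops) : drops_(std::move(drops)) {}
  // Called only from the data plane queue, so drop_index_ needs no lock.
  bool ShouldDrop() {
    bool drop = drops_[drop_index_];
    drop_index_ = (drop_index_ + 1) % drops_.size();
    return drop;
  }

 private:
  std::vector<bool> drops_;
  size_t drop_index_ = 0;
};

void ExitIdle() { std::printf("control plane: exiting idle\n"); }

void Pick(DropPicker* picker, bool policy_is_idle) {
  if (policy_is_idle) {
    // Can't touch control-plane state from here; bounce into its queue.
    control_plane.Schedule(ExitIdle);
    return;
  }
  std::printf("drop=%d\n", picker->ShouldDrop() ? 1 : 0);
}

int main() {
  DropPicker picker({true, false, false});
  data_plane.Schedule([&] { Pick(&picker, /*policy_is_idle=*/true); });
  data_plane.Schedule([&] { Pick(&picker, /*policy_is_idle=*/false); });
  data_plane.Drain();
  control_plane.Drain();
}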