diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index c0586d459b2..5717d3e66d2 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -105,7 +105,6 @@ namespace { class ChannelData { public: struct QueuedPick { - LoadBalancingPolicy::PickArgs pick; grpc_call_element* elem; QueuedPick* next = nullptr; }; @@ -223,7 +222,7 @@ class ChannelData { static bool ProcessResolverResultLocked( void* arg, Resolver::Result* result, const char** lb_policy_name, - RefCountedPtr* lb_policy_config, + RefCountedPtr* lb_policy_config, grpc_error** service_config_error); grpc_error* DoPingLocked(grpc_transport_op* op); @@ -236,7 +235,7 @@ class ChannelData { const Resolver::Result& resolver_result, const internal::ClientChannelGlobalParsedConfig* parsed_service_config, UniquePtr* lb_policy_name, - RefCountedPtr* lb_policy_config); + RefCountedPtr* lb_policy_config); // // Fields set at construction and never modified. @@ -314,6 +313,16 @@ class CallData { private: class QueuedPickCanceller; + class LbCallState : public LoadBalancingPolicy::CallState { + public: + explicit LbCallState(CallData* calld) : calld_(calld) {} + + void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); } + + private: + CallData* calld_; + }; + // State used for starting a retryable batch on a subchannel call. // This provides its own grpc_transport_stream_op_batch and other data // structures needed to populate the ops in the batch. @@ -449,8 +458,9 @@ class CallData { grpc_call_element* elem, SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state); - static void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy( - const LoadBalancingPolicy::PickArgs& pick, + static void RecvTrailingMetadataReadyForLoadBalancingPolicy( + void* arg, grpc_error* error); + void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy( grpc_transport_stream_op_batch* batch); // Returns the index into pending_batches_ to be used for batch. @@ -640,8 +650,19 @@ class CallData { bool pick_queued_ = false; bool service_config_applied_ = false; QueuedPickCanceller* pick_canceller_ = nullptr; + LbCallState lb_call_state_; + RefCountedPtr connected_subchannel_; + void (*lb_recv_trailing_metadata_ready_)( + void* user_data, grpc_metadata_batch* recv_trailing_metadata, + LoadBalancingPolicy::CallState* call_state) = nullptr; + void* lb_recv_trailing_metadata_ready_user_data_ = nullptr; grpc_closure pick_closure_; + // For intercepting recv_trailing_metadata_ready for the LB policy. + grpc_metadata_batch* recv_trailing_metadata_ = nullptr; + grpc_closure recv_trailing_metadata_ready_; + grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; + grpc_polling_entity* pollent_ = nullptr; // Batches are added to this list when received from above. @@ -1143,7 +1164,7 @@ void ChannelData::ProcessLbPolicy( const Resolver::Result& resolver_result, const internal::ClientChannelGlobalParsedConfig* parsed_service_config, UniquePtr* lb_policy_name, - RefCountedPtr* lb_policy_config) { + RefCountedPtr* lb_policy_config) { // Prefer the LB policy name found in the service config. if (parsed_service_config != nullptr && parsed_service_config->parsed_lb_config() != nullptr) { @@ -1191,7 +1212,7 @@ void ChannelData::ProcessLbPolicy( // resolver result update. bool ChannelData::ProcessResolverResultLocked( void* arg, Resolver::Result* result, const char** lb_policy_name, - RefCountedPtr* lb_policy_config, + RefCountedPtr* lb_policy_config, grpc_error** service_config_error) { ChannelData* chand = static_cast(arg); RefCountedPtr service_config; @@ -1312,19 +1333,18 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) { if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected"); } - LoadBalancingPolicy::PickArgs pick; - grpc_error* error = GRPC_ERROR_NONE; - picker_->Pick(&pick, &error); - if (pick.connected_subchannel != nullptr) { - pick.connected_subchannel->Ping(op->send_ping.on_initiate, - op->send_ping.on_ack); + LoadBalancingPolicy::PickResult result = + picker_->Pick(LoadBalancingPolicy::PickArgs()); + if (result.connected_subchannel != nullptr) { + result.connected_subchannel->Ping(op->send_ping.on_initiate, + op->send_ping.on_ack); } else { - if (error == GRPC_ERROR_NONE) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + if (result.error == GRPC_ERROR_NONE) { + result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "LB policy dropped call on ping"); } } - return error; + return result.error; } void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) { @@ -1505,6 +1525,7 @@ CallData::CallData(grpc_call_element* elem, const ChannelData& chand, owning_call_(args.call_stack), call_combiner_(args.call_combiner), call_context_(args.context), + lb_call_state_(this), pending_send_initial_metadata_(false), pending_send_message_(false), pending_send_trailing_metadata_(false), @@ -1737,18 +1758,30 @@ void CallData::FreeCachedSendOpDataForCompletedBatch( // LB recv_trailing_metadata_ready handling // +void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy( + void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); + // Invoke callback to LB policy. + calld->lb_recv_trailing_metadata_ready_( + calld->lb_recv_trailing_metadata_ready_user_data_, + calld->recv_trailing_metadata_, &calld->lb_call_state_); + // Chain to original callback. + GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_, + GRPC_ERROR_REF(error)); +} + void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy( - const LoadBalancingPolicy::PickArgs& pick, grpc_transport_stream_op_batch* batch) { - if (pick.recv_trailing_metadata_ready != nullptr) { - *pick.original_recv_trailing_metadata_ready = + if (lb_recv_trailing_metadata_ready_ != nullptr) { + recv_trailing_metadata_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + original_recv_trailing_metadata_ready_ = batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, + RecvTrailingMetadataReadyForLoadBalancingPolicy, this, + grpc_schedule_on_exec_ctx); batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - pick.recv_trailing_metadata_ready; - if (pick.recv_trailing_metadata != nullptr) { - *pick.recv_trailing_metadata = - batch->payload->recv_trailing_metadata.recv_trailing_metadata; - } + &recv_trailing_metadata_ready_; } } @@ -1894,8 +1927,7 @@ void CallData::PendingBatchesFail( grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { if (batch->recv_trailing_metadata) { - MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(pick_.pick, - batch); + MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch); } batch->handler_private.extra_arg = this; GRPC_CLOSURE_INIT(&batch->handler_private.closure, @@ -1949,8 +1981,7 @@ void CallData::PendingBatchesResume(grpc_call_element* elem) { grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { if (batch->recv_trailing_metadata) { - MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(pick_.pick, - batch); + MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch); } batch->handler_private.extra_arg = subchannel_call_.get(); GRPC_CLOSURE_INIT(&batch->handler_private.closure, @@ -2011,7 +2042,7 @@ void CallData::DoRetry(grpc_call_element* elem, GPR_ASSERT(retry_policy != nullptr); // Reset subchannel call and connected subchannel. subchannel_call_.reset(); - pick_.pick.connected_subchannel.reset(); + connected_subchannel_.reset(); // Compute backoff delay. grpc_millis next_attempt_time; if (server_pushback_ms >= 0) { @@ -2868,7 +2899,7 @@ void CallData::AddRetriableRecvTrailingMetadataOp( .recv_trailing_metadata_ready = &retry_state->recv_trailing_metadata_ready; MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy( - pick_.pick, &batch_data->batch); + &batch_data->batch); } void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) { @@ -3135,8 +3166,7 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) { // need to use a separate call context for each subchannel call. call_context_, call_combiner_, parent_data_size}; grpc_error* error = GRPC_ERROR_NONE; - subchannel_call_ = - pick_.pick.connected_subchannel->CreateCall(call_args, &error); + subchannel_call_ = connected_subchannel_->CreateCall(call_args, &error); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, this, subchannel_call_.get(), grpc_error_string(error)); @@ -3297,13 +3327,14 @@ void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) { } } -const char* PickResultName(LoadBalancingPolicy::PickResult result) { - switch (result) { - case LoadBalancingPolicy::PICK_COMPLETE: +const char* PickResultTypeName( + LoadBalancingPolicy::PickResult::ResultType type) { + switch (type) { + case LoadBalancingPolicy::PickResult::PICK_COMPLETE: return "COMPLETE"; - case LoadBalancingPolicy::PICK_QUEUE: + case LoadBalancingPolicy::PickResult::PICK_QUEUE: return "QUEUE"; - case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: + case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: return "TRANSIENT_FAILURE"; } GPR_UNREACHABLE_CODE(return "UNKNOWN"); @@ -3313,8 +3344,10 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); CallData* calld = static_cast(elem->call_data); ChannelData* chand = static_cast(elem->channel_data); - GPR_ASSERT(calld->pick_.pick.connected_subchannel == nullptr); + GPR_ASSERT(calld->connected_subchannel_ == nullptr); GPR_ASSERT(calld->subchannel_call_ == nullptr); + // Apply service config to call if needed. + calld->MaybeApplyServiceConfigToCallLocked(elem); // If this is a retry, use the send_initial_metadata payload that // we've cached; otherwise, use the pending batch. The // send_initial_metadata batch will be the first pending batch in the @@ -3325,58 +3358,58 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { // allocate the subchannel batch earlier so that we can give the // subchannel's copy of the metadata batch (which is copied for each // attempt) to the LB policy instead the one from the parent channel. - calld->pick_.pick.initial_metadata = + LoadBalancingPolicy::PickArgs pick_args; + pick_args.call_state = &calld->lb_call_state_; + pick_args.initial_metadata = calld->seen_send_initial_metadata_ ? &calld->send_initial_metadata_ : calld->pending_batches_[0] .batch->payload->send_initial_metadata.send_initial_metadata; - uint32_t* send_initial_metadata_flags = + // Grab initial metadata flags so that we can check later if the call has + // wait_for_ready enabled. + const uint32_t send_initial_metadata_flags = calld->seen_send_initial_metadata_ - ? &calld->send_initial_metadata_flags_ - : &calld->pending_batches_[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - // Apply service config to call if needed. - calld->MaybeApplyServiceConfigToCallLocked(elem); + ? calld->send_initial_metadata_flags_ + : calld->pending_batches_[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; // When done, we schedule this closure to leave the data plane combiner. GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx); // Attempt pick. - error = GRPC_ERROR_NONE; - auto pick_result = chand->picker()->Pick(&calld->pick_.pick, &error); + auto result = chand->picker()->Pick(pick_args); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, " "error=%s)", - chand, calld, PickResultName(pick_result), - calld->pick_.pick.connected_subchannel.get(), - grpc_error_string(error)); + chand, calld, PickResultTypeName(result.type), + result.connected_subchannel.get(), grpc_error_string(result.error)); } - switch (pick_result) { - case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: { + switch (result.type) { + case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: { // If we're shutting down, fail all RPCs. grpc_error* disconnect_error = chand->disconnect_error(); if (disconnect_error != GRPC_ERROR_NONE) { - GRPC_ERROR_UNREF(error); + GRPC_ERROR_UNREF(result.error); GRPC_CLOSURE_SCHED(&calld->pick_closure_, GRPC_ERROR_REF(disconnect_error)); break; } // If wait_for_ready is false, then the error indicates the RPC // attempt's final status. - if ((*send_initial_metadata_flags & + if ((send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { // Retry if appropriate; otherwise, fail. grpc_status_code status = GRPC_STATUS_OK; - grpc_error_get_status(error, calld->deadline_, &status, nullptr, + grpc_error_get_status(result.error, calld->deadline_, &status, nullptr, nullptr, nullptr); if (!calld->enable_retries_ || !calld->MaybeRetry(elem, nullptr /* batch_data */, status, nullptr /* server_pushback_md */)) { grpc_error* new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Failed to pick subchannel", &error, 1); - GRPC_ERROR_UNREF(error); + "Failed to pick subchannel", &result.error, 1); + GRPC_ERROR_UNREF(result.error); GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error); } if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem); @@ -3384,19 +3417,24 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) { } // If wait_for_ready is true, then queue to retry when we get a new // picker. - GRPC_ERROR_UNREF(error); + GRPC_ERROR_UNREF(result.error); } // Fallthrough - case LoadBalancingPolicy::PICK_QUEUE: + case LoadBalancingPolicy::PickResult::PICK_QUEUE: if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem); break; default: // PICK_COMPLETE // Handle drops. - if (GPR_UNLIKELY(calld->pick_.pick.connected_subchannel == nullptr)) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) { + result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); } - GRPC_CLOSURE_SCHED(&calld->pick_closure_, error); + calld->connected_subchannel_ = std::move(result.connected_subchannel); + calld->lb_recv_trailing_metadata_ready_ = + result.recv_trailing_metadata_ready; + calld->lb_recv_trailing_metadata_ready_user_data_ = + result.recv_trailing_metadata_ready_user_data; + GRPC_CLOSURE_SCHED(&calld->pick_closure_, result.error); if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem); } } diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 6fa799343ca..3e4d3703c82 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -105,7 +105,7 @@ LoadBalancingPolicy::UpdateArgs& LoadBalancingPolicy::UpdateArgs::operator=( // LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick( - PickArgs* pick, grpc_error** error) { + PickArgs args) { // We invoke the parent's ExitIdleLocked() via a closure instead // of doing it directly here, for two reasons: // 1. ExitIdleLocked() may cause the policy's state to change and @@ -125,7 +125,9 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick( grpc_combiner_scheduler(parent_->combiner())), GRPC_ERROR_NONE); } - return PICK_QUEUE; + PickResult result; + result.type = PickResult::PICK_QUEUE; + return result; } void LoadBalancingPolicy::QueuePicker::CallExitIdle(void* arg, @@ -135,4 +137,16 @@ void LoadBalancingPolicy::QueuePicker::CallExitIdle(void* arg, parent->Unref(); } +// +// LoadBalancingPolicy::TransientFailurePicker +// + +LoadBalancingPolicy::PickResult +LoadBalancingPolicy::TransientFailurePicker::Pick(PickArgs args) { + PickResult result; + result.type = PickResult::PICK_TRANSIENT_FAILURE; + result.error = GRPC_ERROR_REF(error_); + return result; +} + } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 2ac7df63b7d..5920254a9ef 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -32,21 +32,9 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/connectivity_state.h" -extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount; - namespace grpc_core { -/// Interface for parsed forms of load balancing configs found in a service -/// config. -class ParsedLoadBalancingConfig : public RefCounted { - public: - virtual ~ParsedLoadBalancingConfig() = default; - - // Returns the load balancing policy name - virtual const char* name() const GRPC_ABSTRACT; - - GRPC_ABSTRACT_BASE_CLASS; -}; +extern DebugOnlyTraceFlag grpc_trace_lb_policy_refcount; /// Interface for load balancing policies. /// @@ -89,66 +77,77 @@ class ParsedLoadBalancingConfig : public RefCounted { // interested_parties() hooks from the API. class LoadBalancingPolicy : public InternallyRefCounted { public: + /// Interface for accessing per-call state. + class CallState { + public: + CallState() = default; + virtual ~CallState() = default; + + /// Allocates memory associated with the call, which will be + /// automatically freed when the call is complete. + /// It is more efficient to use this than to allocate memory directly + /// for allocations that need to be made on a per-call basis. + virtual void* Alloc(size_t size) GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS + }; + /// Arguments used when picking a subchannel for an RPC. struct PickArgs { - /// - /// Input parameters. - /// /// Initial metadata associated with the picking call. /// The LB policy may use the existing metadata to influence its routing /// decision, and it may add new metadata elements to be sent with the /// call to the chosen backend. // TODO(roth): Provide a more generic metadata API here. grpc_metadata_batch* initial_metadata = nullptr; - /// Storage for LB token in \a initial_metadata, or nullptr if not used. - // TODO(roth): Remove this from the API. Maybe have the LB policy - // allocate this on the arena instead? - grpc_linked_mdelem lb_token_mdelem_storage; - /// - /// Output parameters. - /// - /// Will be set to the selected subchannel, or nullptr on failure or when - /// the LB policy decides to drop the call. - RefCountedPtr connected_subchannel; - /// Callback set by lb policy to be notified of trailing metadata. - /// The callback must be scheduled on grpc_schedule_on_exec_ctx. - // TODO(roth): Provide a cleaner callback API. - grpc_closure* recv_trailing_metadata_ready = nullptr; - /// The address that will be set to point to the original - /// recv_trailing_metadata_ready callback, to be invoked by the LB - /// policy's recv_trailing_metadata_ready callback when complete. - /// Must be non-null if recv_trailing_metadata_ready is non-null. - // TODO(roth): Consider making the recv_trailing_metadata closure a - // synchronous callback, in which case it is not responsible for - // chaining to the next callback, so this can be removed from the API. - grpc_closure** original_recv_trailing_metadata_ready = nullptr; - /// If this is not nullptr, then the client channel will point it to the - /// call's trailing metadata before invoking recv_trailing_metadata_ready. - /// If this is nullptr, then the callback will still be called. - /// The lb does not have ownership of the metadata. - // TODO(roth): If we make this a synchronous callback, then this can - // be passed to the callback as a parameter and can be removed from - // the API here. - grpc_metadata_batch** recv_trailing_metadata = nullptr; + /// An interface for accessing call state. Can be used to allocate + /// data associated with the call in an efficient way. + CallState* call_state; }; /// The result of picking a subchannel for an RPC. - enum PickResult { - // Pick complete. If connected_subchannel is non-null, client channel - // can immediately proceed with the call on connected_subchannel; - // otherwise, call should be dropped. - PICK_COMPLETE, - // Pick cannot be completed until something changes on the control - // plane. Client channel will queue the pick and try again the - // next time the picker is updated. - PICK_QUEUE, - // LB policy is in transient failure. If the pick is wait_for_ready, - // client channel will wait for the next picker and try again; - // otherwise, the call will be failed immediately (although it may - // be retried if the client channel is configured to do so). - // The Pick() method will set its error parameter if this value is - // returned. - PICK_TRANSIENT_FAILURE, + struct PickResult { + enum ResultType { + /// Pick complete. If connected_subchannel is non-null, client channel + /// can immediately proceed with the call on connected_subchannel; + /// otherwise, call should be dropped. + PICK_COMPLETE, + /// Pick cannot be completed until something changes on the control + /// plane. Client channel will queue the pick and try again the + /// next time the picker is updated. + PICK_QUEUE, + /// LB policy is in transient failure. If the pick is wait_for_ready, + /// client channel will wait for the next picker and try again; + /// otherwise, the call will be failed immediately (although it may + /// be retried if the client channel is configured to do so). + /// The Pick() method will set its error parameter if this value is + /// returned. + PICK_TRANSIENT_FAILURE, + }; + ResultType type; + + /// Used only if type is PICK_COMPLETE. Will be set to the selected + /// subchannel, or nullptr if the LB policy decides to drop the call. + RefCountedPtr connected_subchannel; + + /// Used only if type is PICK_TRANSIENT_FAILURE. + /// Error to be set when returning a transient failure. + // TODO(roth): Replace this with something similar to grpc::Status, + // so that we don't expose grpc_error to this API. + grpc_error* error = GRPC_ERROR_NONE; + + /// Used only if type is PICK_COMPLETE. + /// Callback set by lb policy to be notified of trailing metadata. + /// The user_data argument will be set to the + /// recv_trailing_metadata_ready_user_data field. + /// recv_trailing_metadata will be set to the metadata, which may be + /// modified by the callback. The callback does not take ownership, + /// however, so any data that needs to be used after returning must + /// be copied. + void (*recv_trailing_metadata_ready)( + void* user_data, grpc_metadata_batch* recv_trailing_metadata, + CallState* call_state) = nullptr; + void* recv_trailing_metadata_ready_user_data = nullptr; }; /// A subchannel picker is the object used to pick the subchannel to @@ -162,17 +161,14 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// live in the LB policy object itself. /// /// Currently, pickers are always accessed from within the - /// client_channel combiner, so they do not have to be thread-safe. - // TODO(roth): In a subsequent PR, split the data plane work (i.e., - // the interaction with the picker) and the control plane work (i.e., - // the interaction with the LB policy) into two different - // synchronization mechanisms, to avoid lock contention between the two. + /// client_channel data plane combiner, so they do not have to be + /// thread-safe. class SubchannelPicker { public: SubchannelPicker() = default; virtual ~SubchannelPicker() = default; - virtual PickResult Pick(PickArgs* pick, grpc_error** error) GRPC_ABSTRACT; + virtual PickResult Pick(PickArgs args) GRPC_ABSTRACT; GRPC_ABSTRACT_BASE_CLASS }; @@ -208,11 +204,24 @@ class LoadBalancingPolicy : public InternallyRefCounted { GRPC_ABSTRACT_BASE_CLASS }; + /// Interface for configuration data used by an LB policy implementation. + /// Individual implementations will create a subclass that adds methods to + /// return the parameters they need. + class Config : public RefCounted { + public: + virtual ~Config() = default; + + // Returns the load balancing policy name + virtual const char* name() const GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS + }; + /// Data passed to the UpdateLocked() method when new addresses and /// config are available. struct UpdateArgs { ServerAddressList addresses; - RefCountedPtr config; + RefCountedPtr config; const grpc_channel_args* args = nullptr; // TODO(roth): Remove everything below once channel args is @@ -291,7 +300,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { explicit QueuePicker(RefCountedPtr parent) : parent_(std::move(parent)) {} - PickResult Pick(PickArgs* pick, grpc_error** error) override; + PickResult Pick(PickArgs args) override; private: static void CallExitIdle(void* arg, grpc_error* error); @@ -306,10 +315,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { explicit TransientFailurePicker(grpc_error* error) : error_(error) {} ~TransientFailurePicker() override { GRPC_ERROR_UNREF(error_); } - PickResult Pick(PickArgs* pick, grpc_error** error) override { - *error = GRPC_ERROR_REF(error_); - return PICK_TRANSIENT_FAILURE; - } + PickResult Pick(PickArgs args) override; private: grpc_error* error_; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index ed6e8de3f21..2c652e4c6e6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -118,19 +118,19 @@ namespace { constexpr char kGrpclb[] = "grpclb"; -class ParsedGrpcLbConfig : public ParsedLoadBalancingConfig { +class ParsedGrpcLbConfig : public LoadBalancingPolicy::Config { public: explicit ParsedGrpcLbConfig( - RefCountedPtr child_policy) + RefCountedPtr child_policy) : child_policy_(std::move(child_policy)) {} const char* name() const override { return kGrpclb; } - RefCountedPtr child_policy() const { + RefCountedPtr child_policy() const { return child_policy_; } private: - RefCountedPtr child_policy_; + RefCountedPtr child_policy_; }; class GrpcLb : public LoadBalancingPolicy { @@ -274,7 +274,7 @@ class GrpcLb : public LoadBalancingPolicy { child_picker_(std::move(child_picker)), client_stats_(std::move(client_stats)) {} - PickResult Pick(PickArgs* pick, grpc_error** error) override; + PickResult Pick(PickArgs args) override; private: // Storing the address for logging, but not holding a ref. @@ -394,7 +394,7 @@ class GrpcLb : public LoadBalancingPolicy { // until it reports READY, at which point it will be moved to child_policy_. OrphanablePtr pending_child_policy_; // The child policy config. - RefCountedPtr child_policy_config_; + RefCountedPtr child_policy_config_; // Child policy in state READY. bool child_policy_ready_ = false; }; @@ -561,7 +561,8 @@ const char* GrpcLb::Serverlist::ShouldDrop() { // GrpcLb::Picker // -GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs* pick, grpc_error** error) { +GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { + PickResult result; // Check if we should drop the call. const char* drop_token = serverlist_->ShouldDrop(); if (drop_token != nullptr) { @@ -573,26 +574,28 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs* pick, grpc_error** error) { if (client_stats_ != nullptr) { client_stats_->AddCallDropped(drop_token); } - return PICK_COMPLETE; + result.type = PickResult::PICK_COMPLETE; + return result; } // Forward pick to child policy. - PickResult result = child_picker_->Pick(pick, error); + result = child_picker_->Pick(args); // If pick succeeded, add LB token to initial metadata. - if (result == PickResult::PICK_COMPLETE && - pick->connected_subchannel != nullptr) { + if (result.type == PickResult::PICK_COMPLETE && + result.connected_subchannel != nullptr) { const grpc_arg* arg = grpc_channel_args_find( - pick->connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN); + result.connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN); if (arg == nullptr) { gpr_log(GPR_ERROR, - "[grpclb %p picker %p] No LB token for connected subchannel " - "pick %p", - parent_, this, pick); + "[grpclb %p picker %p] No LB token for connected subchannel %p", + parent_, this, result.connected_subchannel.get()); abort(); } grpc_mdelem lb_token = {reinterpret_cast(arg->value.pointer.p)}; GPR_ASSERT(!GRPC_MDISNULL(lb_token)); + grpc_linked_mdelem* mdelem_storage = static_cast( + args.call_state->Alloc(sizeof(grpc_linked_mdelem))); GPR_ASSERT(grpc_metadata_batch_add_tail( - pick->initial_metadata, &pick->lb_token_mdelem_storage, + args.initial_metadata, mdelem_storage, GRPC_MDELEM_REF(lb_token)) == GRPC_ERROR_NONE); GrpcLbClientStats* client_stats = static_cast( grpc_mdelem_get_user_data(lb_token, GrpcLbClientStats::Destroy)); @@ -1800,15 +1803,15 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { const char* name() const override { return kGrpclb; } - RefCountedPtr ParseLoadBalancingConfig( + RefCountedPtr ParseLoadBalancingConfig( const grpc_json* json, grpc_error** error) const override { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); if (json == nullptr) { - return RefCountedPtr( + return RefCountedPtr( New(nullptr)); } InlinedVector error_list; - RefCountedPtr child_policy; + RefCountedPtr child_policy; for (const grpc_json* field = json->child; field != nullptr; field = field->next) { if (field->key == nullptr) continue; @@ -1826,7 +1829,7 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { } } if (error_list.empty()) { - return RefCountedPtr( + return RefCountedPtr( New(std::move(child_policy))); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 199e973e72c..1b0dd230b49 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -115,9 +115,11 @@ class PickFirst : public LoadBalancingPolicy { explicit Picker(RefCountedPtr connected_subchannel) : connected_subchannel_(std::move(connected_subchannel)) {} - PickResult Pick(PickArgs* pick, grpc_error** error) override { - pick->connected_subchannel = connected_subchannel_; - return PICK_COMPLETE; + PickResult Pick(PickArgs args) override { + PickResult result; + result.type = PickResult::PICK_COMPLETE; + result.connected_subchannel = connected_subchannel_; + return result; } private: @@ -527,7 +529,7 @@ void PickFirst::PickFirstSubchannelData:: } } -class ParsedPickFirstConfig : public ParsedLoadBalancingConfig { +class ParsedPickFirstConfig : public LoadBalancingPolicy::Config { public: const char* name() const override { return kPickFirst; } }; @@ -545,12 +547,12 @@ class PickFirstFactory : public LoadBalancingPolicyFactory { const char* name() const override { return kPickFirst; } - RefCountedPtr ParseLoadBalancingConfig( + RefCountedPtr ParseLoadBalancingConfig( const grpc_json* json, grpc_error** error) const override { if (json != nullptr) { GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0); } - return RefCountedPtr( + return RefCountedPtr( New()); } }; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 1693032ea24..0b9915de28e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -149,7 +149,7 @@ class RoundRobin : public LoadBalancingPolicy { public: Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list); - PickResult Pick(PickArgs* pick, grpc_error** error) override; + PickResult Pick(PickArgs args) override; private: // Using pointer value only, no ref held -- do not dereference! @@ -220,8 +220,7 @@ RoundRobin::Picker::Picker(RoundRobin* parent, } } -RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs* pick, - grpc_error** error) { +RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) { last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, @@ -230,8 +229,10 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs* pick, parent_, this, last_picked_index_, subchannels_[last_picked_index_].get()); } - pick->connected_subchannel = subchannels_[last_picked_index_]; - return PICK_COMPLETE; + PickResult result; + result.type = PickResult::PICK_COMPLETE; + result.connected_subchannel = subchannels_[last_picked_index_]; + return result; } // @@ -503,7 +504,7 @@ void RoundRobin::UpdateLocked(UpdateArgs args) { } } -class ParsedRoundRobinConfig : public ParsedLoadBalancingConfig { +class ParsedRoundRobinConfig : public LoadBalancingPolicy::Config { public: const char* name() const override { return kRoundRobin; } }; @@ -521,12 +522,12 @@ class RoundRobinFactory : public LoadBalancingPolicyFactory { const char* name() const override { return kRoundRobin; } - RefCountedPtr ParseLoadBalancingConfig( + RefCountedPtr ParseLoadBalancingConfig( const grpc_json* json, grpc_error** error) const override { if (json != nullptr) { GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0); } - return RefCountedPtr( + return RefCountedPtr( New()); } }; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 819bad6c00d..d70042af229 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -120,11 +120,11 @@ constexpr char kXds[] = "xds_experimental"; constexpr char kDefaultLocalityName[] = "xds_default_locality"; constexpr uint32_t kDefaultLocalityWeight = 3; -class ParsedXdsConfig : public ParsedLoadBalancingConfig { +class ParsedXdsConfig : public LoadBalancingPolicy::Config { public: ParsedXdsConfig(const char* balancer_name, - RefCountedPtr child_policy, - RefCountedPtr fallback_policy) + RefCountedPtr child_policy, + RefCountedPtr fallback_policy) : balancer_name_(balancer_name), child_policy_(std::move(child_policy)), fallback_policy_(std::move(fallback_policy)) {} @@ -133,18 +133,18 @@ class ParsedXdsConfig : public ParsedLoadBalancingConfig { const char* balancer_name() const { return balancer_name_; }; - RefCountedPtr child_policy() const { + RefCountedPtr child_policy() const { return child_policy_; } - RefCountedPtr fallback_policy() const { + RefCountedPtr fallback_policy() const { return fallback_policy_; } private: const char* balancer_name_ = nullptr; - RefCountedPtr child_policy_; - RefCountedPtr fallback_policy_; + RefCountedPtr child_policy_; + RefCountedPtr fallback_policy_; }; class XdsLb : public LoadBalancingPolicy { @@ -300,9 +300,7 @@ class XdsLb : public LoadBalancingPolicy { public: explicit PickerRef(UniquePtr picker) : picker_(std::move(picker)) {} - PickResult Pick(PickArgs* pick, grpc_error** error) { - return picker_->Pick(pick, error); - } + PickResult Pick(PickArgs args) { return picker_->Pick(args); } private: UniquePtr picker_; @@ -322,12 +320,11 @@ class XdsLb : public LoadBalancingPolicy { : client_stats_(std::move(client_stats)), pickers_(std::move(pickers)) {} - PickResult Pick(PickArgs* pick, grpc_error** error) override; + PickResult Pick(PickArgs args) override; private: // Calls the picker of the locality that the key falls within - PickResult PickFromLocality(const uint32_t key, PickArgs* pick, - grpc_error** error); + PickResult PickFromLocality(const uint32_t key, PickArgs args); RefCountedPtr client_stats_; PickerList pickers_; }; @@ -363,7 +360,7 @@ class XdsLb : public LoadBalancingPolicy { ~LocalityEntry() = default; void UpdateLocked(xds_grpclb_serverlist* serverlist, - ParsedLoadBalancingConfig* child_policy_config, + LoadBalancingPolicy::Config* child_policy_config, const grpc_channel_args* args); void ShutdownLocked(); void ResetBackoffLocked(); @@ -410,7 +407,7 @@ class XdsLb : public LoadBalancingPolicy { }; void UpdateLocked(const LocalityList& locality_list, - ParsedLoadBalancingConfig* child_policy_config, + LoadBalancingPolicy::Config* child_policy_config, const grpc_channel_args* args, XdsLb* parent); void ShutdownLocked(); void ResetBackoffLocked(); @@ -506,7 +503,7 @@ class XdsLb : public LoadBalancingPolicy { grpc_closure lb_on_fallback_; // The policy to use for the fallback backends. - RefCountedPtr fallback_policy_config_; + RefCountedPtr fallback_policy_config_; // Lock held when modifying the value of fallback_policy_ or // pending_fallback_policy_. Mutex fallback_policy_mu_; @@ -515,7 +512,7 @@ class XdsLb : public LoadBalancingPolicy { OrphanablePtr pending_fallback_policy_; // The policy to use for the backends. - RefCountedPtr child_policy_config_; + RefCountedPtr child_policy_config_; // Map of policies to use in the backend LocalityMap locality_map_; // TODO(mhaidry) : Add support for multiple maps of localities @@ -530,25 +527,24 @@ class XdsLb : public LoadBalancingPolicy { // XdsLb::Picker // -XdsLb::PickResult XdsLb::Picker::Pick(PickArgs* pick, grpc_error** error) { +XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) { // TODO(roth): Add support for drop handling. // Generate a random number between 0 and the total weight const uint32_t key = (rand() * pickers_[pickers_.size() - 1].first) / RAND_MAX; // Forward pick to whichever locality maps to the range in which the // random number falls in. - PickResult result = PickFromLocality(key, pick, error); + PickResult result = PickFromLocality(key, args); // If pick succeeded, add client stats. - if (result == PickResult::PICK_COMPLETE && - pick->connected_subchannel != nullptr && client_stats_ != nullptr) { + if (result.type == PickResult::PICK_COMPLETE && + result.connected_subchannel != nullptr && client_stats_ != nullptr) { // TODO(roth): Add support for client stats. } return result; } XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key, - PickArgs* pick, - grpc_error** error) { + PickArgs args) { size_t mid = 0; size_t start_index = 0; size_t end_index = pickers_.size() - 1; @@ -566,7 +562,7 @@ XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key, } if (index == 0) index = start_index; GPR_ASSERT(pickers_[index].first > key); - return pickers_[index].second->Pick(pick, error); + return pickers_[index].second->Pick(args); } // @@ -1744,7 +1740,7 @@ void XdsLb::LocalityMap::PruneLocalities(const LocalityList& locality_list) { void XdsLb::LocalityMap::UpdateLocked( const LocalityList& locality_serverlist, - ParsedLoadBalancingConfig* child_policy_config, + LoadBalancingPolicy::Config* child_policy_config, const grpc_channel_args* args, XdsLb* parent) { if (parent->shutting_down_) return; for (size_t i = 0; i < locality_serverlist.size(); i++) { @@ -1839,7 +1835,7 @@ XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked( void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( xds_grpclb_serverlist* serverlist, - ParsedLoadBalancingConfig* child_policy_config, + LoadBalancingPolicy::Config* child_policy_config, const grpc_channel_args* args_in) { if (parent_->shutting_down_) return; // Construct update args. @@ -2158,7 +2154,7 @@ class XdsFactory : public LoadBalancingPolicyFactory { const char* name() const override { return kXds; } - RefCountedPtr ParseLoadBalancingConfig( + RefCountedPtr ParseLoadBalancingConfig( const grpc_json* json, grpc_error** error) const override { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); if (json == nullptr) { @@ -2174,8 +2170,8 @@ class XdsFactory : public LoadBalancingPolicyFactory { InlinedVector error_list; const char* balancer_name = nullptr; - RefCountedPtr child_policy; - RefCountedPtr fallback_policy; + RefCountedPtr child_policy; + RefCountedPtr fallback_policy; for (const grpc_json* field = json->child; field != nullptr; field = field->next) { if (field->key == nullptr) continue; @@ -2221,7 +2217,7 @@ class XdsFactory : public LoadBalancingPolicyFactory { "field:balancerName error:not found")); } if (error_list.empty()) { - return RefCountedPtr(New( + return RefCountedPtr(New( balancer_name, std::move(child_policy), std::move(fallback_policy))); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR("Xds Parser", &error_list); diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index aaf3e959542..3b8c9faa180 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -37,7 +37,7 @@ class LoadBalancingPolicyFactory { /// Caller does NOT take ownership of result. virtual const char* name() const GRPC_ABSTRACT; - virtual RefCountedPtr ParseLoadBalancingConfig( + virtual RefCountedPtr ParseLoadBalancingConfig( const grpc_json* json, grpc_error** error) const GRPC_ABSTRACT; virtual ~LoadBalancingPolicyFactory() {} diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.cc b/src/core/ext/filters/client_channel/lb_policy_registry.cc index 973aa26d0f6..20099b52d6c 100644 --- a/src/core/ext/filters/client_channel/lb_policy_registry.cc +++ b/src/core/ext/filters/client_channel/lb_policy_registry.cc @@ -176,7 +176,7 @@ grpc_json* ParseLoadBalancingConfigHelper(const grpc_json* lb_config_array, } } // namespace -RefCountedPtr +RefCountedPtr LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(const grpc_json* json, grpc_error** error) { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.h b/src/core/ext/filters/client_channel/lb_policy_registry.h index 6820cfc9334..c5f02953a1b 100644 --- a/src/core/ext/filters/client_channel/lb_policy_registry.h +++ b/src/core/ext/filters/client_channel/lb_policy_registry.h @@ -56,7 +56,7 @@ class LoadBalancingPolicyRegistry { /// Returns a parsed object of the load balancing policy to be used from a /// LoadBalancingConfig array \a json. - static RefCountedPtr ParseLoadBalancingConfig( + static RefCountedPtr ParseLoadBalancingConfig( const grpc_json* json, grpc_error** error); }; diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.cc b/src/core/ext/filters/client_channel/resolver_result_parsing.cc index 6a811a2d936..3a1960bfded 100644 --- a/src/core/ext/filters/client_channel/resolver_result_parsing.cc +++ b/src/core/ext/filters/client_channel/resolver_result_parsing.cc @@ -268,7 +268,7 @@ ClientChannelServiceConfigParser::ParseGlobalParams(const grpc_json* json, grpc_error** error) { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); InlinedVector error_list; - RefCountedPtr parsed_lb_config; + RefCountedPtr parsed_lb_config; UniquePtr lb_policy_name; Optional retry_throttling; const char* health_check_service_name = nullptr; diff --git a/src/core/ext/filters/client_channel/resolver_result_parsing.h b/src/core/ext/filters/client_channel/resolver_result_parsing.h index 7750791c779..d0a0456875d 100644 --- a/src/core/ext/filters/client_channel/resolver_result_parsing.h +++ b/src/core/ext/filters/client_channel/resolver_result_parsing.h @@ -45,7 +45,7 @@ class ClientChannelGlobalParsedConfig : public ServiceConfig::ParsedConfig { }; ClientChannelGlobalParsedConfig( - RefCountedPtr parsed_lb_config, + RefCountedPtr parsed_lb_config, UniquePtr parsed_deprecated_lb_policy, const Optional& retry_throttling, const char* health_check_service_name) @@ -58,7 +58,7 @@ class ClientChannelGlobalParsedConfig : public ServiceConfig::ParsedConfig { return retry_throttling_; } - RefCountedPtr parsed_lb_config() const { + RefCountedPtr parsed_lb_config() const { return parsed_lb_config_; } @@ -71,7 +71,7 @@ class ClientChannelGlobalParsedConfig : public ServiceConfig::ParsedConfig { } private: - RefCountedPtr parsed_lb_config_; + RefCountedPtr parsed_lb_config_; UniquePtr parsed_deprecated_lb_policy_; Optional retry_throttling_; const char* health_check_service_name_; diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index 4e383f65dd1..3fe2ee74c92 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -184,7 +184,7 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy( Args args, TraceFlag* tracer, UniquePtr target_uri, UniquePtr child_policy_name, - RefCountedPtr child_lb_config, + RefCountedPtr child_lb_config, grpc_error** error) : LoadBalancingPolicy(std::move(args)), tracer_(tracer), @@ -333,7 +333,7 @@ void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) { void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked( const char* lb_policy_name, - RefCountedPtr lb_policy_config, + RefCountedPtr lb_policy_config, Resolver::Result result, TraceStringVector* trace_strings) { // If the child policy name changes, we need to create a new child // policy. When this happens, we leave child_policy_ as-is and store @@ -530,7 +530,7 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( const bool resolution_contains_addresses = result.addresses.size() > 0; // Process the resolver result. const char* lb_policy_name = nullptr; - RefCountedPtr lb_policy_config; + RefCountedPtr lb_policy_config; bool service_config_changed = false; char* service_config_error_string = nullptr; if (process_resolver_result_ != nullptr) { diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.h b/src/core/ext/filters/client_channel/resolving_lb_policy.h index 0ca6c9563f9..cc9f3176cce 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.h +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.h @@ -57,7 +57,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { ResolvingLoadBalancingPolicy( Args args, TraceFlag* tracer, UniquePtr target_uri, UniquePtr child_policy_name, - RefCountedPtr child_lb_config, + RefCountedPtr child_lb_config, grpc_error** error); // Private ctor, to be used by client_channel only! @@ -70,7 +70,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { // should set the channel to be in TRANSIENT_FAILURE. typedef bool (*ProcessResolverResultCallback)( void* user_data, Resolver::Result* result, const char** lb_policy_name, - RefCountedPtr* lb_policy_config, + RefCountedPtr* lb_policy_config, grpc_error** service_config_error); // If error is set when this returns, then construction failed, and // the caller may not use the new object. @@ -109,7 +109,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { void OnResolverError(grpc_error* error); void CreateOrUpdateLbPolicyLocked( const char* lb_policy_name, - RefCountedPtr lb_policy_config, + RefCountedPtr lb_policy_config, Resolver::Result result, TraceStringVector* trace_strings); OrphanablePtr CreateLbPolicyLocked( const char* lb_policy_name, const grpc_channel_args& args, @@ -126,7 +126,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { ProcessResolverResultCallback process_resolver_result_ = nullptr; void* process_resolver_result_user_data_ = nullptr; UniquePtr child_policy_name_; - RefCountedPtr child_lb_config_; + RefCountedPtr child_lb_config_; // Resolver and associated state. OrphanablePtr resolver_; diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index b871f04bc9e..bd28422bcd1 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -120,10 +120,12 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy cb_(cb), user_data_(user_data) {} - PickResult Pick(PickArgs* pick, grpc_error** error) override { - PickResult result = delegate_picker_->Pick(pick, error); - if (result == PICK_COMPLETE && pick->connected_subchannel != nullptr) { - New(pick, cb_, user_data_); // deletes itself + PickResult Pick(PickArgs args) override { + PickResult result = delegate_picker_->Pick(args); + if (result.type == PickResult::PICK_COMPLETE && + result.connected_subchannel != nullptr) { + new (args.call_state->Alloc(sizeof(TrailingMetadataHandler))) + TrailingMetadataHandler(&result, cb_, user_data_); } return result; } @@ -169,35 +171,27 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy class TrailingMetadataHandler { public: - TrailingMetadataHandler(PickArgs* pick, + TrailingMetadataHandler(PickResult* result, InterceptRecvTrailingMetadataCallback cb, void* user_data) : cb_(cb), user_data_(user_data) { - GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, - RecordRecvTrailingMetadata, this, - grpc_schedule_on_exec_ctx); - pick->recv_trailing_metadata_ready = &recv_trailing_metadata_ready_; - pick->original_recv_trailing_metadata_ready = - &original_recv_trailing_metadata_ready_; - pick->recv_trailing_metadata = &recv_trailing_metadata_; + result->recv_trailing_metadata_ready = &RecordRecvTrailingMetadata; + result->recv_trailing_metadata_ready_user_data = this; } private: - static void RecordRecvTrailingMetadata(void* arg, grpc_error* err) { + static void RecordRecvTrailingMetadata( + void* arg, grpc_metadata_batch* recv_trailing_metadata, + CallState* call_state) { TrailingMetadataHandler* self = static_cast(arg); - GPR_ASSERT(self->recv_trailing_metadata_ != nullptr); + GPR_ASSERT(recv_trailing_metadata != nullptr); self->cb_(self->user_data_); - GRPC_CLOSURE_SCHED(self->original_recv_trailing_metadata_ready_, - GRPC_ERROR_REF(err)); - Delete(self); + self->~TrailingMetadataHandler(); } InterceptRecvTrailingMetadataCallback cb_; void* user_data_; - grpc_closure recv_trailing_metadata_ready_; - grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; - grpc_metadata_batch* recv_trailing_metadata_ = nullptr; }; };