More LB policy API improvements.

pull/19049/head
Mark D. Roth 6 years ago
parent d43604a8b1
commit 31ec9a74ae
  1. 158
      src/core/ext/filters/client_channel/client_channel.cc
  2. 18
      src/core/ext/filters/client_channel/lb_policy.cc
  3. 148
      src/core/ext/filters/client_channel/lb_policy.h
  4. 43
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 14
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  6. 17
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  7. 56
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  8. 2
      src/core/ext/filters/client_channel/lb_policy_factory.h
  9. 2
      src/core/ext/filters/client_channel/lb_policy_registry.cc
  10. 2
      src/core/ext/filters/client_channel/lb_policy_registry.h
  11. 2
      src/core/ext/filters/client_channel/resolver_result_parsing.cc
  12. 6
      src/core/ext/filters/client_channel/resolver_result_parsing.h
  13. 6
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  14. 8
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  15. 34
      test/core/util/test_lb_policies.cc

@ -105,7 +105,6 @@ namespace {
class ChannelData {
public:
struct QueuedPick {
LoadBalancingPolicy::PickArgs pick;
grpc_call_element* elem;
QueuedPick* next = nullptr;
};
@ -223,7 +222,7 @@ class ChannelData {
static bool ProcessResolverResultLocked(
void* arg, Resolver::Result* result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error);
grpc_error* DoPingLocked(grpc_transport_op* op);
@ -236,7 +235,7 @@ class ChannelData {
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
UniquePtr<char>* lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config);
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);
//
// Fields set at construction and never modified.
@ -314,6 +313,16 @@ class CallData {
private:
class QueuedPickCanceller;
class LbCallState : public LoadBalancingPolicy::CallState {
public:
explicit LbCallState(CallData* calld) : calld_(calld) {}
void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); }
private:
CallData* calld_;
};
// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
@ -449,8 +458,9 @@ class CallData {
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
SubchannelCallRetryState* retry_state);
static void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
const LoadBalancingPolicy::PickArgs& pick,
static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
void* arg, grpc_error* error);
void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
grpc_transport_stream_op_batch* batch);
// Returns the index into pending_batches_ to be used for batch.
@ -640,8 +650,19 @@ class CallData {
bool pick_queued_ = false;
bool service_config_applied_ = false;
QueuedPickCanceller* pick_canceller_ = nullptr;
LbCallState lb_call_state_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
void (*lb_recv_trailing_metadata_ready_)(
void* user_data, grpc_metadata_batch* recv_trailing_metadata,
LoadBalancingPolicy::CallState* call_state) = nullptr;
void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
grpc_closure pick_closure_;
// For intercepting recv_trailing_metadata_ready for the LB policy.
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
grpc_polling_entity* pollent_ = nullptr;
// Batches are added to this list when received from above.
@ -1143,7 +1164,7 @@ void ChannelData::ProcessLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
UniquePtr<char>* lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config) {
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
// Prefer the LB policy name found in the service config.
if (parsed_service_config != nullptr &&
parsed_service_config->parsed_lb_config() != nullptr) {
@ -1191,7 +1212,7 @@ void ChannelData::ProcessLbPolicy(
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
void* arg, Resolver::Result* result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error) {
ChannelData* chand = static_cast<ChannelData*>(arg);
RefCountedPtr<ServiceConfig> service_config;
@ -1312,19 +1333,18 @@ grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
}
LoadBalancingPolicy::PickArgs pick;
grpc_error* error = GRPC_ERROR_NONE;
picker_->Pick(&pick, &error);
if (pick.connected_subchannel != nullptr) {
pick.connected_subchannel->Ping(op->send_ping.on_initiate,
LoadBalancingPolicy::PickResult result =
picker_->Pick(LoadBalancingPolicy::PickArgs());
if (result.connected_subchannel != nullptr) {
result.connected_subchannel->Ping(op->send_ping.on_initiate,
op->send_ping.on_ack);
} else {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
if (result.error == GRPC_ERROR_NONE) {
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"LB policy dropped call on ping");
}
}
return error;
return result.error;
}
void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
@ -1505,6 +1525,7 @@ CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
owning_call_(args.call_stack),
call_combiner_(args.call_combiner),
call_context_(args.context),
lb_call_state_(this),
pending_send_initial_metadata_(false),
pending_send_message_(false),
pending_send_trailing_metadata_(false),
@ -1737,18 +1758,30 @@ void CallData::FreeCachedSendOpDataForCompletedBatch(
// LB recv_trailing_metadata_ready handling
//
void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
void* arg, grpc_error* error) {
CallData* calld = static_cast<CallData*>(arg);
// Invoke callback to LB policy.
calld->lb_recv_trailing_metadata_ready_(
calld->lb_recv_trailing_metadata_ready_user_data_,
calld->recv_trailing_metadata_, &calld->lb_call_state_);
// Chain to original callback.
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(error));
}
void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
const LoadBalancingPolicy::PickArgs& pick,
grpc_transport_stream_op_batch* batch) {
if (pick.recv_trailing_metadata_ready != nullptr) {
*pick.original_recv_trailing_metadata_ready =
if (lb_recv_trailing_metadata_ready_ != nullptr) {
recv_trailing_metadata_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
grpc_schedule_on_exec_ctx);
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
pick.recv_trailing_metadata_ready;
if (pick.recv_trailing_metadata != nullptr) {
*pick.recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
}
&recv_trailing_metadata_ready_;
}
}
@ -1894,8 +1927,7 @@ void CallData::PendingBatchesFail(
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
if (batch->recv_trailing_metadata) {
MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(pick_.pick,
batch);
MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
}
batch->handler_private.extra_arg = this;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
@ -1949,8 +1981,7 @@ void CallData::PendingBatchesResume(grpc_call_element* elem) {
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
if (batch->recv_trailing_metadata) {
MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(pick_.pick,
batch);
MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
}
batch->handler_private.extra_arg = subchannel_call_.get();
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
@ -2011,7 +2042,7 @@ void CallData::DoRetry(grpc_call_element* elem,
GPR_ASSERT(retry_policy != nullptr);
// Reset subchannel call and connected subchannel.
subchannel_call_.reset();
pick_.pick.connected_subchannel.reset();
connected_subchannel_.reset();
// Compute backoff delay.
grpc_millis next_attempt_time;
if (server_pushback_ms >= 0) {
@ -2868,7 +2899,7 @@ void CallData::AddRetriableRecvTrailingMetadataOp(
.recv_trailing_metadata_ready =
&retry_state->recv_trailing_metadata_ready;
MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
pick_.pick, &batch_data->batch);
&batch_data->batch);
}
void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
@ -3135,8 +3166,7 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) {
// need to use a separate call context for each subchannel call.
call_context_, call_combiner_, parent_data_size};
grpc_error* error = GRPC_ERROR_NONE;
subchannel_call_ =
pick_.pick.connected_subchannel->CreateCall(call_args, &error);
subchannel_call_ = connected_subchannel_->CreateCall(call_args, &error);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, this, subchannel_call_.get(), grpc_error_string(error));
@ -3297,13 +3327,14 @@ void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
}
}
const char* PickResultName(LoadBalancingPolicy::PickResult result) {
switch (result) {
case LoadBalancingPolicy::PICK_COMPLETE:
const char* PickResultTypeName(
LoadBalancingPolicy::PickResult::ResultType type) {
switch (type) {
case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
return "COMPLETE";
case LoadBalancingPolicy::PICK_QUEUE:
case LoadBalancingPolicy::PickResult::PICK_QUEUE:
return "QUEUE";
case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE:
return "TRANSIENT_FAILURE";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
@ -3313,8 +3344,10 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
CallData* calld = static_cast<CallData*>(elem->call_data);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
GPR_ASSERT(calld->pick_.pick.connected_subchannel == nullptr);
GPR_ASSERT(calld->connected_subchannel_ == nullptr);
GPR_ASSERT(calld->subchannel_call_ == nullptr);
// Apply service config to call if needed.
calld->MaybeApplyServiceConfigToCallLocked(elem);
// If this is a retry, use the send_initial_metadata payload that
// we've cached; otherwise, use the pending batch. The
// send_initial_metadata batch will be the first pending batch in the
@ -3325,58 +3358,58 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) {
// allocate the subchannel batch earlier so that we can give the
// subchannel's copy of the metadata batch (which is copied for each
// attempt) to the LB policy instead the one from the parent channel.
calld->pick_.pick.initial_metadata =
LoadBalancingPolicy::PickArgs pick_args;
pick_args.call_state = &calld->lb_call_state_;
pick_args.initial_metadata =
calld->seen_send_initial_metadata_
? &calld->send_initial_metadata_
: calld->pending_batches_[0]
.batch->payload->send_initial_metadata.send_initial_metadata;
uint32_t* send_initial_metadata_flags =
// Grab initial metadata flags so that we can check later if the call has
// wait_for_ready enabled.
const uint32_t send_initial_metadata_flags =
calld->seen_send_initial_metadata_
? &calld->send_initial_metadata_flags_
: &calld->pending_batches_[0]
? calld->send_initial_metadata_flags_
: calld->pending_batches_[0]
.batch->payload->send_initial_metadata
.send_initial_metadata_flags;
// Apply service config to call if needed.
calld->MaybeApplyServiceConfigToCallLocked(elem);
// When done, we schedule this closure to leave the data plane combiner.
GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem,
grpc_schedule_on_exec_ctx);
// Attempt pick.
error = GRPC_ERROR_NONE;
auto pick_result = chand->picker()->Pick(&calld->pick_.pick, &error);
auto result = chand->picker()->Pick(pick_args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
"error=%s)",
chand, calld, PickResultName(pick_result),
calld->pick_.pick.connected_subchannel.get(),
grpc_error_string(error));
chand, calld, PickResultTypeName(result.type),
result.connected_subchannel.get(), grpc_error_string(result.error));
}
switch (pick_result) {
case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: {
switch (result.type) {
case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: {
// If we're shutting down, fail all RPCs.
grpc_error* disconnect_error = chand->disconnect_error();
if (disconnect_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(error);
GRPC_ERROR_UNREF(result.error);
GRPC_CLOSURE_SCHED(&calld->pick_closure_,
GRPC_ERROR_REF(disconnect_error));
break;
}
// If wait_for_ready is false, then the error indicates the RPC
// attempt's final status.
if ((*send_initial_metadata_flags &
if ((send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
// Retry if appropriate; otherwise, fail.
grpc_status_code status = GRPC_STATUS_OK;
grpc_error_get_status(error, calld->deadline_, &status, nullptr,
grpc_error_get_status(result.error, calld->deadline_, &status, nullptr,
nullptr, nullptr);
if (!calld->enable_retries_ ||
!calld->MaybeRetry(elem, nullptr /* batch_data */, status,
nullptr /* server_pushback_md */)) {
grpc_error* new_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to pick subchannel", &error, 1);
GRPC_ERROR_UNREF(error);
"Failed to pick subchannel", &result.error, 1);
GRPC_ERROR_UNREF(result.error);
GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error);
}
if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
@ -3384,19 +3417,24 @@ void CallData::StartPickLocked(void* arg, grpc_error* error) {
}
// If wait_for_ready is true, then queue to retry when we get a new
// picker.
GRPC_ERROR_UNREF(error);
GRPC_ERROR_UNREF(result.error);
}
// Fallthrough
case LoadBalancingPolicy::PICK_QUEUE:
case LoadBalancingPolicy::PickResult::PICK_QUEUE:
if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem);
break;
default: // PICK_COMPLETE
// Handle drops.
if (GPR_UNLIKELY(calld->pick_.pick.connected_subchannel == nullptr)) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) {
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
}
GRPC_CLOSURE_SCHED(&calld->pick_closure_, error);
calld->connected_subchannel_ = std::move(result.connected_subchannel);
calld->lb_recv_trailing_metadata_ready_ =
result.recv_trailing_metadata_ready;
calld->lb_recv_trailing_metadata_ready_user_data_ =
result.recv_trailing_metadata_ready_user_data;
GRPC_CLOSURE_SCHED(&calld->pick_closure_, result.error);
if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
}
}

@ -105,7 +105,7 @@ LoadBalancingPolicy::UpdateArgs& LoadBalancingPolicy::UpdateArgs::operator=(
//
LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
PickArgs* pick, grpc_error** error) {
PickArgs args) {
// We invoke the parent's ExitIdleLocked() via a closure instead
// of doing it directly here, for two reasons:
// 1. ExitIdleLocked() may cause the policy's state to change and
@ -125,7 +125,9 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
grpc_combiner_scheduler(parent_->combiner())),
GRPC_ERROR_NONE);
}
return PICK_QUEUE;
PickResult result;
result.type = PickResult::PICK_QUEUE;
return result;
}
void LoadBalancingPolicy::QueuePicker::CallExitIdle(void* arg,
@ -135,4 +137,16 @@ void LoadBalancingPolicy::QueuePicker::CallExitIdle(void* arg,
parent->Unref();
}
//
// LoadBalancingPolicy::TransientFailurePicker
//
LoadBalancingPolicy::PickResult
LoadBalancingPolicy::TransientFailurePicker::Pick(PickArgs args) {
PickResult result;
result.type = PickResult::PICK_TRANSIENT_FAILURE;
result.error = GRPC_ERROR_REF(error_);
return result;
}
} // namespace grpc_core

@ -32,21 +32,9 @@
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount;
namespace grpc_core {
/// Interface for parsed forms of load balancing configs found in a service
/// config.
class ParsedLoadBalancingConfig : public RefCounted<ParsedLoadBalancingConfig> {
public:
virtual ~ParsedLoadBalancingConfig() = default;
// Returns the load balancing policy name
virtual const char* name() const GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS;
};
extern DebugOnlyTraceFlag grpc_trace_lb_policy_refcount;
/// Interface for load balancing policies.
///
@ -89,67 +77,78 @@ class ParsedLoadBalancingConfig : public RefCounted<ParsedLoadBalancingConfig> {
// interested_parties() hooks from the API.
class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
public:
/// Interface for accessing per-call state.
class CallState {
public:
CallState() = default;
virtual ~CallState() = default;
/// Allocates memory associated with the call, which will be
/// automatically freed when the call is complete.
/// It is more efficient to use this than to allocate memory directly
/// for allocations that need to be made on a per-call basis.
virtual void* Alloc(size_t size) GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
/// Arguments used when picking a subchannel for an RPC.
struct PickArgs {
///
/// Input parameters.
///
/// Initial metadata associated with the picking call.
/// The LB policy may use the existing metadata to influence its routing
/// decision, and it may add new metadata elements to be sent with the
/// call to the chosen backend.
// TODO(roth): Provide a more generic metadata API here.
grpc_metadata_batch* initial_metadata = nullptr;
/// Storage for LB token in \a initial_metadata, or nullptr if not used.
// TODO(roth): Remove this from the API. Maybe have the LB policy
// allocate this on the arena instead?
grpc_linked_mdelem lb_token_mdelem_storage;
///
/// Output parameters.
///
/// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call.
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
/// Callback set by lb policy to be notified of trailing metadata.
/// The callback must be scheduled on grpc_schedule_on_exec_ctx.
// TODO(roth): Provide a cleaner callback API.
grpc_closure* recv_trailing_metadata_ready = nullptr;
/// The address that will be set to point to the original
/// recv_trailing_metadata_ready callback, to be invoked by the LB
/// policy's recv_trailing_metadata_ready callback when complete.
/// Must be non-null if recv_trailing_metadata_ready is non-null.
// TODO(roth): Consider making the recv_trailing_metadata closure a
// synchronous callback, in which case it is not responsible for
// chaining to the next callback, so this can be removed from the API.
grpc_closure** original_recv_trailing_metadata_ready = nullptr;
/// If this is not nullptr, then the client channel will point it to the
/// call's trailing metadata before invoking recv_trailing_metadata_ready.
/// If this is nullptr, then the callback will still be called.
/// The lb does not have ownership of the metadata.
// TODO(roth): If we make this a synchronous callback, then this can
// be passed to the callback as a parameter and can be removed from
// the API here.
grpc_metadata_batch** recv_trailing_metadata = nullptr;
/// An interface for accessing call state. Can be used to allocate
/// data associated with the call in an efficient way.
CallState* call_state;
};
/// The result of picking a subchannel for an RPC.
enum PickResult {
// Pick complete. If connected_subchannel is non-null, client channel
// can immediately proceed with the call on connected_subchannel;
// otherwise, call should be dropped.
struct PickResult {
enum ResultType {
/// Pick complete. If connected_subchannel is non-null, client channel
/// can immediately proceed with the call on connected_subchannel;
/// otherwise, call should be dropped.
PICK_COMPLETE,
// Pick cannot be completed until something changes on the control
// plane. Client channel will queue the pick and try again the
// next time the picker is updated.
/// Pick cannot be completed until something changes on the control
/// plane. Client channel will queue the pick and try again the
/// next time the picker is updated.
PICK_QUEUE,
// LB policy is in transient failure. If the pick is wait_for_ready,
// client channel will wait for the next picker and try again;
// otherwise, the call will be failed immediately (although it may
// be retried if the client channel is configured to do so).
// The Pick() method will set its error parameter if this value is
// returned.
/// LB policy is in transient failure. If the pick is wait_for_ready,
/// client channel will wait for the next picker and try again;
/// otherwise, the call will be failed immediately (although it may
/// be retried if the client channel is configured to do so).
/// The Pick() method will set its error parameter if this value is
/// returned.
PICK_TRANSIENT_FAILURE,
};
ResultType type;
/// Used only if type is PICK_COMPLETE. Will be set to the selected
/// subchannel, or nullptr if the LB policy decides to drop the call.
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
/// Used only if type is PICK_TRANSIENT_FAILURE.
/// Error to be set when returning a transient failure.
// TODO(roth): Replace this with something similar to grpc::Status,
// so that we don't expose grpc_error to this API.
grpc_error* error = GRPC_ERROR_NONE;
/// Used only if type is PICK_COMPLETE.
/// Callback set by lb policy to be notified of trailing metadata.
/// The user_data argument will be set to the
/// recv_trailing_metadata_ready_user_data field.
/// recv_trailing_metadata will be set to the metadata, which may be
/// modified by the callback. The callback does not take ownership,
/// however, so any data that needs to be used after returning must
/// be copied.
void (*recv_trailing_metadata_ready)(
void* user_data, grpc_metadata_batch* recv_trailing_metadata,
CallState* call_state) = nullptr;
void* recv_trailing_metadata_ready_user_data = nullptr;
};
/// A subchannel picker is the object used to pick the subchannel to
/// use for a given RPC.
@ -162,17 +161,14 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// live in the LB policy object itself.
///
/// Currently, pickers are always accessed from within the
/// client_channel combiner, so they do not have to be thread-safe.
// TODO(roth): In a subsequent PR, split the data plane work (i.e.,
// the interaction with the picker) and the control plane work (i.e.,
// the interaction with the LB policy) into two different
// synchronization mechanisms, to avoid lock contention between the two.
/// client_channel data plane combiner, so they do not have to be
/// thread-safe.
class SubchannelPicker {
public:
SubchannelPicker() = default;
virtual ~SubchannelPicker() = default;
virtual PickResult Pick(PickArgs* pick, grpc_error** error) GRPC_ABSTRACT;
virtual PickResult Pick(PickArgs args) GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
@ -208,11 +204,24 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
GRPC_ABSTRACT_BASE_CLASS
};
/// Interface for configuration data used by an LB policy implementation.
/// Individual implementations will create a subclass that adds methods to
/// return the parameters they need.
class Config : public RefCounted<Config> {
public:
virtual ~Config() = default;
// Returns the load balancing policy name
virtual const char* name() const GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
/// Data passed to the UpdateLocked() method when new addresses and
/// config are available.
struct UpdateArgs {
ServerAddressList addresses;
RefCountedPtr<ParsedLoadBalancingConfig> config;
RefCountedPtr<Config> config;
const grpc_channel_args* args = nullptr;
// TODO(roth): Remove everything below once channel args is
@ -291,7 +300,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
explicit QueuePicker(RefCountedPtr<LoadBalancingPolicy> parent)
: parent_(std::move(parent)) {}
PickResult Pick(PickArgs* pick, grpc_error** error) override;
PickResult Pick(PickArgs args) override;
private:
static void CallExitIdle(void* arg, grpc_error* error);
@ -306,10 +315,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
explicit TransientFailurePicker(grpc_error* error) : error_(error) {}
~TransientFailurePicker() override { GRPC_ERROR_UNREF(error_); }
PickResult Pick(PickArgs* pick, grpc_error** error) override {
*error = GRPC_ERROR_REF(error_);
return PICK_TRANSIENT_FAILURE;
}
PickResult Pick(PickArgs args) override;
private:
grpc_error* error_;

@ -118,19 +118,19 @@ namespace {
constexpr char kGrpclb[] = "grpclb";
class ParsedGrpcLbConfig : public ParsedLoadBalancingConfig {
class ParsedGrpcLbConfig : public LoadBalancingPolicy::Config {
public:
explicit ParsedGrpcLbConfig(
RefCountedPtr<ParsedLoadBalancingConfig> child_policy)
RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
: child_policy_(std::move(child_policy)) {}
const char* name() const override { return kGrpclb; }
RefCountedPtr<ParsedLoadBalancingConfig> child_policy() const {
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_;
}
private:
RefCountedPtr<ParsedLoadBalancingConfig> child_policy_;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
};
class GrpcLb : public LoadBalancingPolicy {
@ -274,7 +274,7 @@ class GrpcLb : public LoadBalancingPolicy {
child_picker_(std::move(child_picker)),
client_stats_(std::move(client_stats)) {}
PickResult Pick(PickArgs* pick, grpc_error** error) override;
PickResult Pick(PickArgs args) override;
private:
// Storing the address for logging, but not holding a ref.
@ -394,7 +394,7 @@ class GrpcLb : public LoadBalancingPolicy {
// until it reports READY, at which point it will be moved to child_policy_.
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
// The child policy config.
RefCountedPtr<ParsedLoadBalancingConfig> child_policy_config_;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
// Child policy in state READY.
bool child_policy_ready_ = false;
};
@ -561,7 +561,8 @@ const char* GrpcLb::Serverlist::ShouldDrop() {
// GrpcLb::Picker
//
GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
PickResult result;
// Check if we should drop the call.
const char* drop_token = serverlist_->ShouldDrop();
if (drop_token != nullptr) {
@ -573,26 +574,28 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
if (client_stats_ != nullptr) {
client_stats_->AddCallDropped(drop_token);
}
return PICK_COMPLETE;
result.type = PickResult::PICK_COMPLETE;
return result;
}
// Forward pick to child policy.
PickResult result = child_picker_->Pick(pick, error);
result = child_picker_->Pick(args);
// If pick succeeded, add LB token to initial metadata.
if (result == PickResult::PICK_COMPLETE &&
pick->connected_subchannel != nullptr) {
if (result.type == PickResult::PICK_COMPLETE &&
result.connected_subchannel != nullptr) {
const grpc_arg* arg = grpc_channel_args_find(
pick->connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
result.connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
if (arg == nullptr) {
gpr_log(GPR_ERROR,
"[grpclb %p picker %p] No LB token for connected subchannel "
"pick %p",
parent_, this, pick);
"[grpclb %p picker %p] No LB token for connected subchannel %p",
parent_, this, result.connected_subchannel.get());
abort();
}
grpc_mdelem lb_token = {reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
GPR_ASSERT(!GRPC_MDISNULL(lb_token));
grpc_linked_mdelem* mdelem_storage = static_cast<grpc_linked_mdelem*>(
args.call_state->Alloc(sizeof(grpc_linked_mdelem)));
GPR_ASSERT(grpc_metadata_batch_add_tail(
pick->initial_metadata, &pick->lb_token_mdelem_storage,
args.initial_metadata, mdelem_storage,
GRPC_MDELEM_REF(lb_token)) == GRPC_ERROR_NONE);
GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(
grpc_mdelem_get_user_data(lb_token, GrpcLbClientStats::Destroy));
@ -1800,15 +1803,15 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
const char* name() const override { return kGrpclb; }
RefCountedPtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json == nullptr) {
return RefCountedPtr<ParsedLoadBalancingConfig>(
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedGrpcLbConfig>(nullptr));
}
InlinedVector<grpc_error*, 2> error_list;
RefCountedPtr<ParsedLoadBalancingConfig> child_policy;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
for (const grpc_json* field = json->child; field != nullptr;
field = field->next) {
if (field->key == nullptr) continue;
@ -1826,7 +1829,7 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
}
}
if (error_list.empty()) {
return RefCountedPtr<ParsedLoadBalancingConfig>(
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedGrpcLbConfig>(std::move(child_policy)));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);

@ -115,9 +115,11 @@ class PickFirst : public LoadBalancingPolicy {
explicit Picker(RefCountedPtr<ConnectedSubchannel> connected_subchannel)
: connected_subchannel_(std::move(connected_subchannel)) {}
PickResult Pick(PickArgs* pick, grpc_error** error) override {
pick->connected_subchannel = connected_subchannel_;
return PICK_COMPLETE;
PickResult Pick(PickArgs args) override {
PickResult result;
result.type = PickResult::PICK_COMPLETE;
result.connected_subchannel = connected_subchannel_;
return result;
}
private:
@ -527,7 +529,7 @@ void PickFirst::PickFirstSubchannelData::
}
}
class ParsedPickFirstConfig : public ParsedLoadBalancingConfig {
class ParsedPickFirstConfig : public LoadBalancingPolicy::Config {
public:
const char* name() const override { return kPickFirst; }
};
@ -545,12 +547,12 @@ class PickFirstFactory : public LoadBalancingPolicyFactory {
const char* name() const override { return kPickFirst; }
RefCountedPtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
if (json != nullptr) {
GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
}
return RefCountedPtr<ParsedLoadBalancingConfig>(
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedPickFirstConfig>());
}
};

@ -149,7 +149,7 @@ class RoundRobin : public LoadBalancingPolicy {
public:
Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list);
PickResult Pick(PickArgs* pick, grpc_error** error) override;
PickResult Pick(PickArgs args) override;
private:
// Using pointer value only, no ref held -- do not dereference!
@ -220,8 +220,7 @@ RoundRobin::Picker::Picker(RoundRobin* parent,
}
}
RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs* pick,
grpc_error** error) {
RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
@ -230,8 +229,10 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs* pick,
parent_, this, last_picked_index_,
subchannels_[last_picked_index_].get());
}
pick->connected_subchannel = subchannels_[last_picked_index_];
return PICK_COMPLETE;
PickResult result;
result.type = PickResult::PICK_COMPLETE;
result.connected_subchannel = subchannels_[last_picked_index_];
return result;
}
//
@ -503,7 +504,7 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
}
}
class ParsedRoundRobinConfig : public ParsedLoadBalancingConfig {
class ParsedRoundRobinConfig : public LoadBalancingPolicy::Config {
public:
const char* name() const override { return kRoundRobin; }
};
@ -521,12 +522,12 @@ class RoundRobinFactory : public LoadBalancingPolicyFactory {
const char* name() const override { return kRoundRobin; }
RefCountedPtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
if (json != nullptr) {
GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
}
return RefCountedPtr<ParsedLoadBalancingConfig>(
return RefCountedPtr<LoadBalancingPolicy::Config>(
New<ParsedRoundRobinConfig>());
}
};

@ -120,11 +120,11 @@ constexpr char kXds[] = "xds_experimental";
constexpr char kDefaultLocalityName[] = "xds_default_locality";
constexpr uint32_t kDefaultLocalityWeight = 3;
class ParsedXdsConfig : public ParsedLoadBalancingConfig {
class ParsedXdsConfig : public LoadBalancingPolicy::Config {
public:
ParsedXdsConfig(const char* balancer_name,
RefCountedPtr<ParsedLoadBalancingConfig> child_policy,
RefCountedPtr<ParsedLoadBalancingConfig> fallback_policy)
RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy)
: balancer_name_(balancer_name),
child_policy_(std::move(child_policy)),
fallback_policy_(std::move(fallback_policy)) {}
@ -133,18 +133,18 @@ class ParsedXdsConfig : public ParsedLoadBalancingConfig {
const char* balancer_name() const { return balancer_name_; };
RefCountedPtr<ParsedLoadBalancingConfig> child_policy() const {
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_;
}
RefCountedPtr<ParsedLoadBalancingConfig> fallback_policy() const {
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy() const {
return fallback_policy_;
}
private:
const char* balancer_name_ = nullptr;
RefCountedPtr<ParsedLoadBalancingConfig> child_policy_;
RefCountedPtr<ParsedLoadBalancingConfig> fallback_policy_;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_;
};
class XdsLb : public LoadBalancingPolicy {
@ -300,9 +300,7 @@ class XdsLb : public LoadBalancingPolicy {
public:
explicit PickerRef(UniquePtr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs* pick, grpc_error** error) {
return picker_->Pick(pick, error);
}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
private:
UniquePtr<SubchannelPicker> picker_;
@ -322,12 +320,11 @@ class XdsLb : public LoadBalancingPolicy {
: client_stats_(std::move(client_stats)),
pickers_(std::move(pickers)) {}
PickResult Pick(PickArgs* pick, grpc_error** error) override;
PickResult Pick(PickArgs args) override;
private:
// Calls the picker of the locality that the key falls within
PickResult PickFromLocality(const uint32_t key, PickArgs* pick,
grpc_error** error);
PickResult PickFromLocality(const uint32_t key, PickArgs args);
RefCountedPtr<XdsLbClientStats> client_stats_;
PickerList pickers_;
};
@ -363,7 +360,7 @@ class XdsLb : public LoadBalancingPolicy {
~LocalityEntry() = default;
void UpdateLocked(xds_grpclb_serverlist* serverlist,
ParsedLoadBalancingConfig* child_policy_config,
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args);
void ShutdownLocked();
void ResetBackoffLocked();
@ -410,7 +407,7 @@ class XdsLb : public LoadBalancingPolicy {
};
void UpdateLocked(const LocalityList& locality_list,
ParsedLoadBalancingConfig* child_policy_config,
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args, XdsLb* parent);
void ShutdownLocked();
void ResetBackoffLocked();
@ -506,7 +503,7 @@ class XdsLb : public LoadBalancingPolicy {
grpc_closure lb_on_fallback_;
// The policy to use for the fallback backends.
RefCountedPtr<ParsedLoadBalancingConfig> fallback_policy_config_;
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_config_;
// Lock held when modifying the value of fallback_policy_ or
// pending_fallback_policy_.
Mutex fallback_policy_mu_;
@ -515,7 +512,7 @@ class XdsLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> pending_fallback_policy_;
// The policy to use for the backends.
RefCountedPtr<ParsedLoadBalancingConfig> child_policy_config_;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
// Map of policies to use in the backend
LocalityMap locality_map_;
// TODO(mhaidry) : Add support for multiple maps of localities
@ -530,25 +527,24 @@ class XdsLb : public LoadBalancingPolicy {
// XdsLb::Picker
//
XdsLb::PickResult XdsLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) {
// TODO(roth): Add support for drop handling.
// Generate a random number between 0 and the total weight
const uint32_t key =
(rand() * pickers_[pickers_.size() - 1].first) / RAND_MAX;
// Forward pick to whichever locality maps to the range in which the
// random number falls in.
PickResult result = PickFromLocality(key, pick, error);
PickResult result = PickFromLocality(key, args);
// If pick succeeded, add client stats.
if (result == PickResult::PICK_COMPLETE &&
pick->connected_subchannel != nullptr && client_stats_ != nullptr) {
if (result.type == PickResult::PICK_COMPLETE &&
result.connected_subchannel != nullptr && client_stats_ != nullptr) {
// TODO(roth): Add support for client stats.
}
return result;
}
XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key,
PickArgs* pick,
grpc_error** error) {
PickArgs args) {
size_t mid = 0;
size_t start_index = 0;
size_t end_index = pickers_.size() - 1;
@ -566,7 +562,7 @@ XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key,
}
if (index == 0) index = start_index;
GPR_ASSERT(pickers_[index].first > key);
return pickers_[index].second->Pick(pick, error);
return pickers_[index].second->Pick(args);
}
//
@ -1744,7 +1740,7 @@ void XdsLb::LocalityMap::PruneLocalities(const LocalityList& locality_list) {
void XdsLb::LocalityMap::UpdateLocked(
const LocalityList& locality_serverlist,
ParsedLoadBalancingConfig* child_policy_config,
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args, XdsLb* parent) {
if (parent->shutting_down_) return;
for (size_t i = 0; i < locality_serverlist.size(); i++) {
@ -1839,7 +1835,7 @@ XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked(
void XdsLb::LocalityMap::LocalityEntry::UpdateLocked(
xds_grpclb_serverlist* serverlist,
ParsedLoadBalancingConfig* child_policy_config,
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args_in) {
if (parent_->shutting_down_) return;
// Construct update args.
@ -2158,7 +2154,7 @@ class XdsFactory : public LoadBalancingPolicyFactory {
const char* name() const override { return kXds; }
RefCountedPtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json == nullptr) {
@ -2174,8 +2170,8 @@ class XdsFactory : public LoadBalancingPolicyFactory {
InlinedVector<grpc_error*, 3> error_list;
const char* balancer_name = nullptr;
RefCountedPtr<ParsedLoadBalancingConfig> child_policy;
RefCountedPtr<ParsedLoadBalancingConfig> fallback_policy;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy;
for (const grpc_json* field = json->child; field != nullptr;
field = field->next) {
if (field->key == nullptr) continue;
@ -2221,7 +2217,7 @@ class XdsFactory : public LoadBalancingPolicyFactory {
"field:balancerName error:not found"));
}
if (error_list.empty()) {
return RefCountedPtr<ParsedLoadBalancingConfig>(New<ParsedXdsConfig>(
return RefCountedPtr<LoadBalancingPolicy::Config>(New<ParsedXdsConfig>(
balancer_name, std::move(child_policy), std::move(fallback_policy)));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("Xds Parser", &error_list);

@ -37,7 +37,7 @@ class LoadBalancingPolicyFactory {
/// Caller does NOT take ownership of result.
virtual const char* name() const GRPC_ABSTRACT;
virtual RefCountedPtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
virtual RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error) const GRPC_ABSTRACT;
virtual ~LoadBalancingPolicyFactory() {}

@ -176,7 +176,7 @@ grpc_json* ParseLoadBalancingConfigHelper(const grpc_json* lb_config_array,
}
} // namespace
RefCountedPtr<ParsedLoadBalancingConfig>
RefCountedPtr<LoadBalancingPolicy::Config>
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(const grpc_json* json,
grpc_error** error) {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);

@ -56,7 +56,7 @@ class LoadBalancingPolicyRegistry {
/// Returns a parsed object of the load balancing policy to be used from a
/// LoadBalancingConfig array \a json.
static RefCountedPtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
static RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const grpc_json* json, grpc_error** error);
};

@ -268,7 +268,7 @@ ClientChannelServiceConfigParser::ParseGlobalParams(const grpc_json* json,
grpc_error** error) {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
InlinedVector<grpc_error*, 4> error_list;
RefCountedPtr<ParsedLoadBalancingConfig> parsed_lb_config;
RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config;
UniquePtr<char> lb_policy_name;
Optional<ClientChannelGlobalParsedConfig::RetryThrottling> retry_throttling;
const char* health_check_service_name = nullptr;

@ -45,7 +45,7 @@ class ClientChannelGlobalParsedConfig : public ServiceConfig::ParsedConfig {
};
ClientChannelGlobalParsedConfig(
RefCountedPtr<ParsedLoadBalancingConfig> parsed_lb_config,
RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config,
UniquePtr<char> parsed_deprecated_lb_policy,
const Optional<RetryThrottling>& retry_throttling,
const char* health_check_service_name)
@ -58,7 +58,7 @@ class ClientChannelGlobalParsedConfig : public ServiceConfig::ParsedConfig {
return retry_throttling_;
}
RefCountedPtr<ParsedLoadBalancingConfig> parsed_lb_config() const {
RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config() const {
return parsed_lb_config_;
}
@ -71,7 +71,7 @@ class ClientChannelGlobalParsedConfig : public ServiceConfig::ParsedConfig {
}
private:
RefCountedPtr<ParsedLoadBalancingConfig> parsed_lb_config_;
RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config_;
UniquePtr<char> parsed_deprecated_lb_policy_;
Optional<RetryThrottling> retry_throttling_;
const char* health_check_service_name_;

@ -184,7 +184,7 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
UniquePtr<char> child_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig> child_lb_config,
RefCountedPtr<LoadBalancingPolicy::Config> child_lb_config,
grpc_error** error)
: LoadBalancingPolicy(std::move(args)),
tracer_(tracer),
@ -333,7 +333,7 @@ void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
const char* lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config,
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result, TraceStringVector* trace_strings) {
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
@ -530,7 +530,7 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
const bool resolution_contains_addresses = result.addresses.size() > 0;
// Process the resolver result.
const char* lb_policy_name = nullptr;
RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config;
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
bool service_config_changed = false;
char* service_config_error_string = nullptr;
if (process_resolver_result_ != nullptr) {

@ -57,7 +57,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
UniquePtr<char> child_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig> child_lb_config,
RefCountedPtr<LoadBalancingPolicy::Config> child_lb_config,
grpc_error** error);
// Private ctor, to be used by client_channel only!
@ -70,7 +70,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// should set the channel to be in TRANSIENT_FAILURE.
typedef bool (*ProcessResolverResultCallback)(
void* user_data, Resolver::Result* result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
grpc_error** service_config_error);
// If error is set when this returns, then construction failed, and
// the caller may not use the new object.
@ -109,7 +109,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
void OnResolverError(grpc_error* error);
void CreateOrUpdateLbPolicyLocked(
const char* lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config,
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result, TraceStringVector* trace_strings);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const char* lb_policy_name, const grpc_channel_args& args,
@ -126,7 +126,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
ProcessResolverResultCallback process_resolver_result_ = nullptr;
void* process_resolver_result_user_data_ = nullptr;
UniquePtr<char> child_policy_name_;
RefCountedPtr<ParsedLoadBalancingConfig> child_lb_config_;
RefCountedPtr<LoadBalancingPolicy::Config> child_lb_config_;
// Resolver and associated state.
OrphanablePtr<Resolver> resolver_;

@ -120,10 +120,12 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
cb_(cb),
user_data_(user_data) {}
PickResult Pick(PickArgs* pick, grpc_error** error) override {
PickResult result = delegate_picker_->Pick(pick, error);
if (result == PICK_COMPLETE && pick->connected_subchannel != nullptr) {
New<TrailingMetadataHandler>(pick, cb_, user_data_); // deletes itself
PickResult Pick(PickArgs args) override {
PickResult result = delegate_picker_->Pick(args);
if (result.type == PickResult::PICK_COMPLETE &&
result.connected_subchannel != nullptr) {
new (args.call_state->Alloc(sizeof(TrailingMetadataHandler)))
TrailingMetadataHandler(&result, cb_, user_data_);
}
return result;
}
@ -169,35 +171,27 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
class TrailingMetadataHandler {
public:
TrailingMetadataHandler(PickArgs* pick,
TrailingMetadataHandler(PickResult* result,
InterceptRecvTrailingMetadataCallback cb,
void* user_data)
: cb_(cb), user_data_(user_data) {
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecordRecvTrailingMetadata, this,
grpc_schedule_on_exec_ctx);
pick->recv_trailing_metadata_ready = &recv_trailing_metadata_ready_;
pick->original_recv_trailing_metadata_ready =
&original_recv_trailing_metadata_ready_;
pick->recv_trailing_metadata = &recv_trailing_metadata_;
result->recv_trailing_metadata_ready = &RecordRecvTrailingMetadata;
result->recv_trailing_metadata_ready_user_data = this;
}
private:
static void RecordRecvTrailingMetadata(void* arg, grpc_error* err) {
static void RecordRecvTrailingMetadata(
void* arg, grpc_metadata_batch* recv_trailing_metadata,
CallState* call_state) {
TrailingMetadataHandler* self =
static_cast<TrailingMetadataHandler*>(arg);
GPR_ASSERT(self->recv_trailing_metadata_ != nullptr);
GPR_ASSERT(recv_trailing_metadata != nullptr);
self->cb_(self->user_data_);
GRPC_CLOSURE_SCHED(self->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(err));
Delete(self);
self->~TrailingMetadataHandler();
}
InterceptRecvTrailingMetadataCallback cb_;
void* user_data_;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
};
};

Loading…
Cancel
Save