Don't retry when LB policy drops the call (#25846)

pull/25847/head
Mark D. Roth 4 years ago committed by GitHub
parent e4071fc20c
commit 21e25a03d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CMakeLists.txt
  2. 6
      build_autogenerated.yaml
  3. 3
      gRPC-Core.podspec
  4. 4
      grpc.gyp
  5. 22
      src/core/ext/filters/client_channel/client_channel.cc
  6. 3
      src/core/ext/filters/client_channel/client_channel.h
  7. 38
      src/core/ext/filters/client_channel/retry_filter.cc
  8. 2
      src/core/lib/iomgr/error.cc
  9. 2
      src/core/lib/iomgr/error.h
  10. 8
      test/core/end2end/end2end_nosec_tests.cc
  11. 8
      test/core/end2end/end2end_tests.cc
  12. 10
      test/core/end2end/generate_tests.bzl
  13. 276
      test/core/end2end/tests/retry_lb_drop.cc
  14. 57
      test/core/util/test_lb_policies.cc
  15. 39
      test/core/util/test_lb_policies.h

@ -1113,6 +1113,7 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/retry_disabled.cc
test/core/end2end/tests/retry_exceeds_buffer_size_in_initial_batch.cc
test/core/end2end/tests/retry_exceeds_buffer_size_in_subsequent_batch.cc
test/core/end2end/tests/retry_lb_drop.cc
test/core/end2end/tests/retry_non_retriable_status.cc
test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
test/core/end2end/tests/retry_recv_initial_metadata.cc
@ -1140,6 +1141,7 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/workaround_cronet_compression.cc
test/core/end2end/tests/write_buffering.cc
test/core/end2end/tests/write_buffering_at_end.cc
test/core/util/test_lb_policies.cc
)
set_target_properties(end2end_nosec_tests PROPERTIES
@ -1244,6 +1246,7 @@ add_library(end2end_tests
test/core/end2end/tests/retry_disabled.cc
test/core/end2end/tests/retry_exceeds_buffer_size_in_initial_batch.cc
test/core/end2end/tests/retry_exceeds_buffer_size_in_subsequent_batch.cc
test/core/end2end/tests/retry_lb_drop.cc
test/core/end2end/tests/retry_non_retriable_status.cc
test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
test/core/end2end/tests/retry_recv_initial_metadata.cc
@ -1271,6 +1274,7 @@ add_library(end2end_tests
test/core/end2end/tests/workaround_cronet_compression.cc
test/core/end2end/tests/write_buffering.cc
test/core/end2end/tests/write_buffering_at_end.cc
test/core/util/test_lb_policies.cc
)
set_target_properties(end2end_tests PROPERTIES

@ -24,6 +24,7 @@ libs:
- test/core/end2end/fixtures/local_util.h
- test/core/end2end/fixtures/proxy.h
- test/core/end2end/tests/cancel_test_helpers.h
- test/core/util/test_lb_policies.h
src:
- test/core/end2end/cq_verifier.cc
- test/core/end2end/data/client_certs.cc
@ -87,6 +88,7 @@ libs:
- test/core/end2end/tests/retry_disabled.cc
- test/core/end2end/tests/retry_exceeds_buffer_size_in_initial_batch.cc
- test/core/end2end/tests/retry_exceeds_buffer_size_in_subsequent_batch.cc
- test/core/end2end/tests/retry_lb_drop.cc
- test/core/end2end/tests/retry_non_retriable_status.cc
- test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
- test/core/end2end/tests/retry_recv_initial_metadata.cc
@ -114,6 +116,7 @@ libs:
- test/core/end2end/tests/workaround_cronet_compression.cc
- test/core/end2end/tests/write_buffering.cc
- test/core/end2end/tests/write_buffering_at_end.cc
- test/core/util/test_lb_policies.cc
deps:
- grpc_test_util
- name: end2end_tests
@ -128,6 +131,7 @@ libs:
- test/core/end2end/fixtures/local_util.h
- test/core/end2end/fixtures/proxy.h
- test/core/end2end/tests/cancel_test_helpers.h
- test/core/util/test_lb_policies.h
src:
- test/core/end2end/cq_verifier.cc
- test/core/end2end/data/client_certs.cc
@ -192,6 +196,7 @@ libs:
- test/core/end2end/tests/retry_disabled.cc
- test/core/end2end/tests/retry_exceeds_buffer_size_in_initial_batch.cc
- test/core/end2end/tests/retry_exceeds_buffer_size_in_subsequent_batch.cc
- test/core/end2end/tests/retry_lb_drop.cc
- test/core/end2end/tests/retry_non_retriable_status.cc
- test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
- test/core/end2end/tests/retry_recv_initial_metadata.cc
@ -219,6 +224,7 @@ libs:
- test/core/end2end/tests/workaround_cronet_compression.cc
- test/core/end2end/tests/write_buffering.cc
- test/core/end2end/tests/write_buffering_at_end.cc
- test/core/util/test_lb_policies.cc
deps:
- grpc_test_util
- name: gpr

@ -2087,6 +2087,7 @@ Pod::Spec.new do |s|
'test/core/end2end/tests/retry_disabled.cc',
'test/core/end2end/tests/retry_exceeds_buffer_size_in_initial_batch.cc',
'test/core/end2end/tests/retry_exceeds_buffer_size_in_subsequent_batch.cc',
'test/core/end2end/tests/retry_lb_drop.cc',
'test/core/end2end/tests/retry_non_retriable_status.cc',
'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc',
'test/core/end2end/tests/retry_recv_initial_metadata.cc',
@ -2149,6 +2150,8 @@ Pod::Spec.new do |s|
'test/core/util/subprocess_windows.cc',
'test/core/util/test_config.cc',
'test/core/util/test_config.h',
'test/core/util/test_lb_policies.cc',
'test/core/util/test_lb_policies.h',
'test/core/util/test_tcp_server.cc',
'test/core/util/test_tcp_server.h',
'test/core/util/tls_utils.cc',

@ -242,6 +242,7 @@
'test/core/end2end/tests/retry_disabled.cc',
'test/core/end2end/tests/retry_exceeds_buffer_size_in_initial_batch.cc',
'test/core/end2end/tests/retry_exceeds_buffer_size_in_subsequent_batch.cc',
'test/core/end2end/tests/retry_lb_drop.cc',
'test/core/end2end/tests/retry_non_retriable_status.cc',
'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc',
'test/core/end2end/tests/retry_recv_initial_metadata.cc',
@ -269,6 +270,7 @@
'test/core/end2end/tests/workaround_cronet_compression.cc',
'test/core/end2end/tests/write_buffering.cc',
'test/core/end2end/tests/write_buffering_at_end.cc',
'test/core/util/test_lb_policies.cc',
],
},
{
@ -341,6 +343,7 @@
'test/core/end2end/tests/retry_disabled.cc',
'test/core/end2end/tests/retry_exceeds_buffer_size_in_initial_batch.cc',
'test/core/end2end/tests/retry_exceeds_buffer_size_in_subsequent_batch.cc',
'test/core/end2end/tests/retry_lb_drop.cc',
'test/core/end2end/tests/retry_non_retriable_status.cc',
'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc',
'test/core/end2end/tests/retry_recv_initial_metadata.cc',
@ -368,6 +371,7 @@
'test/core/end2end/tests/workaround_cronet_compression.cc',
'test/core/end2end/tests/write_buffering.cc',
'test/core/end2end/tests/write_buffering_at_end.cc',
'test/core/util/test_lb_policies.cc',
],
},
{

@ -2526,6 +2526,7 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall(
ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
grpc_slice_unref_internal(path_);
GRPC_ERROR_UNREF(cancel_error_);
GRPC_ERROR_UNREF(failure_error_);
if (backend_metric_data_ != nullptr) {
backend_metric_data_
->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
@ -2583,6 +2584,8 @@ void ClientChannel::LoadBalancedCall::PendingBatchesFail(
grpc_error* error,
YieldCallCombinerPredicate yield_call_combiner_predicate) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
GRPC_ERROR_UNREF(failure_error_);
failure_error_ = error;
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
size_t num_batches = 0;
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
@ -2610,7 +2613,6 @@ void ClientChannel::LoadBalancedCall::PendingBatchesFail(
} else {
closures.RunClosuresWithoutYielding(call_combiner_);
}
GRPC_ERROR_UNREF(error);
}
// This is called via the call combiner, so access to calld is synchronized.
@ -2768,12 +2770,16 @@ void ClientChannel::LoadBalancedCall::
if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb);
}
// Chain to original callback.
if (self->failure_error_ != GRPC_ERROR_NONE) {
error = self->failure_error_;
self->failure_error_ = GRPC_ERROR_NONE;
} else {
error = GRPC_ERROR_REF(error);
}
Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(error));
error);
}
// TODO(roth): Consider not intercepting this callback unless we
// actually need to, if this causes a performance problem.
void ClientChannel::LoadBalancedCall::
InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
grpc_transport_stream_op_batch* batch) {
@ -2990,9 +2996,11 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(grpc_error** error) {
// Handle drops.
if (GPR_UNLIKELY(result.subchannel == nullptr)) {
result.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE),
GRPC_ERROR_INT_LB_POLICY_DROP, 1);
} else {
// Grab a ref to the connected subchannel while we're still
// holding the data plane mutex.

@ -444,6 +444,9 @@ class ClientChannel::LoadBalancedCall
// Set when we get a cancel_stream op.
grpc_error* cancel_error_ = GRPC_ERROR_NONE;
// Set when we fail inside the LB call.
grpc_error* failure_error_ = GRPC_ERROR_NONE;
grpc_polling_entity* pollent_ = nullptr;
grpc_closure pick_closure_;

@ -241,6 +241,8 @@ class RetryFilter::CallData {
// TODO(roth): As part of implementing hedging, we'll need to store a
// ref to the LB call in this struct instead of doing the parent_data
// hack, since there will be multiple LB calls in flight at once.
// We will also need to maintain a list of all pending attempts, so that
// we can cancel them all if the call gets cancelled.
struct SubchannelCallRetryState {
explicit SubchannelCallRetryState(grpc_call_context_element* context)
: batch_payload(context),
@ -374,7 +376,7 @@ class RetryFilter::CallData {
grpc_millis server_pushback_ms);
// Returns true if the call is being retried.
bool MaybeRetry(SubchannelCallBatchData* batch_data, grpc_status_code status,
grpc_mdelem* server_pushback_md);
grpc_mdelem* server_pushback_md, bool is_lb_drop);
// Invokes recv_initial_metadata_ready for a subchannel batch.
static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
@ -388,11 +390,11 @@ class RetryFilter::CallData {
// Commits the call and returns the message up the stack.
static void RecvMessageReady(void* arg, grpc_error* error);
// Sets *status and *server_pushback_md based on md_batch and error.
// Only sets *server_pushback_md if server_pushback_md != nullptr.
// Sets *status, *server_pushback_md, and *is_lb_drop based on md_batch
// and error.
void GetCallStatus(grpc_metadata_batch* md_batch, grpc_error* error,
grpc_status_code* status,
grpc_mdelem** server_pushback_md);
grpc_status_code* status, grpc_mdelem** server_pushback_md,
bool* is_lb_drop);
// Adds recv_trailing_metadata_ready closure to closures.
void AddClosureForRecvTrailingMetadataReady(
SubchannelCallBatchData* batch_data, grpc_error* error,
@ -1073,7 +1075,10 @@ void RetryFilter::CallData::DoRetry(SubchannelCallRetryState* retry_state,
bool RetryFilter::CallData::MaybeRetry(SubchannelCallBatchData* batch_data,
grpc_status_code status,
grpc_mdelem* server_pushback_md) {
grpc_mdelem* server_pushback_md,
bool is_lb_drop) {
// LB drops always inhibit retries.
if (is_lb_drop) return false;
// Get retry policy.
if (retry_policy_ == nullptr) return false;
// If we've already dispatched a retry from this call, return true.
@ -1407,15 +1412,20 @@ void RetryFilter::CallData::RecvMessageReady(void* arg, grpc_error* error) {
void RetryFilter::CallData::GetCallStatus(grpc_metadata_batch* md_batch,
grpc_error* error,
grpc_status_code* status,
grpc_mdelem** server_pushback_md) {
grpc_mdelem** server_pushback_md,
bool* is_lb_drop) {
if (error != GRPC_ERROR_NONE) {
grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
intptr_t value = 0;
if (grpc_error_get_int(error, GRPC_ERROR_INT_LB_POLICY_DROP, &value) &&
value != 0) {
*is_lb_drop = true;
}
} else {
GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
*status =
grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
if (server_pushback_md != nullptr &&
md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
*server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
}
}
@ -1571,14 +1581,16 @@ void RetryFilter::CallData::RecvTrailingMetadataReady(void* arg,
grpc_mdelem* server_pushback_md = nullptr;
grpc_metadata_batch* md_batch =
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
bool is_lb_drop = false;
call->GetCallStatus(md_batch, GRPC_ERROR_REF(error), &status,
&server_pushback_md);
&server_pushback_md, &is_lb_drop);
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s",
call->chand_, call, grpc_status_code_to_string(status));
gpr_log(GPR_INFO,
"chand=%p calld=%p: call finished, status=%s is_lb_drop=%d",
call->chand_, call, grpc_status_code_to_string(status), is_lb_drop);
}
// Check if we should retry.
if (call->MaybeRetry(batch_data, status, server_pushback_md)) {
if (call->MaybeRetry(batch_data, status, server_pushback_md, is_lb_drop)) {
// Unref batch_data for deferred recv_initial_metadata_ready or
// recv_message_ready callbacks, if any.
if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {

@ -75,6 +75,8 @@ static const char* error_int_name(grpc_error_ints key) {
return "occurred_during_write";
case GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE:
return "channel_connectivity_state";
case GRPC_ERROR_INT_LB_POLICY_DROP:
return "lb_policy_drop";
case GRPC_ERROR_INT_MAX:
GPR_UNREACHABLE_CODE(return "unknown");
}

@ -74,6 +74,8 @@ typedef enum {
GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
/// channel connectivity state associated with the error
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE,
/// LB policy drop
GRPC_ERROR_INT_LB_POLICY_DROP,
/// Must always be last
GRPC_ERROR_INT_MAX,

@ -133,6 +133,8 @@ extern void retry_exceeds_buffer_size_in_initial_batch(grpc_end2end_test_config
extern void retry_exceeds_buffer_size_in_initial_batch_pre_init(void);
extern void retry_exceeds_buffer_size_in_subsequent_batch(grpc_end2end_test_config config);
extern void retry_exceeds_buffer_size_in_subsequent_batch_pre_init(void);
extern void retry_lb_drop(grpc_end2end_test_config config);
extern void retry_lb_drop_pre_init(void);
extern void retry_non_retriable_status(grpc_end2end_test_config config);
extern void retry_non_retriable_status_pre_init(void);
extern void retry_non_retriable_status_before_recv_trailing_metadata_started(grpc_end2end_test_config config);
@ -243,6 +245,7 @@ void grpc_end2end_tests_pre_init(void) {
retry_disabled_pre_init();
retry_exceeds_buffer_size_in_initial_batch_pre_init();
retry_exceeds_buffer_size_in_subsequent_batch_pre_init();
retry_lb_drop_pre_init();
retry_non_retriable_status_pre_init();
retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init();
retry_recv_initial_metadata_pre_init();
@ -331,6 +334,7 @@ void grpc_end2end_tests(int argc, char **argv,
retry_disabled(config);
retry_exceeds_buffer_size_in_initial_batch(config);
retry_exceeds_buffer_size_in_subsequent_batch(config);
retry_lb_drop(config);
retry_non_retriable_status(config);
retry_non_retriable_status_before_recv_trailing_metadata_started(config);
retry_recv_initial_metadata(config);
@ -570,6 +574,10 @@ void grpc_end2end_tests(int argc, char **argv,
retry_exceeds_buffer_size_in_subsequent_batch(config);
continue;
}
if (0 == strcmp("retry_lb_drop", argv[i])) {
retry_lb_drop(config);
continue;
}
if (0 == strcmp("retry_non_retriable_status", argv[i])) {
retry_non_retriable_status(config);
continue;

@ -135,6 +135,8 @@ extern void retry_exceeds_buffer_size_in_initial_batch(grpc_end2end_test_config
extern void retry_exceeds_buffer_size_in_initial_batch_pre_init(void);
extern void retry_exceeds_buffer_size_in_subsequent_batch(grpc_end2end_test_config config);
extern void retry_exceeds_buffer_size_in_subsequent_batch_pre_init(void);
extern void retry_lb_drop(grpc_end2end_test_config config);
extern void retry_lb_drop_pre_init(void);
extern void retry_non_retriable_status(grpc_end2end_test_config config);
extern void retry_non_retriable_status_pre_init(void);
extern void retry_non_retriable_status_before_recv_trailing_metadata_started(grpc_end2end_test_config config);
@ -246,6 +248,7 @@ void grpc_end2end_tests_pre_init(void) {
retry_disabled_pre_init();
retry_exceeds_buffer_size_in_initial_batch_pre_init();
retry_exceeds_buffer_size_in_subsequent_batch_pre_init();
retry_lb_drop_pre_init();
retry_non_retriable_status_pre_init();
retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init();
retry_recv_initial_metadata_pre_init();
@ -335,6 +338,7 @@ void grpc_end2end_tests(int argc, char **argv,
retry_disabled(config);
retry_exceeds_buffer_size_in_initial_batch(config);
retry_exceeds_buffer_size_in_subsequent_batch(config);
retry_lb_drop(config);
retry_non_retriable_status(config);
retry_non_retriable_status_before_recv_trailing_metadata_started(config);
retry_recv_initial_metadata(config);
@ -578,6 +582,10 @@ void grpc_end2end_tests(int argc, char **argv,
retry_exceeds_buffer_size_in_subsequent_batch(config);
continue;
}
if (0 == strcmp("retry_lb_drop", argv[i])) {
retry_lb_drop(config);
continue;
}
if (0 == strcmp("retry_non_retriable_status", argv[i])) {
retry_non_retriable_status(config);
continue;

@ -296,6 +296,10 @@ END2END_TESTS = {
# See b/151617965
short_name = "retry_exceeds_buffer_size_in_subseq",
),
"retry_lb_drop": _test_options(
needs_client_channel = True,
proxyable = False,
),
"retry_non_retriable_status": _test_options(
needs_client_channel = True,
proxyable = False,
@ -422,12 +426,14 @@ def grpc_end2end_tests():
"end2end_tests.h",
],
language = "C++",
testonly = 1,
deps = [
":cq_verifier",
":ssl_test_data",
":http_proxy",
":proxy",
":local_util",
"//test/core/util:test_lb_policies",
],
)
@ -436,6 +442,7 @@ def grpc_end2end_tests():
name = "%s_test" % f,
srcs = ["fixtures/%s.cc" % f],
language = "C++",
testonly = 1,
data = [
"//src/core/tsi/test_creds:ca.pem",
"//src/core/tsi/test_creds:server1.key",
@ -497,12 +504,14 @@ def grpc_end2end_nosec_tests():
"end2end_tests.h",
],
language = "C++",
testonly = 1,
deps = [
":cq_verifier",
":ssl_test_data",
":http_proxy",
":proxy",
":local_util",
"//test/core/util:test_lb_policies",
],
)
@ -513,6 +522,7 @@ def grpc_end2end_nosec_tests():
name = "%s_nosec_test" % f,
srcs = ["fixtures/%s.cc" % f],
language = "C++",
testonly = 1,
data = [
"//src/core/tsi/test_creds:ca.pem",
"//src/core/tsi/test_creds:server1.key",

@ -0,0 +1,276 @@
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "test/core/end2end/end2end_tests.h"
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/static_metadata.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/tests/cancel_test_helpers.h"
#include "test/core/util/test_lb_policies.h"
namespace grpc_core {
namespace {
const char* kDropPolicyName = "drop_lb";
class DropPolicy : public LoadBalancingPolicy {
public:
explicit DropPolicy(Args args) : LoadBalancingPolicy(std::move(args)) {}
const char* name() const override { return kDropPolicyName; }
void UpdateLocked(UpdateArgs) override {
channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
absl::make_unique<DropPicker>());
}
void ResetBackoffLocked() override {}
void ShutdownLocked() override {}
private:
class DropPicker : public SubchannelPicker {
public:
PickResult Pick(PickArgs /*args*/) override {
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
}
};
};
class DropLbConfig : public LoadBalancingPolicy::Config {
public:
const char* name() const override { return kDropPolicyName; }
};
class DropPolicyFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<DropPolicy>(std::move(args));
}
const char* name() const override { return kDropPolicyName; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& /*json*/, grpc_error** /*error*/) const override {
return MakeRefCounted<DropLbConfig>();
}
};
std::vector<PickArgsSeen>* g_pick_args_vector = nullptr;
void RegisterDropPolicy() {
LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
absl::make_unique<DropPolicyFactory>());
RegisterTestPickArgsLoadBalancingPolicy(
[](const PickArgsSeen& pick_args) {
GPR_ASSERT(g_pick_args_vector != nullptr);
g_pick_args_vector->push_back(pick_args);
},
kDropPolicyName);
}
} // namespace
} // namespace grpc_core
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Tests that we don't retry when retries are disabled via the
// GRPC_ARG_ENABLE_RETRIES channel arg, even when there is retry
// configuration in the service config.
// - 1 retry allowed for ABORTED status
// - first attempt returns ABORTED but does not retry
static void test_retry_lb_drop(grpc_end2end_test_config config) {
grpc_call* c;
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer* response_payload_recv = nullptr;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
std::vector<grpc_core::PickArgsSeen> pick_args_seen;
grpc_core::g_pick_args_vector = &pick_args_seen;
grpc_arg arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
const_cast<char*>(
"{\n"
" \"loadBalancingConfig\": [ {\n"
" \"test_pick_args_lb\": {}\n"
" } ],\n"
" \"methodConfig\": [ {\n"
" \"name\": [\n"
" { \"service\": \"service\", \"method\": \"method\" }\n"
" ],\n"
" \"retryPolicy\": {\n"
" \"maxAttempts\": 2,\n"
" \"initialBackoff\": \"1s\",\n"
" \"maxBackoff\": \"120s\",\n"
" \"backoffMultiplier\": 1.6,\n"
" \"retryableStatusCodes\": [ \"UNAVAILABLE\" ]\n"
" }\n"
" } ]\n"
"}"));
grpc_channel_args client_args = {1, &arg};
grpc_end2end_test_fixture f =
begin_test(config, "retry_lb_drop", &client_args, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(1), true);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
GPR_ASSERT(0 == grpc_slice_str_cmp(details,
"Call dropped by load balancing policy"));
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload_recv);
grpc_call_unref(c);
cq_verifier_destroy(cqv);
gpr_log(GPR_INFO, "NUMBER OF LB PICKS: %" PRIuPTR, pick_args_seen.size());
GPR_ASSERT(pick_args_seen.size() == 1);
grpc_core::g_pick_args_vector = nullptr;
end_test(&f);
config.tear_down_data(&f);
}
void retry_lb_drop(grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
test_retry_lb_drop(config);
}
void retry_lb_drop_pre_init(void) { grpc_core::RegisterDropPolicy(); }

@ -1,20 +1,18 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "test/core/util/test_lb_policies.h"
@ -50,14 +48,14 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
public:
ForwardingLoadBalancingPolicy(
std::unique_ptr<ChannelControlHelper> delegating_helper, Args args,
const std::string& delegate_policy_name, intptr_t initial_refcount = 1)
const char* delegate_policy_name, intptr_t initial_refcount = 1)
: LoadBalancingPolicy(std::move(args), initial_refcount) {
Args delegate_args;
delegate_args.work_serializer = work_serializer();
delegate_args.channel_control_helper = std::move(delegating_helper);
delegate_args.args = args.args;
delegate_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
delegate_policy_name.c_str(), std::move(delegate_args));
delegate_policy_name, std::move(delegate_args));
grpc_pollset_set_add_pollset_set(delegate_->interested_parties(),
interested_parties());
}
@ -99,11 +97,11 @@ constexpr char kTestPickArgsLbPolicyName[] = "test_pick_args_lb";
class TestPickArgsLb : public ForwardingLoadBalancingPolicy {
public:
TestPickArgsLb(Args args, TestPickArgsCallback cb)
TestPickArgsLb(Args args, TestPickArgsCallback cb,
const char* delegate_policy_name)
: ForwardingLoadBalancingPolicy(
absl::make_unique<Helper>(RefCountedPtr<TestPickArgsLb>(this), cb),
std::move(args),
/*delegate_policy_name=*/"pick_first",
std::move(args), delegate_policy_name,
/*initial_refcount=*/2) {}
~TestPickArgsLb() override = default;
@ -171,12 +169,14 @@ class TestPickArgsLbConfig : public LoadBalancingPolicy::Config {
class TestPickArgsLbFactory : public LoadBalancingPolicyFactory {
public:
explicit TestPickArgsLbFactory(TestPickArgsCallback cb)
: cb_(std::move(cb)) {}
explicit TestPickArgsLbFactory(TestPickArgsCallback cb,
const char* delegate_policy_name)
: cb_(std::move(cb)), delegate_policy_name_(delegate_policy_name) {}
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<TestPickArgsLb>(std::move(args), cb_);
return MakeOrphanable<TestPickArgsLb>(std::move(args), cb_,
delegate_policy_name_);
}
const char* name() const override { return kTestPickArgsLbPolicyName; }
@ -188,6 +188,7 @@ class TestPickArgsLbFactory : public LoadBalancingPolicyFactory {
private:
TestPickArgsCallback cb_;
const char* delegate_policy_name_;
};
//
@ -416,9 +417,11 @@ class AddressTestFactory : public LoadBalancingPolicyFactory {
} // namespace
void RegisterTestPickArgsLoadBalancingPolicy(TestPickArgsCallback cb) {
void RegisterTestPickArgsLoadBalancingPolicy(TestPickArgsCallback cb,
const char* delegate_policy_name) {
LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
absl::make_unique<TestPickArgsLbFactory>(std::move(cb)));
absl::make_unique<TestPickArgsLbFactory>(std::move(cb),
delegate_policy_name));
}
void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(

@ -1,20 +1,18 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H
#define GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H
@ -32,9 +30,10 @@ struct PickArgsSeen {
using TestPickArgsCallback = std::function<void(const PickArgsSeen&)>;
// Registers an LB policy called "test_pick_args_lb" that checks the args
// passed to SubchannelPicker::Pick().
void RegisterTestPickArgsLoadBalancingPolicy(TestPickArgsCallback cb);
// Registers an LB policy called "test_pick_args_lb" that passes the args
// passed to SubchannelPicker::Pick() to cb.
void RegisterTestPickArgsLoadBalancingPolicy(
TestPickArgsCallback cb, const char* delegate_policy_name = "pick_first");
struct TrailingMetadataArgsSeen {
const LoadBalancingPolicy::BackendMetricData* backend_metric_data;

Loading…
Cancel
Save