Move InterceptRecvTrailingMetadataLoadBalancingPolicy to a separate file.

This fixes a link error when building with make.
reviewable/pr17577/r2
Mark D. Roth 6 years ago
parent 8cd7178afb
commit d6e2b33670
  1. 4
      CMakeLists.txt
  2. 4
      Makefile
  3. 4
      build.yaml
  4. 4
      gRPC-Core.podspec
  5. 4
      grpc.gyp
  6. 61
      test/core/util/forwarding_load_balancing_policy.cc
  7. 102
      test/core/util/forwarding_load_balancing_policy.h
  8. 240
      test/core/util/test_lb_policies.cc
  9. 34
      test/core/util/test_lb_policies.h
  10. 107
      test/cpp/end2end/client_lb_end2end_test.cc
  11. 6
      tools/run_tests/generated/sources_and_headers.json

@ -1756,7 +1756,6 @@ add_library(grpc_test_util
test/core/end2end/fixtures/proxy.cc
test/core/iomgr/endpoint_tests.cc
test/core/util/debugger_macros.cc
test/core/util/forwarding_load_balancing_policy.cc
test/core/util/fuzzer_util.cc
test/core/util/grpc_profiler.cc
test/core/util/histogram.cc
@ -1771,6 +1770,7 @@ add_library(grpc_test_util
test/core/util/subprocess_posix.cc
test/core/util/subprocess_windows.cc
test/core/util/test_config.cc
test/core/util/test_lb_policies.cc
test/core/util/tracer_util.cc
test/core/util/trickle_endpoint.cc
test/core/util/cmdline.cc
@ -2079,7 +2079,6 @@ add_library(grpc_test_util_unsecure
test/core/end2end/fixtures/proxy.cc
test/core/iomgr/endpoint_tests.cc
test/core/util/debugger_macros.cc
test/core/util/forwarding_load_balancing_policy.cc
test/core/util/fuzzer_util.cc
test/core/util/grpc_profiler.cc
test/core/util/histogram.cc
@ -2094,6 +2093,7 @@ add_library(grpc_test_util_unsecure
test/core/util/subprocess_posix.cc
test/core/util/subprocess_windows.cc
test/core/util/test_config.cc
test/core/util/test_lb_policies.cc
test/core/util/tracer_util.cc
test/core/util/trickle_endpoint.cc
test/core/util/cmdline.cc

@ -4258,7 +4258,6 @@ LIBGRPC_TEST_UTIL_SRC = \
test/core/end2end/fixtures/proxy.cc \
test/core/iomgr/endpoint_tests.cc \
test/core/util/debugger_macros.cc \
test/core/util/forwarding_load_balancing_policy.cc \
test/core/util/fuzzer_util.cc \
test/core/util/grpc_profiler.cc \
test/core/util/histogram.cc \
@ -4273,6 +4272,7 @@ LIBGRPC_TEST_UTIL_SRC = \
test/core/util/subprocess_posix.cc \
test/core/util/subprocess_windows.cc \
test/core/util/test_config.cc \
test/core/util/test_lb_policies.cc \
test/core/util/tracer_util.cc \
test/core/util/trickle_endpoint.cc \
test/core/util/cmdline.cc \
@ -4568,7 +4568,6 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
test/core/end2end/fixtures/proxy.cc \
test/core/iomgr/endpoint_tests.cc \
test/core/util/debugger_macros.cc \
test/core/util/forwarding_load_balancing_policy.cc \
test/core/util/fuzzer_util.cc \
test/core/util/grpc_profiler.cc \
test/core/util/histogram.cc \
@ -4583,6 +4582,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
test/core/util/subprocess_posix.cc \
test/core/util/subprocess_windows.cc \
test/core/util/test_config.cc \
test/core/util/test_lb_policies.cc \
test/core/util/tracer_util.cc \
test/core/util/trickle_endpoint.cc \
test/core/util/cmdline.cc \

@ -906,7 +906,6 @@ filegroups:
- test/core/end2end/fixtures/proxy.h
- test/core/iomgr/endpoint_tests.h
- test/core/util/debugger_macros.h
- test/core/util/forwarding_load_balancing_policy.h
- test/core/util/fuzzer_util.h
- test/core/util/grpc_profiler.h
- test/core/util/histogram.h
@ -919,6 +918,7 @@ filegroups:
- test/core/util/slice_splitter.h
- test/core/util/subprocess.h
- test/core/util/test_config.h
- test/core/util/test_lb_policies.h
- test/core/util/tracer_util.h
- test/core/util/trickle_endpoint.h
src:
@ -929,7 +929,6 @@ filegroups:
- test/core/end2end/fixtures/proxy.cc
- test/core/iomgr/endpoint_tests.cc
- test/core/util/debugger_macros.cc
- test/core/util/forwarding_load_balancing_policy.cc
- test/core/util/fuzzer_util.cc
- test/core/util/grpc_profiler.cc
- test/core/util/histogram.cc
@ -944,6 +943,7 @@ filegroups:
- test/core/util/subprocess_posix.cc
- test/core/util/subprocess_windows.cc
- test/core/util/test_config.cc
- test/core/util/test_lb_policies.cc
- test/core/util/tracer_util.cc
- test/core/util/trickle_endpoint.cc
deps:

@ -1212,7 +1212,6 @@ Pod::Spec.new do |s|
'test/core/end2end/fixtures/proxy.cc',
'test/core/iomgr/endpoint_tests.cc',
'test/core/util/debugger_macros.cc',
'test/core/util/forwarding_load_balancing_policy.cc',
'test/core/util/fuzzer_util.cc',
'test/core/util/grpc_profiler.cc',
'test/core/util/histogram.cc',
@ -1227,6 +1226,7 @@ Pod::Spec.new do |s|
'test/core/util/subprocess_posix.cc',
'test/core/util/subprocess_windows.cc',
'test/core/util/test_config.cc',
'test/core/util/test_lb_policies.cc',
'test/core/util/tracer_util.cc',
'test/core/util/trickle_endpoint.cc',
'test/core/util/cmdline.cc',
@ -1241,7 +1241,6 @@ Pod::Spec.new do |s|
'test/core/end2end/fixtures/proxy.h',
'test/core/iomgr/endpoint_tests.h',
'test/core/util/debugger_macros.h',
'test/core/util/forwarding_load_balancing_policy.h',
'test/core/util/fuzzer_util.h',
'test/core/util/grpc_profiler.h',
'test/core/util/histogram.h',
@ -1254,6 +1253,7 @@ Pod::Spec.new do |s|
'test/core/util/slice_splitter.h',
'test/core/util/subprocess.h',
'test/core/util/test_config.h',
'test/core/util/test_lb_policies.h',
'test/core/util/tracer_util.h',
'test/core/util/trickle_endpoint.h',
'test/core/util/cmdline.h',

@ -611,7 +611,6 @@
'test/core/end2end/fixtures/proxy.cc',
'test/core/iomgr/endpoint_tests.cc',
'test/core/util/debugger_macros.cc',
'test/core/util/forwarding_load_balancing_policy.cc',
'test/core/util/fuzzer_util.cc',
'test/core/util/grpc_profiler.cc',
'test/core/util/histogram.cc',
@ -626,6 +625,7 @@
'test/core/util/subprocess_posix.cc',
'test/core/util/subprocess_windows.cc',
'test/core/util/test_config.cc',
'test/core/util/test_lb_policies.cc',
'test/core/util/tracer_util.cc',
'test/core/util/trickle_endpoint.cc',
'test/core/util/cmdline.cc',
@ -854,7 +854,6 @@
'test/core/end2end/fixtures/proxy.cc',
'test/core/iomgr/endpoint_tests.cc',
'test/core/util/debugger_macros.cc',
'test/core/util/forwarding_load_balancing_policy.cc',
'test/core/util/fuzzer_util.cc',
'test/core/util/grpc_profiler.cc',
'test/core/util/histogram.cc',
@ -869,6 +868,7 @@
'test/core/util/subprocess_posix.cc',
'test/core/util/subprocess_windows.cc',
'test/core/util/test_config.cc',
'test/core/util/test_lb_policies.cc',
'test/core/util/tracer_util.cc',
'test/core/util/trickle_endpoint.cc',
'test/core/util/cmdline.cc',

@ -1,61 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test/core/util/forwarding_load_balancing_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/pollset_set.h"
namespace grpc_core {
TraceFlag grpc_trace_forwarding_lb(false, "forwarding_lb");
ForwardingLoadBalancingPolicy::ForwardingLoadBalancingPolicy(
const Args& args, const std::string& delegate_policy_name)
: LoadBalancingPolicy(args) {
delegate_ =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
delegate_policy_name.c_str(), args);
grpc_pollset_set_add_pollset_set(delegate_->interested_parties(),
interested_parties());
// Give re-resolution closure to delegate.
GRPC_CLOSURE_INIT(&on_delegate_request_reresolution_,
OnDelegateRequestReresolutionLocked, this,
grpc_combiner_scheduler(combiner()));
Ref().release(); // held by callback.
delegate_->SetReresolutionClosureLocked(&on_delegate_request_reresolution_);
}
ForwardingLoadBalancingPolicy::~ForwardingLoadBalancingPolicy() {}
void ForwardingLoadBalancingPolicy::OnDelegateRequestReresolutionLocked(
void* arg, grpc_error* error) {
ForwardingLoadBalancingPolicy* self =
static_cast<ForwardingLoadBalancingPolicy*>(arg);
if (error != GRPC_ERROR_NONE || self->delegate_ == nullptr) {
self->Unref();
return;
}
self->TryReresolutionLocked(&grpc_trace_forwarding_lb, GRPC_ERROR_NONE);
self->delegate_->SetReresolutionClosureLocked(
&self->on_delegate_request_reresolution_);
}
} // namespace grpc_core

@ -1,102 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <string>
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/transport/connectivity_state.h"
#ifndef GRPC_TEST_CORE_UTIL_FORWARDING_LOAD_BALANCING_POLICY_H
#define GRPC_TEST_CORE_UTIL_FORWARDING_LOAD_BALANCING_POLICY_H
namespace grpc_core {
// A minimal forwarding class to avoid implementing a standalone test LB.
class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
public:
ForwardingLoadBalancingPolicy(const Args& args,
const std::string& delegate_policy_name);
~ForwardingLoadBalancingPolicy() override;
const char* name() const override { return delegate_->name(); }
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override {
delegate_->UpdateLocked(args, lb_config);
}
bool PickLocked(PickState* pick, grpc_error** error) override {
return delegate_->PickLocked(pick, error);
}
void CancelPickLocked(PickState* pick, grpc_error* error) override {
delegate_->CancelPickLocked(pick, error);
}
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) override {
delegate_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
initial_metadata_flags_eq, error);
}
void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
grpc_closure* closure) override {
delegate_->NotifyOnStateChangeLocked(state, closure);
}
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override {
return delegate_->CheckConnectivityLocked(connectivity_error);
}
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override {
delegate_->HandOffPendingPicksLocked(new_policy);
}
void ExitIdleLocked() override { delegate_->ExitIdleLocked(); }
void ResetBackoffLocked() override { delegate_->ResetBackoffLocked(); }
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) override {
delegate_->FillChildRefsForChannelz(child_subchannels, child_channels);
}
private:
void ShutdownLocked() override { delegate_.reset(); }
static void OnDelegateRequestReresolutionLocked(void* arg,
grpc_error* error);
OrphanablePtr<LoadBalancingPolicy> delegate_;
grpc_closure on_delegate_request_reresolution_;
};
} // namespace grpc_core
#endif // GRPC_TEST_CORE_UTIL_FORWARDING_LOAD_BALANCING_POLICY_H

@ -0,0 +1,240 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test/core/util/test_lb_policies.h"
#include <string>
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
TraceFlag grpc_trace_forwarding_lb(false, "forwarding_lb");
namespace {
//
// ForwardingLoadBalancingPolicy
//
// A minimal forwarding class to avoid implementing a standalone test LB.
class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
public:
ForwardingLoadBalancingPolicy(const Args& args,
const std::string& delegate_policy_name)
: LoadBalancingPolicy(args) {
delegate_ =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
delegate_policy_name.c_str(), args);
grpc_pollset_set_add_pollset_set(delegate_->interested_parties(),
interested_parties());
// Give re-resolution closure to delegate.
GRPC_CLOSURE_INIT(&on_delegate_request_reresolution_,
OnDelegateRequestReresolutionLocked, this,
grpc_combiner_scheduler(combiner()));
Ref().release(); // held by callback.
delegate_->SetReresolutionClosureLocked(&on_delegate_request_reresolution_);
}
~ForwardingLoadBalancingPolicy() override = default;
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override {
delegate_->UpdateLocked(args, lb_config);
}
bool PickLocked(PickState* pick, grpc_error** error) override {
return delegate_->PickLocked(pick, error);
}
void CancelPickLocked(PickState* pick, grpc_error* error) override {
delegate_->CancelPickLocked(pick, error);
}
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) override {
delegate_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
initial_metadata_flags_eq, error);
}
void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
grpc_closure* closure) override {
delegate_->NotifyOnStateChangeLocked(state, closure);
}
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override {
return delegate_->CheckConnectivityLocked(connectivity_error);
}
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override {
delegate_->HandOffPendingPicksLocked(new_policy);
}
void ExitIdleLocked() override { delegate_->ExitIdleLocked(); }
void ResetBackoffLocked() override { delegate_->ResetBackoffLocked(); }
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) override {
delegate_->FillChildRefsForChannelz(child_subchannels, child_channels);
}
private:
void ShutdownLocked() override {
delegate_.reset();
TryReresolutionLocked(&grpc_trace_forwarding_lb, GRPC_ERROR_CANCELLED);
}
static void OnDelegateRequestReresolutionLocked(void* arg,
grpc_error* error) {
ForwardingLoadBalancingPolicy* self =
static_cast<ForwardingLoadBalancingPolicy*>(arg);
if (error != GRPC_ERROR_NONE || self->delegate_ == nullptr) {
self->Unref();
return;
}
self->TryReresolutionLocked(&grpc_trace_forwarding_lb, GRPC_ERROR_NONE);
self->delegate_->SetReresolutionClosureLocked(
&self->on_delegate_request_reresolution_);
}
OrphanablePtr<LoadBalancingPolicy> delegate_;
grpc_closure on_delegate_request_reresolution_;
};
//
// InterceptRecvTrailingMetadataLoadBalancingPolicy
//
constexpr char kInterceptRecvTrailingMetadataLbPolicyName[] =
"intercept_trailing_metadata_lb";
class InterceptRecvTrailingMetadataLoadBalancingPolicy
: public ForwardingLoadBalancingPolicy {
public:
InterceptRecvTrailingMetadataLoadBalancingPolicy(
const Args& args, InterceptRecvTrailingMetadataCallback cb,
void* user_data)
: ForwardingLoadBalancingPolicy(args,
/*delegate_lb_policy_name=*/"pick_first"),
cb_(cb), user_data_(user_data) {}
~InterceptRecvTrailingMetadataLoadBalancingPolicy() override = default;
const char* name() const override {
return kInterceptRecvTrailingMetadataLbPolicyName;
}
bool PickLocked(PickState* pick, grpc_error** error) override {
bool ret = ForwardingLoadBalancingPolicy::PickLocked(pick, error);
// Note: This assumes that the delegate policy does not
// intercepting recv_trailing_metadata. If we ever need to use
// this with a delegate policy that does, then we'll need to
// handle async pick returns separately.
New<TrailingMetadataHandler>(pick, cb_, user_data_); // deletes itself
return ret;
}
private:
class TrailingMetadataHandler {
public:
TrailingMetadataHandler(PickState* pick,
InterceptRecvTrailingMetadataCallback cb,
void* user_data)
: cb_(cb), user_data_(user_data) {
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecordRecvTrailingMetadata, this,
grpc_schedule_on_exec_ctx);
pick->recv_trailing_metadata_ready = &recv_trailing_metadata_ready_;
pick->original_recv_trailing_metadata_ready =
&original_recv_trailing_metadata_ready_;
pick->recv_trailing_metadata = &recv_trailing_metadata_;
}
private:
static void RecordRecvTrailingMetadata(void* arg, grpc_error* err) {
TrailingMetadataHandler* self =
static_cast<TrailingMetadataHandler*>(arg);
GPR_ASSERT(self->recv_trailing_metadata_ != nullptr);
self->cb_(self->user_data_);
GRPC_CLOSURE_SCHED(self->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(err));
Delete(self);
}
InterceptRecvTrailingMetadataCallback cb_;
void* user_data_;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
};
InterceptRecvTrailingMetadataCallback cb_;
void* user_data_;
};
class InterceptTrailingFactory : public LoadBalancingPolicyFactory {
public:
explicit InterceptTrailingFactory(
InterceptRecvTrailingMetadataCallback cb, void* user_data)
: cb_(cb), user_data_(user_data) {}
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>
CreateLoadBalancingPolicy(
const grpc_core::LoadBalancingPolicy::Args& args) const override {
return grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>(
grpc_core::New<InterceptRecvTrailingMetadataLoadBalancingPolicy>(
args, cb_, user_data_));
}
const char* name() const override {
return kInterceptRecvTrailingMetadataLbPolicyName;
}
private:
InterceptRecvTrailingMetadataCallback cb_;
void* user_data_;
};
} // namespace
void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
InterceptRecvTrailingMetadataCallback cb, void* user_data) {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
grpc_core::New<InterceptTrailingFactory>(cb, user_data)));
}
} // namespace grpc_core

@ -0,0 +1,34 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H
#define GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H
namespace grpc_core {
typedef void (*InterceptRecvTrailingMetadataCallback)(void*);
// Registers an LB policy called "intercept_trailing_metadata_lb" that
// invokes cb with argument user_data when trailing metadata is received
// for each call.
void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
InterceptRecvTrailingMetadataCallback cb, void* user_data);
} // namespace grpc_core
#endif // GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H

@ -60,8 +60,8 @@
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/forwarding_load_balancing_policy.h"
#include "test/core/util/test_config.h"
#include "test/core/util/test_lb_policies.h"
#include "test/cpp/end2end/test_service_impl.h"
#include <gtest/gtest.h>
@ -1237,112 +1237,25 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
protected:
void SetUp() override {
ClientLbEnd2endTest::SetUp();
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
grpc_core::New<InterceptTrailingFactory>(this)));
grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
ReportTrailerIntercepted, this);
}
void TearDown() override { ClientLbEnd2endTest::TearDown(); }
class InterceptRecvTrailingMetadataLoadBalancingPolicy
: public grpc_core::ForwardingLoadBalancingPolicy {
public:
InterceptRecvTrailingMetadataLoadBalancingPolicy(
const Args& args, const std::string& delegate_lb_policy_name,
ClientLbInterceptTrailingMetadataTest* test)
: grpc_core::ForwardingLoadBalancingPolicy(args,
delegate_lb_policy_name),
test_(test) {}
~InterceptRecvTrailingMetadataLoadBalancingPolicy() override = default;
bool PickLocked(PickState* pick, grpc_error** error) override {
bool ret = ForwardingLoadBalancingPolicy::PickLocked(pick, error);
// Note: This assumes that the delegate policy does not
// intercepting recv_trailing_metadata. If we ever need to use
// this with a delegate policy that does, then we'll need to
// handle async pick returns separately.
new TrailingMetadataHandler(pick, test_); // deletes itself
return ret;
}
private:
class TrailingMetadataHandler {
public:
TrailingMetadataHandler(PickState* pick,
ClientLbInterceptTrailingMetadataTest* test)
: test_(test) {
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecordRecvTrailingMetadata, this,
grpc_schedule_on_exec_ctx);
pick->recv_trailing_metadata_ready = &recv_trailing_metadata_ready_;
pick->original_recv_trailing_metadata_ready =
&original_recv_trailing_metadata_ready_;
pick->recv_trailing_metadata = &recv_trailing_metadata_;
}
private:
static void RecordRecvTrailingMetadata(void* arg, grpc_error* err) {
TrailingMetadataHandler* self =
static_cast<TrailingMetadataHandler*>(arg);
GPR_ASSERT(self->recv_trailing_metadata_ != nullptr);
// a simple check to make sure the trailing metadata is valid
GPR_ASSERT(
grpc_get_status_code_from_metadata(
self->recv_trailing_metadata_->idx.named.grpc_status->md) ==
grpc_status_code::GRPC_STATUS_OK);
self->test_->ReportTrailerIntercepted();
GRPC_CLOSURE_SCHED(self->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(err));
delete self;
}
ClientLbInterceptTrailingMetadataTest* test_;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
};
ClientLbInterceptTrailingMetadataTest* test_;
};
// A factory for a test LB policy that intercepts trailing metadata.
// The LB policy is implemented as a wrapper around a delegate LB policy.
class InterceptTrailingFactory
: public grpc_core::LoadBalancingPolicyFactory {
public:
explicit InterceptTrailingFactory(
ClientLbInterceptTrailingMetadataTest* test)
: test_(test) {}
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>
CreateLoadBalancingPolicy(
const grpc_core::LoadBalancingPolicy::Args& args) const override {
return grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>(
grpc_core::New<InterceptRecvTrailingMetadataLoadBalancingPolicy>(
args, /*delegate_lb_policy_name=*/ "pick_first", test_));
}
const char* name() const override {
return "intercept_trailing_metadata_lb";
}
private:
ClientLbInterceptTrailingMetadataTest* test_;
};
void ReportTrailerIntercepted() {
std::unique_lock<std::mutex> lock(mu_);
trailers_intercepted_++;
}
int trailers_intercepted() {
std::unique_lock<std::mutex> lock(mu_);
return trailers_intercepted_;
}
private:
static void ReportTrailerIntercepted(void* arg) {
ClientLbInterceptTrailingMetadataTest* self =
static_cast<ClientLbInterceptTrailingMetadataTest*>(arg);
std::unique_lock<std::mutex> lock(self->mu_);
self->trailers_intercepted_++;
}
std::mutex mu_;
int trailers_intercepted_ = 0;
};

@ -10459,7 +10459,6 @@
"test/core/end2end/fixtures/proxy.h",
"test/core/iomgr/endpoint_tests.h",
"test/core/util/debugger_macros.h",
"test/core/util/forwarding_load_balancing_policy.h",
"test/core/util/fuzzer_util.h",
"test/core/util/grpc_profiler.h",
"test/core/util/histogram.h",
@ -10472,6 +10471,7 @@
"test/core/util/slice_splitter.h",
"test/core/util/subprocess.h",
"test/core/util/test_config.h",
"test/core/util/test_lb_policies.h",
"test/core/util/tracer_util.h",
"test/core/util/trickle_endpoint.h"
],
@ -10493,8 +10493,6 @@
"test/core/iomgr/endpoint_tests.h",
"test/core/util/debugger_macros.cc",
"test/core/util/debugger_macros.h",
"test/core/util/forwarding_load_balancing_policy.cc",
"test/core/util/forwarding_load_balancing_policy.h",
"test/core/util/fuzzer_util.cc",
"test/core/util/fuzzer_util.h",
"test/core/util/grpc_profiler.cc",
@ -10521,6 +10519,8 @@
"test/core/util/subprocess_windows.cc",
"test/core/util/test_config.cc",
"test/core/util/test_config.h",
"test/core/util/test_lb_policies.cc",
"test/core/util/test_lb_policies.h",
"test/core/util/tracer_util.cc",
"test/core/util/tracer_util.h",
"test/core/util/trickle_endpoint.cc",

Loading…
Cancel
Save