Merge pull request #17577 from markdroth/lb_trailing_metadata

Allow LB policies to intercept recv_trailing_metadata
pull/17753/head
Mark D. Roth 6 years ago committed by GitHub
commit 10fa278660
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CMakeLists.txt
  2. 2
      Makefile
  3. 2
      build.yaml
  4. 2
      gRPC-Core.podspec
  5. 2
      grpc.gyp
  6. 29
      src/core/ext/filters/client_channel/client_channel.cc
  7. 13
      src/core/ext/filters/client_channel/lb_policy.h
  8. 1
      src/core/lib/transport/service_config.h
  9. 10
      test/core/util/BUILD
  10. 240
      test/core/util/test_lb_policies.cc
  11. 34
      test/core/util/test_lb_policies.h
  12. 1
      test/cpp/end2end/BUILD
  13. 77
      test/cpp/end2end/client_lb_end2end_test.cc
  14. 3
      tools/run_tests/generated/sources_and_headers.json

@ -1782,6 +1782,7 @@ add_library(grpc_test_util
test/core/util/subprocess_posix.cc
test/core/util/subprocess_windows.cc
test/core/util/test_config.cc
test/core/util/test_lb_policies.cc
test/core/util/tracer_util.cc
test/core/util/trickle_endpoint.cc
test/core/util/cmdline.cc
@ -2107,6 +2108,7 @@ add_library(grpc_test_util_unsecure
test/core/util/subprocess_posix.cc
test/core/util/subprocess_windows.cc
test/core/util/test_config.cc
test/core/util/test_lb_policies.cc
test/core/util/tracer_util.cc
test/core/util/trickle_endpoint.cc
test/core/util/cmdline.cc

@ -4286,6 +4286,7 @@ LIBGRPC_TEST_UTIL_SRC = \
test/core/util/subprocess_posix.cc \
test/core/util/subprocess_windows.cc \
test/core/util/test_config.cc \
test/core/util/test_lb_policies.cc \
test/core/util/tracer_util.cc \
test/core/util/trickle_endpoint.cc \
test/core/util/cmdline.cc \
@ -4598,6 +4599,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \
test/core/util/subprocess_posix.cc \
test/core/util/subprocess_windows.cc \
test/core/util/test_config.cc \
test/core/util/test_lb_policies.cc \
test/core/util/tracer_util.cc \
test/core/util/trickle_endpoint.cc \
test/core/util/cmdline.cc \

@ -923,6 +923,7 @@ filegroups:
- test/core/util/slice_splitter.h
- test/core/util/subprocess.h
- test/core/util/test_config.h
- test/core/util/test_lb_policies.h
- test/core/util/tracer_util.h
- test/core/util/trickle_endpoint.h
src:
@ -947,6 +948,7 @@ filegroups:
- test/core/util/subprocess_posix.cc
- test/core/util/subprocess_windows.cc
- test/core/util/test_config.cc
- test/core/util/test_lb_policies.cc
- test/core/util/tracer_util.cc
- test/core/util/trickle_endpoint.cc
deps:

@ -1233,6 +1233,7 @@ Pod::Spec.new do |s|
'test/core/util/subprocess_posix.cc',
'test/core/util/subprocess_windows.cc',
'test/core/util/test_config.cc',
'test/core/util/test_lb_policies.cc',
'test/core/util/tracer_util.cc',
'test/core/util/trickle_endpoint.cc',
'test/core/util/cmdline.cc',
@ -1259,6 +1260,7 @@ Pod::Spec.new do |s|
'test/core/util/slice_splitter.h',
'test/core/util/subprocess.h',
'test/core/util/test_config.h',
'test/core/util/test_lb_policies.h',
'test/core/util/tracer_util.h',
'test/core/util/trickle_endpoint.h',
'test/core/util/cmdline.h',

@ -628,6 +628,7 @@
'test/core/util/subprocess_posix.cc',
'test/core/util/subprocess_windows.cc',
'test/core/util/test_config.cc',
'test/core/util/test_lb_policies.cc',
'test/core/util/tracer_util.cc',
'test/core/util/trickle_endpoint.cc',
'test/core/util/cmdline.cc',
@ -873,6 +874,7 @@
'test/core/util/subprocess_posix.cc',
'test/core/util/subprocess_windows.cc',
'test/core/util/test_config.cc',
'test/core/util/test_lb_policies.cc',
'test/core/util/tracer_util.cc',
'test/core/util/trickle_endpoint.cc',
'test/core/util/cmdline.cc',

@ -727,6 +727,25 @@ static void free_cached_send_op_data_for_completed_batch(
}
}
//
// LB recv_trailing_metadata_ready handling
//
void maybe_inject_recv_trailing_metadata_ready_for_lb(
const grpc_core::LoadBalancingPolicy::PickState& pick,
grpc_transport_stream_op_batch* batch) {
if (pick.recv_trailing_metadata_ready != nullptr) {
*pick.original_recv_trailing_metadata_ready =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
pick.recv_trailing_metadata_ready;
if (pick.recv_trailing_metadata != nullptr) {
*pick.recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
}
}
}
//
// pending_batches management
//
@ -851,6 +870,10 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
if (batch->recv_trailing_metadata && calld->have_request) {
maybe_inject_recv_trailing_metadata_ready_for_lb(
*calld->request->pick(), batch);
}
batch->handler_private.extra_arg = calld;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
fail_pending_batch_in_call_combiner, batch,
@ -903,6 +926,10 @@ static void pending_batches_resume(grpc_call_element* elem) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
if (batch->recv_trailing_metadata) {
maybe_inject_recv_trailing_metadata_ready_for_lb(
*calld->request->pick(), batch);
}
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch,
@ -1932,6 +1959,8 @@ static void add_retriable_recv_trailing_metadata_op(
batch_data->batch.payload->recv_trailing_metadata
.recv_trailing_metadata_ready =
&retry_state->recv_trailing_metadata_ready;
maybe_inject_recv_trailing_metadata_ready_for_lb(*calld->request->pick(),
&batch_data->batch);
}
// Helper function used to start a recv_trailing_metadata batch. This

@ -77,6 +77,19 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Closure to run when pick is complete, if not completed synchronously.
/// If null, pick will fail if a result is not available synchronously.
grpc_closure* on_complete = nullptr;
// Callback set by lb policy to be notified of trailing metadata.
// The callback must be scheduled on grpc_schedule_on_exec_ctx.
grpc_closure* recv_trailing_metadata_ready = nullptr;
// The address that will be set to point to the original
// recv_trailing_metadata_ready callback, to be invoked by the LB
// policy's recv_trailing_metadata_ready callback when complete.
// Must be non-null if recv_trailing_metadata_ready is non-null.
grpc_closure** original_recv_trailing_metadata_ready = nullptr;
// If this is not nullptr, then the client channel will point it to the
// call's trailing metadata before invoking recv_trailing_metadata_ready.
// If this is nullptr, then the callback will still be called.
// The lb does not have ownership of the metadata.
grpc_metadata_batch** recv_trailing_metadata = nullptr;
/// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call.
RefCountedPtr<ConnectedSubchannel> connected_subchannel;

@ -240,6 +240,7 @@ RefCountedPtr<T> ServiceConfig::MethodConfigTableLookup(
value = table.Get(wildcard_path);
grpc_slice_unref_internal(wildcard_path);
gpr_free(path_str);
if (value == nullptr) return nullptr;
}
return RefCountedPtr<T>(*value);
}

@ -154,3 +154,13 @@ sh_library(
name = "run_with_poller_sh",
srcs = ["run_with_poller.sh"],
)
grpc_cc_library(
name = "test_lb_policies",
testonly = 1,
srcs = ["test_lb_policies.cc"],
hdrs = ["test_lb_policies.h"],
deps = [
"//:grpc",
],
)

@ -0,0 +1,240 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test/core/util/test_lb_policies.h"
#include <string>
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
TraceFlag grpc_trace_forwarding_lb(false, "forwarding_lb");
namespace {
//
// ForwardingLoadBalancingPolicy
//
// A minimal forwarding class to avoid implementing a standalone test LB.
class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
public:
ForwardingLoadBalancingPolicy(const Args& args,
const std::string& delegate_policy_name)
: LoadBalancingPolicy(args) {
delegate_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
delegate_policy_name.c_str(), args);
grpc_pollset_set_add_pollset_set(delegate_->interested_parties(),
interested_parties());
// Give re-resolution closure to delegate.
GRPC_CLOSURE_INIT(&on_delegate_request_reresolution_,
OnDelegateRequestReresolutionLocked, this,
grpc_combiner_scheduler(combiner()));
Ref().release(); // held by callback.
delegate_->SetReresolutionClosureLocked(&on_delegate_request_reresolution_);
}
~ForwardingLoadBalancingPolicy() override = default;
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override {
delegate_->UpdateLocked(args, lb_config);
}
bool PickLocked(PickState* pick, grpc_error** error) override {
return delegate_->PickLocked(pick, error);
}
void CancelPickLocked(PickState* pick, grpc_error* error) override {
delegate_->CancelPickLocked(pick, error);
}
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) override {
delegate_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
initial_metadata_flags_eq, error);
}
void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
grpc_closure* closure) override {
delegate_->NotifyOnStateChangeLocked(state, closure);
}
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override {
return delegate_->CheckConnectivityLocked(connectivity_error);
}
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override {
delegate_->HandOffPendingPicksLocked(new_policy);
}
void ExitIdleLocked() override { delegate_->ExitIdleLocked(); }
void ResetBackoffLocked() override { delegate_->ResetBackoffLocked(); }
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) override {
delegate_->FillChildRefsForChannelz(child_subchannels, child_channels);
}
private:
void ShutdownLocked() override {
delegate_.reset();
TryReresolutionLocked(&grpc_trace_forwarding_lb, GRPC_ERROR_CANCELLED);
}
static void OnDelegateRequestReresolutionLocked(void* arg,
grpc_error* error) {
ForwardingLoadBalancingPolicy* self =
static_cast<ForwardingLoadBalancingPolicy*>(arg);
if (error != GRPC_ERROR_NONE || self->delegate_ == nullptr) {
self->Unref();
return;
}
self->TryReresolutionLocked(&grpc_trace_forwarding_lb, GRPC_ERROR_NONE);
self->delegate_->SetReresolutionClosureLocked(
&self->on_delegate_request_reresolution_);
}
OrphanablePtr<LoadBalancingPolicy> delegate_;
grpc_closure on_delegate_request_reresolution_;
};
//
// InterceptRecvTrailingMetadataLoadBalancingPolicy
//
constexpr char kInterceptRecvTrailingMetadataLbPolicyName[] =
"intercept_trailing_metadata_lb";
class InterceptRecvTrailingMetadataLoadBalancingPolicy
: public ForwardingLoadBalancingPolicy {
public:
InterceptRecvTrailingMetadataLoadBalancingPolicy(
const Args& args, InterceptRecvTrailingMetadataCallback cb,
void* user_data)
: ForwardingLoadBalancingPolicy(args,
/*delegate_lb_policy_name=*/"pick_first"),
cb_(cb),
user_data_(user_data) {}
~InterceptRecvTrailingMetadataLoadBalancingPolicy() override = default;
const char* name() const override {
return kInterceptRecvTrailingMetadataLbPolicyName;
}
bool PickLocked(PickState* pick, grpc_error** error) override {
bool ret = ForwardingLoadBalancingPolicy::PickLocked(pick, error);
// Note: This assumes that the delegate policy does not
// intercepting recv_trailing_metadata. If we ever need to use
// this with a delegate policy that does, then we'll need to
// handle async pick returns separately.
New<TrailingMetadataHandler>(pick, cb_, user_data_); // deletes itself
return ret;
}
private:
class TrailingMetadataHandler {
public:
TrailingMetadataHandler(PickState* pick,
InterceptRecvTrailingMetadataCallback cb,
void* user_data)
: cb_(cb), user_data_(user_data) {
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecordRecvTrailingMetadata, this,
grpc_schedule_on_exec_ctx);
pick->recv_trailing_metadata_ready = &recv_trailing_metadata_ready_;
pick->original_recv_trailing_metadata_ready =
&original_recv_trailing_metadata_ready_;
pick->recv_trailing_metadata = &recv_trailing_metadata_;
}
private:
static void RecordRecvTrailingMetadata(void* arg, grpc_error* err) {
TrailingMetadataHandler* self =
static_cast<TrailingMetadataHandler*>(arg);
GPR_ASSERT(self->recv_trailing_metadata_ != nullptr);
self->cb_(self->user_data_);
GRPC_CLOSURE_SCHED(self->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(err));
Delete(self);
}
InterceptRecvTrailingMetadataCallback cb_;
void* user_data_;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
};
InterceptRecvTrailingMetadataCallback cb_;
void* user_data_;
};
class InterceptTrailingFactory : public LoadBalancingPolicyFactory {
public:
explicit InterceptTrailingFactory(InterceptRecvTrailingMetadataCallback cb,
void* user_data)
: cb_(cb), user_data_(user_data) {}
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>
CreateLoadBalancingPolicy(
const grpc_core::LoadBalancingPolicy::Args& args) const override {
return grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>(
grpc_core::New<InterceptRecvTrailingMetadataLoadBalancingPolicy>(
args, cb_, user_data_));
}
const char* name() const override {
return kInterceptRecvTrailingMetadataLbPolicyName;
}
private:
InterceptRecvTrailingMetadataCallback cb_;
void* user_data_;
};
} // namespace
void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
InterceptRecvTrailingMetadataCallback cb, void* user_data) {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
grpc_core::New<InterceptTrailingFactory>(cb, user_data)));
}
} // namespace grpc_core

@ -0,0 +1,34 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H
#define GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H
namespace grpc_core {
typedef void (*InterceptRecvTrailingMetadataCallback)(void*);
// Registers an LB policy called "intercept_trailing_metadata_lb" that
// invokes cb with argument user_data when trailing metadata is received
// for each call.
void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
InterceptRecvTrailingMetadataCallback cb, void* user_data);
} // namespace grpc_core
#endif // GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H

@ -388,6 +388,7 @@ grpc_cc_test(
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
"//test/core/util:grpc_test_util",
"//test/core/util:test_lb_policies",
"//test/cpp/util:test_util",
],
)

@ -41,6 +41,7 @@
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -52,6 +53,7 @@
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/core/util/test_lb_policies.h"
#include "test/cpp/end2end/test_service_impl.h"
#include <gtest/gtest.h>
@ -1265,6 +1267,81 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) {
EnableDefaultHealthCheckService(false);
}
class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
protected:
void SetUp() override {
ClientLbEnd2endTest::SetUp();
grpc_core::RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
ReportTrailerIntercepted, this);
}
void TearDown() override { ClientLbEnd2endTest::TearDown(); }
int trailers_intercepted() {
std::unique_lock<std::mutex> lock(mu_);
return trailers_intercepted_;
}
private:
static void ReportTrailerIntercepted(void* arg) {
ClientLbInterceptTrailingMetadataTest* self =
static_cast<ClientLbInterceptTrailingMetadataTest*>(arg);
std::unique_lock<std::mutex> lock(self->mu_);
self->trailers_intercepted_++;
}
std::mutex mu_;
int trailers_intercepted_ = 0;
};
TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesDisabled) {
const int kNumServers = 1;
const int kNumRpcs = 10;
StartServers(kNumServers);
auto channel = BuildChannel("intercept_trailing_metadata_lb");
auto stub = BuildStub(channel);
SetNextResolution(GetServersPorts());
for (size_t i = 0; i < kNumRpcs; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
// Check LB policy name for the channel.
EXPECT_EQ("intercept_trailing_metadata_lb",
channel->GetLoadBalancingPolicyName());
EXPECT_EQ(kNumRpcs, trailers_intercepted());
}
TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
const int kNumServers = 1;
const int kNumRpcs = 10;
StartServers(kNumServers);
ChannelArguments args;
args.SetServiceConfigJSON(
"{\n"
" \"methodConfig\": [ {\n"
" \"name\": [\n"
" { \"service\": \"grpc.testing.EchoTestService\" }\n"
" ],\n"
" \"retryPolicy\": {\n"
" \"maxAttempts\": 3,\n"
" \"initialBackoff\": \"1s\",\n"
" \"maxBackoff\": \"120s\",\n"
" \"backoffMultiplier\": 1.6,\n"
" \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
" }\n"
" } ]\n"
"}");
auto channel = BuildChannel("intercept_trailing_metadata_lb", args);
auto stub = BuildStub(channel);
SetNextResolution(GetServersPorts());
for (size_t i = 0; i < kNumRpcs; ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
// Check LB policy name for the channel.
EXPECT_EQ("intercept_trailing_metadata_lb",
channel->GetLoadBalancingPolicyName());
EXPECT_EQ(kNumRpcs, trailers_intercepted());
}
} // namespace
} // namespace testing
} // namespace grpc

@ -10508,6 +10508,7 @@
"test/core/util/slice_splitter.h",
"test/core/util/subprocess.h",
"test/core/util/test_config.h",
"test/core/util/test_lb_policies.h",
"test/core/util/tracer_util.h",
"test/core/util/trickle_endpoint.h"
],
@ -10555,6 +10556,8 @@
"test/core/util/subprocess_windows.cc",
"test/core/util/test_config.cc",
"test/core/util/test_config.h",
"test/core/util/test_lb_policies.cc",
"test/core/util/test_lb_policies.h",
"test/core/util/tracer_util.cc",
"test/core/util/tracer_util.h",
"test/core/util/trickle_endpoint.cc",

Loading…
Cancel
Save