Convert server config selector filter (#28877)

* server_config_selector_filter

* fix

* progress

* update

* x

* compiles!

* fixed!

* fix

* fix
pull/29299/head
Craig Tiller 3 years ago committed by GitHub
parent ec1aa1f26c
commit ee84fb9beb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 10
      src/core/ext/filters/server_config_selector/server_config_selector.cc
  3. 4
      src/core/ext/filters/server_config_selector/server_config_selector.h
  4. 278
      src/core/ext/filters/server_config_selector/server_config_selector_filter.cc
  5. 9
      test/core/server_config_selector/server_config_selector_test.cc

@ -2572,10 +2572,12 @@ grpc_cc_library(
],
language = "c++",
deps = [
"arena",
"gpr_base",
"grpc_base",
"grpc_server_config_selector",
"grpc_service_config",
"promise",
],
)

@ -54,14 +54,8 @@ grpc_arg ServerConfigSelectorProvider::MakeChannelArg() const {
const_cast<ServerConfigSelectorProvider*>(this), &kChannelArgVtable);
}
RefCountedPtr<ServerConfigSelectorProvider>
ServerConfigSelectorProvider::GetFromChannelArgs(
const grpc_channel_args& args) {
ServerConfigSelectorProvider* config_selector_provider =
grpc_channel_args_find_pointer<ServerConfigSelectorProvider>(
&args, kServerConfigSelectorProviderChannelArgName);
return config_selector_provider != nullptr ? config_selector_provider->Ref()
: nullptr;
absl::string_view ServerConfigSelectorProvider::ChannelArgName() {
return kServerConfigSelectorProviderChannelArgName;
}
} // namespace grpc_core

@ -61,9 +61,9 @@ class ServerConfigSelectorProvider
std::unique_ptr<ServerConfigSelectorWatcher> watcher) = 0;
virtual void CancelWatch() = 0;
static absl::string_view ChannelArgName();
grpc_arg MakeChannelArg() const;
static RefCountedPtr<ServerConfigSelectorProvider> GetFromChannelArgs(
const grpc_channel_args& args);
};
} // namespace grpc_core

@ -1,5 +1,3 @@
//
//
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -13,14 +11,16 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/server_config_selector/server_config_selector_filter.h"
#include "src/core/ext/filters/server_config_selector/server_config_selector.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/transport/error_utils.h"
@ -28,248 +28,116 @@ namespace grpc_core {
namespace {
class ChannelData {
class ServerConfigSelectorFilter final : public ChannelFilter {
public:
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* args);
static void Destroy(grpc_channel_element* elem);
~ServerConfigSelectorFilter() override;
ServerConfigSelectorFilter(const ServerConfigSelectorFilter&) = delete;
ServerConfigSelectorFilter& operator=(const ServerConfigSelectorFilter&) =
delete;
ServerConfigSelectorFilter(ServerConfigSelectorFilter&&) = default;
ServerConfigSelectorFilter& operator=(ServerConfigSelectorFilter&&) = default;
static absl::StatusOr<ServerConfigSelectorFilter> Create(ChannelArgs args,
ChannelFilter::Args);
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
absl::StatusOr<RefCountedPtr<ServerConfigSelector>> config_selector() {
MutexLock lock(&mu_);
return config_selector_.value();
MutexLock lock(&state_->mu);
return state_->config_selector.value();
}
private:
struct State {
Mutex mu;
absl::optional<absl::StatusOr<RefCountedPtr<ServerConfigSelector>>>
config_selector ABSL_GUARDED_BY(mu);
};
class ServerConfigSelectorWatcher
: public ServerConfigSelectorProvider::ServerConfigSelectorWatcher {
public:
explicit ServerConfigSelectorWatcher(ChannelData* chand) : chand_(chand) {}
explicit ServerConfigSelectorWatcher(std::shared_ptr<State> state)
: state_(state) {}
void OnServerConfigSelectorUpdate(
absl::StatusOr<RefCountedPtr<ServerConfigSelector>> update) override {
MutexLock lock(&chand_->mu_);
chand_->config_selector_ = std::move(update);
MutexLock lock(&state_->mu);
state_->config_selector = std::move(update);
}
private:
ChannelData* chand_;
std::shared_ptr<State> state_;
};
explicit ChannelData(RefCountedPtr<ServerConfigSelectorProvider>
server_config_selector_provider);
~ChannelData();
explicit ServerConfigSelectorFilter(
RefCountedPtr<ServerConfigSelectorProvider>
server_config_selector_provider);
RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider_;
Mutex mu_;
absl::optional<absl::StatusOr<RefCountedPtr<ServerConfigSelector>>>
config_selector_ ABSL_GUARDED_BY(mu_);
std::shared_ptr<State> state_;
};
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args);
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* /* final_info */,
grpc_closure* /* then_schedule_closure */);
static void StartTransportStreamOpBatch(grpc_call_element* elem,
grpc_transport_stream_op_batch* op);
private:
CallData(grpc_call_element* elem, const grpc_call_element_args& args);
~CallData();
static void RecvInitialMetadataReady(void* user_data,
grpc_error_handle error);
static void RecvTrailingMetadataReady(void* user_data,
grpc_error_handle error);
void MaybeResumeRecvTrailingMetadataReady();
grpc_call_context_element* call_context_;
CallCombiner* call_combiner_;
ServiceConfigCallData service_config_call_data_;
// Overall error for the call
grpc_error_handle error_ = GRPC_ERROR_NONE;
// State for keeping track of recv_initial_metadata
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
// State for keeping of track of recv_trailing_metadata
grpc_closure* original_recv_trailing_metadata_ready_;
grpc_closure recv_trailing_metadata_ready_;
grpc_error_handle recv_trailing_metadata_ready_error_;
bool seen_recv_trailing_metadata_ready_ = false;
};
// ChannelData
grpc_error_handle ChannelData::Init(grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(elem->filter == &kServerConfigSelectorFilter);
RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider =
ServerConfigSelectorProvider::GetFromChannelArgs(*args->channel_args);
absl::StatusOr<ServerConfigSelectorFilter> ServerConfigSelectorFilter::Create(
ChannelArgs args, ChannelFilter::Args) {
ServerConfigSelectorProvider* server_config_selector_provider =
args.GetObject<ServerConfigSelectorProvider>();
if (server_config_selector_provider == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No ServerConfigSelectorProvider object found");
return absl::UnknownError("No ServerConfigSelectorProvider object found");
}
new (elem->channel_data)
ChannelData(std::move(server_config_selector_provider));
return GRPC_ERROR_NONE;
return ServerConfigSelectorFilter(server_config_selector_provider->Ref());
}
void ChannelData::Destroy(grpc_channel_element* elem) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
chand->~ChannelData();
}
ChannelData::ChannelData(
ServerConfigSelectorFilter::ServerConfigSelectorFilter(
RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider)
: server_config_selector_provider_(
std::move(server_config_selector_provider)) {
std::move(server_config_selector_provider)),
state_(std::make_shared<State>()) {
GPR_ASSERT(server_config_selector_provider_ != nullptr);
auto server_config_selector_watcher =
absl::make_unique<ServerConfigSelectorWatcher>(this);
absl::make_unique<ServerConfigSelectorWatcher>(state_);
auto config_selector = server_config_selector_provider_->Watch(
std::move(server_config_selector_watcher));
MutexLock lock(&mu_);
MutexLock lock(&state_->mu);
// It's possible for the watcher to have already updated config_selector_
if (!config_selector_.has_value()) {
config_selector_ = std::move(config_selector);
}
}
ChannelData::~ChannelData() { server_config_selector_provider_->CancelWatch(); }
// CallData
grpc_error_handle CallData::Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) CallData(elem, *args);
return GRPC_ERROR_NONE;
}
void CallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*then_schedule_closure*/) {
auto* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
void CallData::StartTransportStreamOpBatch(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
CallData* calld = static_cast<CallData*>(elem->call_data);
if (op->recv_initial_metadata) {
calld->recv_initial_metadata_ =
op->payload->recv_initial_metadata.recv_initial_metadata;
calld->original_recv_initial_metadata_ready_ =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready_;
}
if (op->recv_trailing_metadata) {
// We might generate errors on receiving initial metadata which we need to
// bubble up through recv_trailing_metadata_ready
calld->original_recv_trailing_metadata_ready_ =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready_;
if (!state_->config_selector.has_value()) {
state_->config_selector = std::move(config_selector);
}
// Chain to the next filter.
grpc_call_next_op(elem, op);
}
CallData::CallData(grpc_call_element* elem, const grpc_call_element_args& args)
: call_context_(args.context), call_combiner_(args.call_combiner) {
GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
elem, grpc_schedule_on_exec_ctx);
}
CallData::~CallData() {
// Remove the entry from call context, just in case anyone above us
// tries to look at it during call stack destruction.
call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value = nullptr;
GRPC_ERROR_UNREF(error_);
}
void CallData::RecvInitialMetadataReady(void* user_data,
grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
CallData* calld = static_cast<CallData*>(elem->call_data);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (error == GRPC_ERROR_NONE) {
auto config_selector = chand->config_selector();
if (config_selector.ok()) {
auto call_config =
config_selector.value()->GetCallConfig(calld->recv_initial_metadata_);
if (call_config.error != GRPC_ERROR_NONE) {
calld->error_ = call_config.error;
} else {
calld->service_config_call_data_ =
ServiceConfigCallData(std::move(call_config.service_config),
call_config.method_configs, {});
calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value =
&calld->service_config_call_data_;
}
} else {
calld->error_ = GRPC_ERROR_CREATE_FROM_CPP_STRING(
config_selector.status().ToString());
}
if (calld->error_ != GRPC_ERROR_NONE) {
calld->error_ = grpc_error_set_int(
calld->error_, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
error = calld->error_; // Does not take a ref
}
ServerConfigSelectorFilter::~ServerConfigSelectorFilter() {
if (server_config_selector_provider_ != nullptr) {
server_config_selector_provider_->CancelWatch();
}
calld->MaybeResumeRecvTrailingMetadataReady();
grpc_closure* closure = calld->original_recv_initial_metadata_ready_;
calld->original_recv_initial_metadata_ready_ = nullptr;
Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error));
}
void CallData::RecvTrailingMetadataReady(void* user_data,
grpc_error_handle error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
CallData* calld = static_cast<CallData*>(elem->call_data);
if (calld->original_recv_initial_metadata_ready_ != nullptr) {
calld->seen_recv_trailing_metadata_ready_ = true;
calld->recv_trailing_metadata_ready_error_ = GRPC_ERROR_REF(error);
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"Deferring RecvTrailingMetadataReady until after "
"RecvInitialMetadataReady");
return;
}
error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error_);
calld->error_ = GRPC_ERROR_NONE;
grpc_closure* closure = calld->original_recv_trailing_metadata_ready_;
calld->original_recv_trailing_metadata_ready_ = nullptr;
Closure::Run(DEBUG_LOCATION, closure, error);
}
void CallData::MaybeResumeRecvTrailingMetadataReady() {
if (seen_recv_trailing_metadata_ready_) {
seen_recv_trailing_metadata_ready_ = false;
grpc_error_handle error = recv_trailing_metadata_ready_error_;
recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE;
GRPC_CALL_COMBINER_START(call_combiner_, &recv_trailing_metadata_ready_,
error, "Continuing RecvTrailingMetadataReady");
ArenaPromise<ServerMetadataHandle> ServerConfigSelectorFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto sel = config_selector();
if (!sel.ok()) return Immediate(ServerMetadataHandle(sel.status()));
auto call_config =
sel.value()->GetCallConfig(call_args.client_initial_metadata.get());
if (call_config.error != GRPC_ERROR_NONE) {
auto r = Immediate(ServerMetadataHandle(
absl::UnavailableError(grpc_error_std_string(call_config.error))));
GRPC_ERROR_UNREF(call_config.error);
return std::move(r);
}
auto& ctx = GetContext<
grpc_call_context_element>()[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA];
ctx.value = GetContext<Arena>()->New<ServiceConfigCallData>(
std::move(call_config.service_config), call_config.method_configs,
ServiceConfigCallData::CallAttributes{});
ctx.destroy = [](void* p) {
static_cast<ServiceConfigCallData*>(p)->~ServiceConfigCallData();
};
return next_promise_factory(std::move(call_args));
}
} // namespace
const grpc_channel_filter kServerConfigSelectorFilter = {
CallData::StartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(ChannelData),
ChannelData::Init,
ChannelData::Destroy,
grpc_channel_next_get_info,
"server_config_selector_filter",
};
const grpc_channel_filter kServerConfigSelectorFilter =
MakePromiseBasedFilter<ServerConfigSelectorFilter, FilterEndpoint::kServer>(
"server_config_selector_filter");
} // namespace grpc_core

@ -52,7 +52,8 @@ TEST(ServerConfigSelectorProviderTest, CopyChannelArgs) {
grpc_arg arg = server_config_selector_provider->MakeChannelArg();
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
EXPECT_EQ(server_config_selector_provider,
ServerConfigSelectorProvider::GetFromChannelArgs(*args));
grpc_core::ChannelArgs::FromC(args)
.GetObject<ServerConfigSelectorProvider>());
grpc_channel_args_destroy(args);
}
@ -63,8 +64,10 @@ TEST(ServerConfigSelectorProviderTest, ChannelArgsCompare) {
grpc_arg arg = server_config_selector_provider->MakeChannelArg();
grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
grpc_channel_args* new_args = grpc_channel_args_copy(args);
EXPECT_EQ(ServerConfigSelectorProvider::GetFromChannelArgs(*new_args),
ServerConfigSelectorProvider::GetFromChannelArgs(*args));
EXPECT_EQ(grpc_core::ChannelArgs::FromC(new_args)
.GetObject<ServerConfigSelectorProvider>(),
grpc_core::ChannelArgs::FromC(args)
.GetObject<ServerConfigSelectorProvider>());
grpc_channel_args_destroy(args);
grpc_channel_args_destroy(new_args);
}

Loading…
Cancel
Save