grpc_server_config_fetcher: Cleanup StartWatch - remove unnecessary args (#27663)

* grpc_server_config_fetcher: Cleanup StartWatch - remove unnecessary args

* Reviewer comments

* Avoid a channel args copy copy for the non server config fetcher case

* Fix args destruction
pull/27698/head
Yash Tibrewal 3 years ago committed by GitHub
parent c89d92225f
commit 8907c891b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  2. 10
      src/core/ext/xds/xds_server_config_fetcher.cc
  3. 1
      src/core/lib/surface/server.h

@ -221,10 +221,10 @@ class Chttp2ServerListener : public Server::ListenerInterface {
grpc_resolved_address resolved_address_; grpc_resolved_address resolved_address_;
Chttp2ServerArgsModifier const args_modifier_; Chttp2ServerArgsModifier const args_modifier_;
ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr; ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
Mutex channel_args_mu_; grpc_channel_args* args_;
grpc_channel_args* args_ ABSL_GUARDED_BY(channel_args_mu_); Mutex connection_manager_mu_;
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager> RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager_ ABSL_GUARDED_BY(channel_args_mu_); connection_manager_ ABSL_GUARDED_BY(connection_manager_mu_);
Mutex mu_; Mutex mu_;
// Signals whether grpc_tcp_server_start() has been called. // Signals whether grpc_tcp_server_start() has been called.
bool started_ ABSL_GUARDED_BY(mu_) = false; bool started_ ABSL_GUARDED_BY(mu_) = false;
@ -252,7 +252,7 @@ void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager> RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager_to_destroy; connection_manager_to_destroy;
{ {
MutexLock lock(&listener_->channel_args_mu_); MutexLock lock(&listener_->connection_manager_mu_);
connection_manager_to_destroy = listener_->connection_manager_; connection_manager_to_destroy = listener_->connection_manager_;
listener_->connection_manager_ = std::move(connection_manager); listener_->connection_manager_ = std::move(connection_manager);
} }
@ -693,16 +693,10 @@ Chttp2ServerListener::~Chttp2ServerListener() {
void Chttp2ServerListener::Start( void Chttp2ServerListener::Start(
Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) { Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
if (server_->config_fetcher() != nullptr) { if (server_->config_fetcher() != nullptr) {
grpc_channel_args* args = nullptr;
auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref()); auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
config_fetcher_watcher_ = watcher.get(); config_fetcher_watcher_ = watcher.get();
{
MutexLock lock(&channel_args_mu_);
args = grpc_channel_args_copy(args_);
}
server_->config_fetcher()->StartWatch( server_->config_fetcher()->StartWatch(
grpc_sockaddr_to_string(&resolved_address_, false), args, grpc_sockaddr_to_string(&resolved_address_, false), std::move(watcher));
std::move(watcher));
} else { } else {
{ {
MutexLock lock(&mu_); MutexLock lock(&mu_);
@ -726,12 +720,12 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset, grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) { grpc_tcp_server_acceptor* acceptor) {
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg); Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
grpc_channel_args* args = nullptr; grpc_channel_args* args = self->args_;
grpc_channel_args* args_to_destroy = nullptr;
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager> RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager; connection_manager;
{ {
MutexLock lock(&self->channel_args_mu_); MutexLock lock(&self->connection_manager_mu_);
args = grpc_channel_args_copy(self->args_);
connection_manager = self->connection_manager_; connection_manager = self->connection_manager_;
} }
auto endpoint_cleanup = [&](grpc_error_handle error) { auto endpoint_cleanup = [&](grpc_error_handle error) {
@ -744,11 +738,12 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( grpc_error_handle error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No ConnectionManager configured. Closing connection."); "No ConnectionManager configured. Closing connection.");
endpoint_cleanup(error); endpoint_cleanup(error);
grpc_channel_args_destroy(args);
return; return;
} }
// TODO(yashykt): Maybe combine the following two arg modifiers into a // TODO(yashykt): Maybe combine the following two arg modifiers into a
// single one. // single one.
// Make a copy of the args so as to avoid destroying the original.
args = grpc_channel_args_copy(args);
absl::StatusOr<grpc_channel_args*> args_result = absl::StatusOr<grpc_channel_args*> args_result =
connection_manager->UpdateChannelArgsForConnection(args, tcp); connection_manager->UpdateChannelArgsForConnection(args, tcp);
if (!args_result.ok()) { if (!args_result.ok()) {
@ -767,6 +762,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
grpc_channel_args_destroy(args); grpc_channel_args_destroy(args);
return; return;
} }
args_to_destroy = args;
} }
grpc_resource_user* channel_resource_user = grpc_resource_user_create( grpc_resource_user* channel_resource_user = grpc_resource_user_create(
self->resource_quota_, self->resource_quota_,
@ -804,7 +800,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
} else { } else {
connection_ref->Start(std::move(listener_ref), tcp, args); connection_ref->Start(std::move(listener_ref), tcp, args);
} }
grpc_channel_args_destroy(args); grpc_channel_args_destroy(args_to_destroy);
} }
void Chttp2ServerListener::TcpServerShutdownComplete(void* arg, void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,

@ -353,12 +353,12 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
GPR_ASSERT(xds_client_ != nullptr); GPR_ASSERT(xds_client_ != nullptr);
} }
void StartWatch(std::string listening_address, grpc_channel_args* args, void StartWatch(std::string listening_address,
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
watcher) override { watcher) override {
grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get(); grpc_server_config_fetcher::WatcherInterface* watcher_ptr = watcher.get();
auto listener_watcher = absl::make_unique<ListenerWatcher>( auto listener_watcher = absl::make_unique<ListenerWatcher>(
std::move(watcher), args, xds_client_, serving_status_notifier_, std::move(watcher), xds_client_, serving_status_notifier_,
listening_address); listening_address);
auto* listener_watcher_ptr = listener_watcher.get(); auto* listener_watcher_ptr = listener_watcher.get();
listening_address = absl::StrReplaceAll( listening_address = absl::StrReplaceAll(
@ -396,17 +396,14 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
explicit ListenerWatcher( explicit ListenerWatcher(
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
server_config_watcher, server_config_watcher,
grpc_channel_args* args, RefCountedPtr<XdsClient> xds_client, RefCountedPtr<XdsClient> xds_client,
grpc_server_xds_status_notifier serving_status_notifier, grpc_server_xds_status_notifier serving_status_notifier,
std::string listening_address) std::string listening_address)
: server_config_watcher_(std::move(server_config_watcher)), : server_config_watcher_(std::move(server_config_watcher)),
args_(args),
xds_client_(std::move(xds_client)), xds_client_(std::move(xds_client)),
serving_status_notifier_(serving_status_notifier), serving_status_notifier_(serving_status_notifier),
listening_address_(std::move(listening_address)) {} listening_address_(std::move(listening_address)) {}
~ListenerWatcher() override { grpc_channel_args_destroy(args_); }
// Deleted due to special handling required for args_. Copy the channel args // Deleted due to special handling required for args_. Copy the channel args
// if we ever need these. // if we ever need these.
ListenerWatcher(const ListenerWatcher&) = delete; ListenerWatcher(const ListenerWatcher&) = delete;
@ -498,7 +495,6 @@ class XdsServerConfigFetcher : public grpc_server_config_fetcher {
private: private:
std::unique_ptr<grpc_server_config_fetcher::WatcherInterface> std::unique_ptr<grpc_server_config_fetcher::WatcherInterface>
server_config_watcher_; server_config_watcher_;
grpc_channel_args* args_;
RefCountedPtr<XdsClient> xds_client_; RefCountedPtr<XdsClient> xds_client_;
grpc_server_xds_status_notifier serving_status_notifier_; grpc_server_xds_status_notifier serving_status_notifier_;
std::string listening_address_; std::string listening_address_;

@ -481,7 +481,6 @@ struct grpc_server_config_fetcher {
// Ownership of \a args is transferred. // Ownership of \a args is transferred.
virtual void StartWatch(std::string listening_address, virtual void StartWatch(std::string listening_address,
grpc_channel_args* args,
std::unique_ptr<WatcherInterface> watcher) = 0; std::unique_ptr<WatcherInterface> watcher) = 0;
virtual void CancelWatch(WatcherInterface* watcher) = 0; virtual void CancelWatch(WatcherInterface* watcher) = 0;
virtual grpc_pollset_set* interested_parties() = 0; virtual grpc_pollset_set* interested_parties() = 0;

Loading…
Cancel
Save