From 225a878e9f358de573c4e05f119bf106476ed19c Mon Sep 17 00:00:00 2001 From: Hope Casey-Allen Date: Thu, 11 Jul 2019 18:51:17 -0700 Subject: [PATCH] Track channelz server sockets in the server node --- src/core/lib/channel/channelz.cc | 33 +++++++++++++++++++++----------- src/core/lib/channel/channelz.h | 11 ++++++++--- src/core/lib/surface/server.cc | 32 +++++++++++++++---------------- src/core/lib/surface/server.h | 8 ++------ 4 files changed, 47 insertions(+), 37 deletions(-) diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 184ba17889d..b9f65870cfe 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -309,31 +309,42 @@ ServerNode::ServerNode(grpc_server* server, size_t channel_tracer_max_nodes) ServerNode::~ServerNode() {} +void ServerNode::AddChildSocket(RefCountedPtr node) { + MutexLock lock(&child_mu_); + child_sockets_.insert(MakePair(node->uuid(), std::move(node))); +} + +void ServerNode::RemoveChildSocket(intptr_t child_uuid) { + MutexLock lock(&child_mu_); + child_sockets_.erase(child_uuid); +} + char* ServerNode::RenderServerSockets(intptr_t start_socket_id, intptr_t max_results) { - // if user does not set max_results, we choose 500. + // If user does not set max_results, we choose 500. size_t pagination_limit = max_results == 0 ? 500 : max_results; grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* json = top_level_json; grpc_json* json_iterator = nullptr; - ChildSocketsList socket_refs; - grpc_server_populate_server_sockets(server_, &socket_refs, start_socket_id); - // declared early so it can be used outside of the loop. - size_t i = 0; - if (!socket_refs.empty()) { - // create list of socket refs + MutexLock lock(&child_mu_); + size_t sockets_rendered = 0; + if (!child_sockets_.empty()) { + // Create list of socket refs grpc_json* array_parent = grpc_json_create_child( nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false); - for (i = 0; i < GPR_MIN(socket_refs.size(), pagination_limit); ++i) { + const size_t limit = GPR_MIN(child_sockets_.size(), pagination_limit); + for (auto it = child_sockets_.lower_bound(start_socket_id); + it != child_sockets_.end() && sockets_rendered < limit; + ++it, ++sockets_rendered) { grpc_json* socket_ref_json = grpc_json_create_child( nullptr, array_parent, nullptr, nullptr, GRPC_JSON_OBJECT, false); json_iterator = grpc_json_add_number_string_child( - socket_ref_json, nullptr, "socketId", socket_refs[i]->uuid()); + socket_ref_json, nullptr, "socketId", it->first); grpc_json_create_child(json_iterator, socket_ref_json, "name", - socket_refs[i]->remote(), GRPC_JSON_STRING, false); + it->second->remote(), GRPC_JSON_STRING, false); } } - if (i == socket_refs.size()) { + if (sockets_rendered == child_sockets_.size()) { json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr, GRPC_JSON_TRUE, false); } diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index d26896262ec..a02cb82cc75 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -64,7 +64,6 @@ intptr_t GetParentUuidFromArgs(const grpc_channel_args& args); typedef InlinedVector ChildRefsList; class SocketNode; -typedef InlinedVector, 10> ChildSocketsList; namespace testing { class CallCountingHelperPeer; @@ -207,12 +206,16 @@ class ChannelNode : public BaseNode { class ServerNode : public BaseNode { public: ServerNode(grpc_server* server, size_t channel_tracer_max_nodes); + ~ServerNode() override; grpc_json* RenderJson() override; - char* RenderServerSockets(intptr_t start_socket_id, - intptr_t pagination_limit); + char* RenderServerSockets(intptr_t start_socket_id, intptr_t max_results); + + void AddChildSocket(RefCountedPtr); + + void RemoveChildSocket(intptr_t child_uuid); // proxy methods to composed classes. void AddTraceEvent(ChannelTrace::Severity severity, const grpc_slice& data) { @@ -232,6 +235,8 @@ class ServerNode : public BaseNode { grpc_server* server_; CallCountingHelper call_counter_; ChannelTrace trace_; + Mutex child_mu_; // Guards child map below. + Map> child_sockets_; }; // Handles channelz bookkeeping for sockets diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 6cd86003f01..a1c7d132ade 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -31,6 +31,7 @@ #include #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/mpscq.h" @@ -111,7 +112,7 @@ struct channel_data { uint32_t registered_method_max_probes; grpc_closure finish_destroy_channel_closure; grpc_closure channel_connectivity_changed; - grpc_core::RefCountedPtr socket_node; + intptr_t channelz_socket_uuid; }; typedef struct shutdown_tag { @@ -941,7 +942,6 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, static void destroy_channel_elem(grpc_channel_element* elem) { size_t i; channel_data* chand = static_cast(elem->channel_data); - chand->socket_node.reset(); if (chand->registered_methods) { for (i = 0; i < chand->registered_method_slots; i++) { grpc_slice_unref_internal(chand->registered_methods[i].method); @@ -952,6 +952,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) { gpr_free(chand->registered_methods); } if (chand->server) { + if (chand->server->channelz_server != nullptr && + chand->channelz_socket_uuid != 0) { + chand->server->channelz_server->RemoveChildSocket( + chand->channelz_socket_uuid); + } gpr_mu_lock(&chand->server->mu_global); chand->next->prev = chand->prev; chand->prev->next = chand->next; @@ -1144,7 +1149,8 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets, void grpc_server_setup_transport( grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args, - grpc_core::RefCountedPtr socket_node, + const grpc_core::RefCountedPtr& + socket_node, grpc_resource_user* resource_user) { size_t num_registered_methods; size_t alloc; @@ -1166,7 +1172,12 @@ void grpc_server_setup_transport( chand->server = s; server_ref(s); chand->channel = channel; - chand->socket_node = std::move(socket_node); + if (socket_node != nullptr) { + chand->channelz_socket_uuid = socket_node->uuid(); + s->channelz_server->AddChildSocket(socket_node); + } else { + chand->channelz_socket_uuid = 0; + } size_t cq_idx; for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { @@ -1241,19 +1252,6 @@ void grpc_server_setup_transport( grpc_transport_perform_op(transport, op); } -void grpc_server_populate_server_sockets( - grpc_server* s, grpc_core::channelz::ChildSocketsList* server_sockets, - intptr_t start_idx) { - gpr_mu_lock(&s->mu_global); - channel_data* c = nullptr; - for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { - if (c->socket_node != nullptr && c->socket_node->uuid() >= start_idx) { - server_sockets->push_back(c->socket_node); - } - } - gpr_mu_unlock(&s->mu_global); -} - void grpc_server_populate_listen_sockets( grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets) { gpr_mu_lock(&server->mu_global); diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 393bb242148..926a5825006 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -47,14 +47,10 @@ void grpc_server_add_listener(grpc_server* server, void* listener, void grpc_server_setup_transport( grpc_server* server, grpc_transport* transport, grpc_pollset* accepting_pollset, const grpc_channel_args* args, - grpc_core::RefCountedPtr socket_node, + const grpc_core::RefCountedPtr& + socket_node, grpc_resource_user* resource_user = nullptr); -/* fills in the uuids of all sockets used for connections on this server */ -void grpc_server_populate_server_sockets( - grpc_server* server, grpc_core::channelz::ChildSocketsList* server_sockets, - intptr_t start_idx); - /* fills in the uuids of all listen sockets on this server */ void grpc_server_populate_listen_sockets( grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets);