Track channelz server sockets in the server node

pull/19624/head
Hope Casey-Allen 5 years ago
parent 79f2ad8353
commit 225a878e9f
  1. 33
      src/core/lib/channel/channelz.cc
  2. 11
      src/core/lib/channel/channelz.h
  3. 32
      src/core/lib/surface/server.cc
  4. 8
      src/core/lib/surface/server.h

@ -309,31 +309,42 @@ ServerNode::ServerNode(grpc_server* server, size_t channel_tracer_max_nodes)
ServerNode::~ServerNode() {}
void ServerNode::AddChildSocket(RefCountedPtr<SocketNode> node) {
MutexLock lock(&child_mu_);
child_sockets_.insert(MakePair(node->uuid(), std::move(node)));
}
void ServerNode::RemoveChildSocket(intptr_t child_uuid) {
MutexLock lock(&child_mu_);
child_sockets_.erase(child_uuid);
}
char* ServerNode::RenderServerSockets(intptr_t start_socket_id,
intptr_t max_results) {
// if user does not set max_results, we choose 500.
// If user does not set max_results, we choose 500.
size_t pagination_limit = max_results == 0 ? 500 : max_results;
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr;
ChildSocketsList socket_refs;
grpc_server_populate_server_sockets(server_, &socket_refs, start_socket_id);
// declared early so it can be used outside of the loop.
size_t i = 0;
if (!socket_refs.empty()) {
// create list of socket refs
MutexLock lock(&child_mu_);
size_t sockets_rendered = 0;
if (!child_sockets_.empty()) {
// Create list of socket refs
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false);
for (i = 0; i < GPR_MIN(socket_refs.size(), pagination_limit); ++i) {
const size_t limit = GPR_MIN(child_sockets_.size(), pagination_limit);
for (auto it = child_sockets_.lower_bound(start_socket_id);
it != child_sockets_.end() && sockets_rendered < limit;
++it, ++sockets_rendered) {
grpc_json* socket_ref_json = grpc_json_create_child(
nullptr, array_parent, nullptr, nullptr, GRPC_JSON_OBJECT, false);
json_iterator = grpc_json_add_number_string_child(
socket_ref_json, nullptr, "socketId", socket_refs[i]->uuid());
socket_ref_json, nullptr, "socketId", it->first);
grpc_json_create_child(json_iterator, socket_ref_json, "name",
socket_refs[i]->remote(), GRPC_JSON_STRING, false);
it->second->remote(), GRPC_JSON_STRING, false);
}
}
if (i == socket_refs.size()) {
if (sockets_rendered == child_sockets_.size()) {
json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr,
GRPC_JSON_TRUE, false);
}

@ -64,7 +64,6 @@ intptr_t GetParentUuidFromArgs(const grpc_channel_args& args);
typedef InlinedVector<intptr_t, 10> ChildRefsList;
class SocketNode;
typedef InlinedVector<RefCountedPtr<SocketNode>, 10> ChildSocketsList;
namespace testing {
class CallCountingHelperPeer;
@ -207,12 +206,16 @@ class ChannelNode : public BaseNode {
class ServerNode : public BaseNode {
public:
ServerNode(grpc_server* server, size_t channel_tracer_max_nodes);
~ServerNode() override;
grpc_json* RenderJson() override;
char* RenderServerSockets(intptr_t start_socket_id,
intptr_t pagination_limit);
char* RenderServerSockets(intptr_t start_socket_id, intptr_t max_results);
void AddChildSocket(RefCountedPtr<SocketNode>);
void RemoveChildSocket(intptr_t child_uuid);
// proxy methods to composed classes.
void AddTraceEvent(ChannelTrace::Severity severity, const grpc_slice& data) {
@ -232,6 +235,8 @@ class ServerNode : public BaseNode {
grpc_server* server_;
CallCountingHelper call_counter_;
ChannelTrace trace_;
Mutex child_mu_; // Guards child map below.
Map<intptr_t, RefCountedPtr<SocketNode>> child_sockets_;
};
// Handles channelz bookkeeping for sockets

@ -31,6 +31,7 @@
#include <utility>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/mpscq.h"
@ -111,7 +112,7 @@ struct channel_data {
uint32_t registered_method_max_probes;
grpc_closure finish_destroy_channel_closure;
grpc_closure channel_connectivity_changed;
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node;
intptr_t channelz_socket_uuid;
};
typedef struct shutdown_tag {
@ -941,7 +942,6 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
static void destroy_channel_elem(grpc_channel_element* elem) {
size_t i;
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
chand->socket_node.reset();
if (chand->registered_methods) {
for (i = 0; i < chand->registered_method_slots; i++) {
grpc_slice_unref_internal(chand->registered_methods[i].method);
@ -952,6 +952,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
gpr_free(chand->registered_methods);
}
if (chand->server) {
if (chand->server->channelz_server != nullptr &&
chand->channelz_socket_uuid != 0) {
chand->server->channelz_server->RemoveChildSocket(
chand->channelz_socket_uuid);
}
gpr_mu_lock(&chand->server->mu_global);
chand->next->prev = chand->prev;
chand->prev->next = chand->next;
@ -1144,7 +1149,8 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
void grpc_server_setup_transport(
grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node,
const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
socket_node,
grpc_resource_user* resource_user) {
size_t num_registered_methods;
size_t alloc;
@ -1166,7 +1172,12 @@ void grpc_server_setup_transport(
chand->server = s;
server_ref(s);
chand->channel = channel;
chand->socket_node = std::move(socket_node);
if (socket_node != nullptr) {
chand->channelz_socket_uuid = socket_node->uuid();
s->channelz_server->AddChildSocket(socket_node);
} else {
chand->channelz_socket_uuid = 0;
}
size_t cq_idx;
for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
@ -1241,19 +1252,6 @@ void grpc_server_setup_transport(
grpc_transport_perform_op(transport, op);
}
void grpc_server_populate_server_sockets(
grpc_server* s, grpc_core::channelz::ChildSocketsList* server_sockets,
intptr_t start_idx) {
gpr_mu_lock(&s->mu_global);
channel_data* c = nullptr;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
if (c->socket_node != nullptr && c->socket_node->uuid() >= start_idx) {
server_sockets->push_back(c->socket_node);
}
}
gpr_mu_unlock(&s->mu_global);
}
void grpc_server_populate_listen_sockets(
grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets) {
gpr_mu_lock(&server->mu_global);

@ -47,14 +47,10 @@ void grpc_server_add_listener(grpc_server* server, void* listener,
void grpc_server_setup_transport(
grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset, const grpc_channel_args* args,
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node,
const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
socket_node,
grpc_resource_user* resource_user = nullptr);
/* fills in the uuids of all sockets used for connections on this server */
void grpc_server_populate_server_sockets(
grpc_server* server, grpc_core::channelz::ChildSocketsList* server_sockets,
intptr_t start_idx);
/* fills in the uuids of all listen sockets on this server */
void grpc_server_populate_listen_sockets(
grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets);

Loading…
Cancel
Save