Merge pull request #19624 from hcaseyal/channelz_sockets

Track channelz server sockets in the server node
pull/19653/head
hcaseyal 6 years ago committed by GitHub
commit 592b840e10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      src/core/lib/channel/channelz.cc
  2. 11
      src/core/lib/channel/channelz.h
  3. 32
      src/core/lib/surface/server.cc
  4. 8
      src/core/lib/surface/server.h

@ -309,31 +309,42 @@ ServerNode::ServerNode(grpc_server* server, size_t channel_tracer_max_nodes)
ServerNode::~ServerNode() {} ServerNode::~ServerNode() {}
void ServerNode::AddChildSocket(RefCountedPtr<SocketNode> node) {
MutexLock lock(&child_mu_);
child_sockets_.insert(MakePair(node->uuid(), std::move(node)));
}
void ServerNode::RemoveChildSocket(intptr_t child_uuid) {
MutexLock lock(&child_mu_);
child_sockets_.erase(child_uuid);
}
char* ServerNode::RenderServerSockets(intptr_t start_socket_id, char* ServerNode::RenderServerSockets(intptr_t start_socket_id,
intptr_t max_results) { intptr_t max_results) {
// if user does not set max_results, we choose 500. // If user does not set max_results, we choose 500.
size_t pagination_limit = max_results == 0 ? 500 : max_results; size_t pagination_limit = max_results == 0 ? 500 : max_results;
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json; grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr; grpc_json* json_iterator = nullptr;
ChildSocketsList socket_refs; MutexLock lock(&child_mu_);
grpc_server_populate_server_sockets(server_, &socket_refs, start_socket_id); size_t sockets_rendered = 0;
// declared early so it can be used outside of the loop. if (!child_sockets_.empty()) {
size_t i = 0; // Create list of socket refs
if (!socket_refs.empty()) {
// create list of socket refs
grpc_json* array_parent = grpc_json_create_child( grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false); nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false);
for (i = 0; i < GPR_MIN(socket_refs.size(), pagination_limit); ++i) { const size_t limit = GPR_MIN(child_sockets_.size(), pagination_limit);
for (auto it = child_sockets_.lower_bound(start_socket_id);
it != child_sockets_.end() && sockets_rendered < limit;
++it, ++sockets_rendered) {
grpc_json* socket_ref_json = grpc_json_create_child( grpc_json* socket_ref_json = grpc_json_create_child(
nullptr, array_parent, nullptr, nullptr, GRPC_JSON_OBJECT, false); nullptr, array_parent, nullptr, nullptr, GRPC_JSON_OBJECT, false);
json_iterator = grpc_json_add_number_string_child( json_iterator = grpc_json_add_number_string_child(
socket_ref_json, nullptr, "socketId", socket_refs[i]->uuid()); socket_ref_json, nullptr, "socketId", it->first);
grpc_json_create_child(json_iterator, socket_ref_json, "name", grpc_json_create_child(json_iterator, socket_ref_json, "name",
socket_refs[i]->remote(), GRPC_JSON_STRING, false); it->second->remote(), GRPC_JSON_STRING, false);
} }
} }
if (i == socket_refs.size()) { if (sockets_rendered == child_sockets_.size()) {
json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr, json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr,
GRPC_JSON_TRUE, false); GRPC_JSON_TRUE, false);
} }

@ -64,7 +64,6 @@ intptr_t GetParentUuidFromArgs(const grpc_channel_args& args);
typedef InlinedVector<intptr_t, 10> ChildRefsList; typedef InlinedVector<intptr_t, 10> ChildRefsList;
class SocketNode; class SocketNode;
typedef InlinedVector<RefCountedPtr<SocketNode>, 10> ChildSocketsList;
namespace testing { namespace testing {
class CallCountingHelperPeer; class CallCountingHelperPeer;
@ -207,12 +206,16 @@ class ChannelNode : public BaseNode {
class ServerNode : public BaseNode { class ServerNode : public BaseNode {
public: public:
ServerNode(grpc_server* server, size_t channel_tracer_max_nodes); ServerNode(grpc_server* server, size_t channel_tracer_max_nodes);
~ServerNode() override; ~ServerNode() override;
grpc_json* RenderJson() override; grpc_json* RenderJson() override;
char* RenderServerSockets(intptr_t start_socket_id, char* RenderServerSockets(intptr_t start_socket_id, intptr_t max_results);
intptr_t pagination_limit);
void AddChildSocket(RefCountedPtr<SocketNode>);
void RemoveChildSocket(intptr_t child_uuid);
// proxy methods to composed classes. // proxy methods to composed classes.
void AddTraceEvent(ChannelTrace::Severity severity, const grpc_slice& data) { void AddTraceEvent(ChannelTrace::Severity severity, const grpc_slice& data) {
@ -232,6 +235,8 @@ class ServerNode : public BaseNode {
grpc_server* server_; grpc_server* server_;
CallCountingHelper call_counter_; CallCountingHelper call_counter_;
ChannelTrace trace_; ChannelTrace trace_;
Mutex child_mu_; // Guards child map below.
Map<intptr_t, RefCountedPtr<SocketNode>> child_sockets_;
}; };
// Handles channelz bookkeeping for sockets // Handles channelz bookkeeping for sockets

@ -31,6 +31,7 @@
#include <utility> #include <utility>
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/mpscq.h" #include "src/core/lib/gpr/mpscq.h"
@ -111,7 +112,7 @@ struct channel_data {
uint32_t registered_method_max_probes; uint32_t registered_method_max_probes;
grpc_closure finish_destroy_channel_closure; grpc_closure finish_destroy_channel_closure;
grpc_closure channel_connectivity_changed; grpc_closure channel_connectivity_changed;
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node; intptr_t channelz_socket_uuid;
}; };
typedef struct shutdown_tag { typedef struct shutdown_tag {
@ -941,7 +942,6 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
static void destroy_channel_elem(grpc_channel_element* elem) { static void destroy_channel_elem(grpc_channel_element* elem) {
size_t i; size_t i;
channel_data* chand = static_cast<channel_data*>(elem->channel_data); channel_data* chand = static_cast<channel_data*>(elem->channel_data);
chand->socket_node.reset();
if (chand->registered_methods) { if (chand->registered_methods) {
for (i = 0; i < chand->registered_method_slots; i++) { for (i = 0; i < chand->registered_method_slots; i++) {
grpc_slice_unref_internal(chand->registered_methods[i].method); grpc_slice_unref_internal(chand->registered_methods[i].method);
@ -952,6 +952,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
gpr_free(chand->registered_methods); gpr_free(chand->registered_methods);
} }
if (chand->server) { if (chand->server) {
if (chand->server->channelz_server != nullptr &&
chand->channelz_socket_uuid != 0) {
chand->server->channelz_server->RemoveChildSocket(
chand->channelz_socket_uuid);
}
gpr_mu_lock(&chand->server->mu_global); gpr_mu_lock(&chand->server->mu_global);
chand->next->prev = chand->prev; chand->next->prev = chand->prev;
chand->prev->next = chand->next; chand->prev->next = chand->next;
@ -1144,7 +1149,8 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
void grpc_server_setup_transport( void grpc_server_setup_transport(
grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset, grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
const grpc_channel_args* args, const grpc_channel_args* args,
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node, const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
socket_node,
grpc_resource_user* resource_user) { grpc_resource_user* resource_user) {
size_t num_registered_methods; size_t num_registered_methods;
size_t alloc; size_t alloc;
@ -1166,7 +1172,12 @@ void grpc_server_setup_transport(
chand->server = s; chand->server = s;
server_ref(s); server_ref(s);
chand->channel = channel; chand->channel = channel;
chand->socket_node = std::move(socket_node); if (socket_node != nullptr) {
chand->channelz_socket_uuid = socket_node->uuid();
s->channelz_server->AddChildSocket(socket_node);
} else {
chand->channelz_socket_uuid = 0;
}
size_t cq_idx; size_t cq_idx;
for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
@ -1241,19 +1252,6 @@ void grpc_server_setup_transport(
grpc_transport_perform_op(transport, op); grpc_transport_perform_op(transport, op);
} }
void grpc_server_populate_server_sockets(
grpc_server* s, grpc_core::channelz::ChildSocketsList* server_sockets,
intptr_t start_idx) {
gpr_mu_lock(&s->mu_global);
channel_data* c = nullptr;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
if (c->socket_node != nullptr && c->socket_node->uuid() >= start_idx) {
server_sockets->push_back(c->socket_node);
}
}
gpr_mu_unlock(&s->mu_global);
}
void grpc_server_populate_listen_sockets( void grpc_server_populate_listen_sockets(
grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets) { grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets) {
gpr_mu_lock(&server->mu_global); gpr_mu_lock(&server->mu_global);

@ -47,14 +47,10 @@ void grpc_server_add_listener(grpc_server* server, void* listener,
void grpc_server_setup_transport( void grpc_server_setup_transport(
grpc_server* server, grpc_transport* transport, grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset, const grpc_channel_args* args, grpc_pollset* accepting_pollset, const grpc_channel_args* args,
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> socket_node, const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
socket_node,
grpc_resource_user* resource_user = nullptr); grpc_resource_user* resource_user = nullptr);
/* fills in the uuids of all sockets used for connections on this server */
void grpc_server_populate_server_sockets(
grpc_server* server, grpc_core::channelz::ChildSocketsList* server_sockets,
intptr_t start_idx);
/* fills in the uuids of all listen sockets on this server */ /* fills in the uuids of all listen sockets on this server */
void grpc_server_populate_listen_sockets( void grpc_server_populate_listen_sockets(
grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets); grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets);

Loading…
Cancel
Save