Surface socket name

reviewable/pr17433/r1
ncteisen 6 years ago
parent e97c9457e2
commit d7c252c947
  1. 5
      src/core/ext/filters/client_channel/connector.h
  2. 4
      src/core/ext/filters/client_channel/subchannel.cc
  3. 4
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  4. 2
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  5. 9
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  6. 4
      src/core/ext/transport/chttp2/transport/chttp2_transport.h
  7. 10
      src/core/lib/channel/channelz.cc
  8. 5
      src/core/lib/channel/channelz.h
  9. 14
      src/core/lib/surface/server.cc
  10. 4
      src/core/lib/surface/server.h

@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/transport/transport.h"
@ -48,8 +49,8 @@ typedef struct {
/** channel arguments (to be passed to the filters) */
grpc_channel_args* channel_args;
/** socket uuid of the connected transport. 0 if not available */
intptr_t socket_uuid;
/** socket node of the connected transport */
grpc_core::channelz::SocketNode* socket_node;
} grpc_connect_out_args;
struct grpc_connector_vtable {

@ -826,7 +826,9 @@ static bool publish_transport_locked(grpc_subchannel* c) {
GRPC_ERROR_UNREF(error);
return false;
}
intptr_t socket_uuid = c->connecting_result.socket_uuid;
intptr_t socket_uuid = c->connecting_result.socket_node == nullptr
? 0
: c->connecting_result.socket_node->uuid();
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
if (c->disconnected) {

@ -117,8 +117,8 @@ static void on_handshake_done(void* arg, grpc_error* error) {
c->args.interested_parties);
c->result->transport =
grpc_create_chttp2_transport(args->args, args->endpoint, true);
c->result->socket_uuid =
grpc_chttp2_transport_get_socket_uuid(c->result->transport);
c->result->socket_node =
grpc_chttp2_transport_get_socket_node(c->result->transport);
GPR_ASSERT(c->result->transport);
// TODO(roth): We ideally want to wait until we receive HTTP/2
// settings from the server before we consider the connection

@ -149,7 +149,7 @@ static void on_handshake_done(void* arg, grpc_error* error) {
grpc_server_setup_transport(
connection_state->svr_state->server, transport,
connection_state->accepting_pollset, args->args,
grpc_chttp2_transport_get_socket_uuid(transport), resource_user);
grpc_chttp2_transport_get_socket_node(transport), resource_user);
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
connection_state->transport =

@ -3145,14 +3145,11 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport) {
grpc_core::channelz::SocketNode* grpc_chttp2_transport_get_socket_node(
grpc_transport* transport) {
grpc_chttp2_transport* t =
reinterpret_cast<grpc_chttp2_transport*>(transport);
if (t->channelz_socket != nullptr) {
return t->channelz_socket->uuid();
} else {
return 0;
}
return t->channelz_socket.get();
}
grpc_transport* grpc_create_chttp2_transport(

@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/transport/transport.h"
@ -35,7 +36,8 @@ grpc_transport* grpc_create_chttp2_transport(
const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
grpc_resource_user* resource_user = nullptr);
intptr_t grpc_chttp2_transport_get_socket_uuid(grpc_transport* transport);
grpc_core::channelz::SocketNode* grpc_chttp2_transport_get_socket_node(
grpc_transport* transport);
/// Takes ownership of \a read_buffer, which (if non-NULL) contains
/// leftover bytes previously read from the endpoint (e.g., by handshakers).

@ -207,18 +207,20 @@ char* ServerNode::RenderServerSockets(intptr_t start_socket_id) {
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr;
ChildRefsList socket_refs;
ChildSocketsList socket_refs;
grpc_server_populate_server_sockets(server_, &socket_refs, start_socket_id);
if (!socket_refs.empty()) {
// create list of socket refs
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false);
for (size_t i = 0; i < socket_refs.size(); ++i) {
json_iterator =
grpc_json* socket_ref_json =
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
GRPC_JSON_OBJECT, false);
grpc_json_add_number_string_child(json_iterator, nullptr, "socketId",
socket_refs[i]);
json_iterator = grpc_json_add_number_string_child(
socket_ref_json, nullptr, "socketId", socket_refs[i]->uuid());
grpc_json_create_child(json_iterator, socket_ref_json, "name",
socket_refs[i]->remote(), GRPC_JSON_STRING, false);
}
}
// For now we do not have any pagination rules. In the future we could

@ -59,6 +59,9 @@ namespace channelz {
// add human readable names as in the channelz.proto
typedef InlinedVector<intptr_t, 10> ChildRefsList;
class SocketNode;
typedef InlinedVector<SocketNode*, 10> ChildSocketsList;
namespace testing {
class CallCountingHelperPeer;
class ChannelNodePeer;
@ -251,6 +254,8 @@ class SocketNode : public BaseNode {
gpr_atm_no_barrier_fetch_add(&keepalives_sent_, static_cast<gpr_atm>(1));
}
const char* remote() { return remote_.get(); }
private:
gpr_atm streams_started_ = 0;
gpr_atm streams_succeeded_ = 0;

@ -109,7 +109,7 @@ struct channel_data {
uint32_t registered_method_max_probes;
grpc_closure finish_destroy_channel_closure;
grpc_closure channel_connectivity_changed;
intptr_t socket_uuid;
grpc_core::channelz::SocketNode* socket_node;
};
typedef struct shutdown_tag {
@ -1158,7 +1158,7 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
intptr_t socket_uuid,
grpc_core::channelz::SocketNode* socket_node,
grpc_resource_user* resource_user) {
size_t num_registered_methods;
size_t alloc;
@ -1180,7 +1180,7 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
chand->server = s;
server_ref(s);
chand->channel = channel;
chand->socket_uuid = socket_uuid;
chand->socket_node = socket_node;
size_t cq_idx;
for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
@ -1256,14 +1256,14 @@ void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
}
void grpc_server_populate_server_sockets(
grpc_server* s, grpc_core::channelz::ChildRefsList* server_sockets,
grpc_server* s, grpc_core::channelz::ChildSocketsList* server_sockets,
intptr_t start_idx) {
gpr_mu_lock(&s->mu_global);
channel_data* c = nullptr;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
intptr_t socket_uuid = c->socket_uuid;
if (socket_uuid >= start_idx) {
server_sockets->push_back(socket_uuid);
grpc_core::channelz::SocketNode* socket_node = c->socket_node;
if (socket_node && socket_node->uuid() >= start_idx) {
server_sockets->push_back(socket_node);
}
}
gpr_mu_unlock(&s->mu_global);

@ -47,12 +47,12 @@ void grpc_server_add_listener(grpc_server* server, void* listener,
void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
intptr_t socket_uuid,
grpc_core::channelz::SocketNode* socket_node,
grpc_resource_user* resource_user = nullptr);
/* fills in the uuids of all sockets used for connections on this server */
void grpc_server_populate_server_sockets(
grpc_server* server, grpc_core::channelz::ChildRefsList* server_sockets,
grpc_server* server, grpc_core::channelz::ChildSocketsList* server_sockets,
intptr_t start_idx);
/* fills in the uuids of all listen sockets on this server */

Loading…
Cancel
Save