Merge pull request #17456 from ncteisen/socket-pagination

Channelz: Add Pagination to ServerSockets
reviewable/pr17291/r8
Noah Eisen 6 years ago committed by GitHub
commit 9e9cae7839
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      include/grpc/grpc.h
  2. 23
      src/core/lib/channel/channelz.cc
  3. 3
      src/core/lib/channel/channelz.h
  4. 5
      src/core/lib/channel/channelz_registry.cc
  5. 4
      src/cpp/server/channelz/channelz_service.cc
  6. 36
      src/proto/grpc/channelz/channelz.proto
  7. 8
      src/python/grpcio/grpc/_cython/_cygrpc/channelz.pyx.pxi
  8. 3
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  9. 3
      src/python/grpcio_channelz/grpc_channelz/v1/channelz.py
  10. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  11. 2
      test/core/end2end/tests/channelz.cc
  12. 82
      test/cpp/end2end/channelz_service_test.cc

@ -511,7 +511,8 @@ GRPCAPI char* grpc_channelz_get_server(intptr_t server_id);
/* Gets all server sockets that exist in the server. */
GRPCAPI char* grpc_channelz_get_server_sockets(intptr_t server_id,
intptr_t start_socket_id);
intptr_t start_socket_id,
intptr_t max_results);
/* Returns a single Channel, or else a NOT_FOUND code. The returned string
is allocated and must be freed by the application. */

@ -203,31 +203,34 @@ ServerNode::ServerNode(grpc_server* server, size_t channel_tracer_max_nodes)
ServerNode::~ServerNode() {}
char* ServerNode::RenderServerSockets(intptr_t start_socket_id) {
char* ServerNode::RenderServerSockets(intptr_t start_socket_id,
intptr_t max_results) {
// if user does not set max_results, we choose 500.
size_t pagination_limit = max_results == 0 ? 500 : max_results;
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr;
ChildSocketsList socket_refs;
grpc_server_populate_server_sockets(server_, &socket_refs, start_socket_id);
// declared early so it can be used outside of the loop.
size_t i = 0;
if (!socket_refs.empty()) {
// create list of socket refs
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false);
for (size_t i = 0; i < socket_refs.size(); ++i) {
grpc_json* socket_ref_json =
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
GRPC_JSON_OBJECT, false);
for (i = 0; i < GPR_MIN(socket_refs.size(), pagination_limit); ++i) {
grpc_json* socket_ref_json = grpc_json_create_child(
nullptr, array_parent, nullptr, nullptr, GRPC_JSON_OBJECT, false);
json_iterator = grpc_json_add_number_string_child(
socket_ref_json, nullptr, "socketId", socket_refs[i]->uuid());
grpc_json_create_child(json_iterator, socket_ref_json, "name",
socket_refs[i]->remote(), GRPC_JSON_STRING, false);
}
}
// For now we do not have any pagination rules. In the future we could
// pick a constant for max_channels_sent for a GetServers request.
// Tracking: https://github.com/grpc/grpc/issues/16019.
json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr,
GRPC_JSON_TRUE, false);
if (i == socket_refs.size()) {
json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr,
GRPC_JSON_TRUE, false);
}
char* json_str = grpc_json_dump_to_string(top_level_json, 0);
grpc_json_destroy(top_level_json);
return json_str;

@ -210,7 +210,8 @@ class ServerNode : public BaseNode {
grpc_json* RenderJson() override;
char* RenderServerSockets(intptr_t start_socket_id);
char* RenderServerSockets(intptr_t start_socket_id,
intptr_t pagination_limit);
// proxy methods to composed classes.
void AddTraceEvent(ChannelTrace::Severity severity, grpc_slice data) {

@ -252,7 +252,8 @@ char* grpc_channelz_get_server(intptr_t server_id) {
}
char* grpc_channelz_get_server_sockets(intptr_t server_id,
intptr_t start_socket_id) {
intptr_t start_socket_id,
intptr_t max_results) {
grpc_core::channelz::BaseNode* base_node =
grpc_core::channelz::ChannelzRegistry::Get(server_id);
if (base_node == nullptr ||
@ -263,7 +264,7 @@ char* grpc_channelz_get_server_sockets(intptr_t server_id,
// actually a server node
grpc_core::channelz::ServerNode* server_node =
static_cast<grpc_core::channelz::ServerNode*>(base_node);
return server_node->RenderServerSockets(start_socket_id);
return server_node->RenderServerSockets(start_socket_id, max_results);
}
char* grpc_channelz_get_channel(intptr_t channel_id) {

@ -79,8 +79,8 @@ Status ChannelzService::GetServer(ServerContext* unused,
Status ChannelzService::GetServerSockets(
ServerContext* unused, const channelz::v1::GetServerSocketsRequest* request,
channelz::v1::GetServerSocketsResponse* response) {
char* json_str = grpc_channelz_get_server_sockets(request->server_id(),
request->start_socket_id());
char* json_str = grpc_channelz_get_server_sockets(
request->server_id(), request->start_socket_id(), request->max_results());
if (json_str == nullptr) {
return Status(StatusCode::INTERNAL,
"grpc_channelz_get_server_sockets returned null");

@ -177,6 +177,7 @@ message SubchannelRef {
// SocketRef is a reference to a Socket.
message SocketRef {
// The globally unique id for this socket. Must be a positive number.
int64 socket_id = 3;
// An optional name associated with the socket.
string name = 4;
@ -288,7 +289,8 @@ message SocketData {
// include stream level or TCP level flow control info.
google.protobuf.Int64Value remote_flow_control_window = 12;
// Socket options set on this socket. May be absent.
// Socket options set on this socket. May be absent if 'summary' is set
// on GetSocketRequest.
repeated SocketOption option = 13;
}
@ -439,12 +441,21 @@ service Channelz {
message GetTopChannelsRequest {
// start_channel_id indicates that only channels at or above this id should be
// included in the results.
// To request the first page, this should be set to 0. To request
// subsequent pages, the client generates this value by adding 1 to
// the highest seen result ID.
int64 start_channel_id = 1;
// If non-zero, the server will return a page of results containing
// at most this many items. If zero, the server will choose a
// reasonable page size. Must never be negative.
int64 max_results = 2;
}
message GetTopChannelsResponse {
// list of channels that the connection detail service knows about. Sorted in
// ascending channel_id order.
// Must contain at least 1 result, otherwise 'end' must be true.
repeated Channel channel = 1;
// If set, indicates that the list of channels is the final list. Requesting
// more channels can only return more if they are created after this RPC
@ -455,12 +466,21 @@ message GetTopChannelsResponse {
message GetServersRequest {
// start_server_id indicates that only servers at or above this id should be
// included in the results.
// To request the first page, this must be set to 0. To request
// subsequent pages, the client generates this value by adding 1 to
// the highest seen result ID.
int64 start_server_id = 1;
// If non-zero, the server will return a page of results containing
// at most this many items. If zero, the server will choose a
// reasonable page size. Must never be negative.
int64 max_results = 2;
}
message GetServersResponse {
// list of servers that the connection detail service knows about. Sorted in
// ascending server_id order.
// Must contain at least 1 result, otherwise 'end' must be true.
repeated Server server = 1;
// If set, indicates that the list of servers is the final list. Requesting
// more servers will only return more if they are created after this RPC
@ -483,12 +503,21 @@ message GetServerSocketsRequest {
int64 server_id = 1;
// start_socket_id indicates that only sockets at or above this id should be
// included in the results.
// To request the first page, this must be set to 0. To request
// subsequent pages, the client generates this value by adding 1 to
// the highest seen result ID.
int64 start_socket_id = 2;
// If non-zero, the server will return a page of results containing
// at most this many items. If zero, the server will choose a
// reasonable page size. Must never be negative.
int64 max_results = 3;
}
message GetServerSocketsResponse {
// list of socket refs that the connection detail service knows about. Sorted in
// ascending socket_id order.
// Must contain at least 1 result, otherwise 'end' must be true.
repeated SocketRef socket_ref = 1;
// If set, indicates that the list of sockets is the final list. Requesting
// more sockets will only return more if they are created after this RPC
@ -521,6 +550,11 @@ message GetSubchannelResponse {
message GetSocketRequest {
// socket_id is the identifier of the specific socket to get.
int64 socket_id = 1;
// If true, the response will contain only high level information
// that is inexpensive to obtain. Fields thay may be omitted are
// documented.
bool summary = 2;
}
message GetSocketResponse {

@ -36,15 +36,17 @@ def channelz_get_server(server_id):
' server_id==%s is valid' % server_id)
return c_returned_str
def channelz_get_server_sockets(server_id, start_socket_id):
def channelz_get_server_sockets(server_id, start_socket_id, max_results):
cdef char *c_returned_str = grpc_channelz_get_server_sockets(
server_id,
start_socket_id,
max_results,
)
if c_returned_str == NULL:
raise ValueError('Failed to get server sockets, please ensure your' \
' server_id==%s and start_socket_id==%s is valid' %
(server_id, start_socket_id))
' server_id==%s and start_socket_id==%s and' \
' max_results==%s is valid' %
(server_id, start_socket_id, max_results))
return c_returned_str
def channelz_get_channel(channel_id):

@ -389,7 +389,8 @@ cdef extern from "grpc/grpc.h":
char* grpc_channelz_get_servers(intptr_t start_server_id)
char* grpc_channelz_get_server(intptr_t server_id)
char* grpc_channelz_get_server_sockets(intptr_t server_id,
intptr_t start_socket_id)
intptr_t start_socket_id,
intptr_t max_results)
char* grpc_channelz_get_channel(intptr_t channel_id)
char* grpc_channelz_get_subchannel(intptr_t subchannel_id)
char* grpc_channelz_get_socket(intptr_t socket_id)

@ -66,7 +66,8 @@ class ChannelzServicer(_channelz_pb2_grpc.ChannelzServicer):
try:
return json_format.Parse(
cygrpc.channelz_get_server_sockets(request.server_id,
request.start_socket_id),
request.start_socket_id,
request.max_results),
_channelz_pb2.GetServerSocketsResponse(),
)
except ValueError as e:

@ -272,7 +272,7 @@ extern grpc_channelz_get_servers_type grpc_channelz_get_servers_import;
typedef char*(*grpc_channelz_get_server_type)(intptr_t server_id);
extern grpc_channelz_get_server_type grpc_channelz_get_server_import;
#define grpc_channelz_get_server grpc_channelz_get_server_import
typedef char*(*grpc_channelz_get_server_sockets_type)(intptr_t server_id, intptr_t start_socket_id);
typedef char*(*grpc_channelz_get_server_sockets_type)(intptr_t server_id, intptr_t start_socket_id, intptr_t max_results);
extern grpc_channelz_get_server_sockets_type grpc_channelz_get_server_sockets_import;
#define grpc_channelz_get_server_sockets grpc_channelz_get_server_sockets_import
typedef char*(*grpc_channelz_get_channel_type)(intptr_t channel_id);

@ -259,7 +259,7 @@ static void test_channelz(grpc_end2end_test_config config) {
GPR_ASSERT(nullptr == strstr(json, "\"severity\":\"CT_INFO\""));
gpr_free(json);
json = channelz_server->RenderServerSockets(0);
json = channelz_server->RenderServerSockets(0, 100);
GPR_ASSERT(nullptr != strstr(json, "\"end\":true"));
gpr_free(json);

@ -54,6 +54,14 @@ using grpc::channelz::v1::GetSubchannelResponse;
using grpc::channelz::v1::GetTopChannelsRequest;
using grpc::channelz::v1::GetTopChannelsResponse;
// This code snippet can be used to print out any responses for
// visual debugging.
//
//
// string out_str;
// google::protobuf::TextFormat::PrintToString(resp, &out_str);
// std::cout << "resp: " << out_str << "\n";
namespace grpc {
namespace testing {
namespace {
@ -164,6 +172,19 @@ class ChannelzServerTest : public ::testing::Test {
echo_stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
std::unique_ptr<grpc::testing::EchoTestService::Stub> NewEchoStub() {
static int salt = 0;
string target = "dns:localhost:" + to_string(proxy_port_);
ChannelArguments args;
// disable channelz. We only want to focus on proxy to backend outbound.
args.SetInt(GRPC_ARG_ENABLE_CHANNELZ, 0);
// This ensures that gRPC will not do connection sharing.
args.SetInt("salt", salt++);
std::shared_ptr<Channel> channel =
CreateCustomChannel(target, InsecureChannelCredentials(), args);
return grpc::testing::EchoTestService::NewStub(channel);
}
void SendSuccessfulEcho(int channel_idx) {
EchoRequest request;
EchoResponse response;
@ -651,6 +672,67 @@ TEST_F(ChannelzServerTest, GetServerSocketsTest) {
EXPECT_EQ(get_server_sockets_response.socket_ref_size(), 1);
}
TEST_F(ChannelzServerTest, GetServerSocketsPaginationTest) {
ResetStubs();
ConfigureProxy(1);
std::vector<std::unique_ptr<grpc::testing::EchoTestService::Stub>> stubs;
const int kNumServerSocketsCreated = 20;
for (int i = 0; i < kNumServerSocketsCreated; ++i) {
stubs.push_back(NewEchoStub());
EchoRequest request;
EchoResponse response;
request.set_message("Hello channelz");
request.mutable_param()->set_backend_channel_idx(0);
ClientContext context;
Status s = stubs.back()->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
}
GetServersRequest get_server_request;
GetServersResponse get_server_response;
get_server_request.set_start_server_id(0);
ClientContext get_server_context;
Status s = channelz_stub_->GetServers(&get_server_context, get_server_request,
&get_server_response);
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
EXPECT_EQ(get_server_response.server_size(), 1);
// Make a request that gets all of the serversockets
{
GetServerSocketsRequest get_server_sockets_request;
GetServerSocketsResponse get_server_sockets_response;
get_server_sockets_request.set_server_id(
get_server_response.server(0).ref().server_id());
get_server_sockets_request.set_start_socket_id(0);
ClientContext get_server_sockets_context;
s = channelz_stub_->GetServerSockets(&get_server_sockets_context,
get_server_sockets_request,
&get_server_sockets_response);
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
// We add one to account the the channelz stub that will end up creating
// a serversocket.
EXPECT_EQ(get_server_sockets_response.socket_ref_size(),
kNumServerSocketsCreated + 1);
EXPECT_TRUE(get_server_sockets_response.end());
}
// Now we make a request that exercises pagination.
{
GetServerSocketsRequest get_server_sockets_request;
GetServerSocketsResponse get_server_sockets_response;
get_server_sockets_request.set_server_id(
get_server_response.server(0).ref().server_id());
get_server_sockets_request.set_start_socket_id(0);
const int kMaxResults = 10;
get_server_sockets_request.set_max_results(kMaxResults);
ClientContext get_server_sockets_context;
s = channelz_stub_->GetServerSockets(&get_server_sockets_context,
get_server_sockets_request,
&get_server_sockets_response);
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
EXPECT_EQ(get_server_sockets_response.socket_ref_size(), kMaxResults);
EXPECT_FALSE(get_server_sockets_response.end());
}
}
TEST_F(ChannelzServerTest, GetServerListenSocketsTest) {
ResetStubs();
ConfigureProxy(1);

Loading…
Cancel
Save