Merge pull request #16725 from ncteisen/channelz-cpp

Channelz++ Part 4: Socket support
pull/16729/head
Noah Eisen 6 years ago committed by GitHub
commit a9f8bcce11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      src/cpp/server/channelz/channelz_service.cc
  2. 4
      src/cpp/server/channelz/channelz_service.h
  3. 152
      test/cpp/end2end/channelz_service_test.cc

@ -92,4 +92,21 @@ Status ChannelzService::GetSubchannel(
return Status::OK;
}
Status ChannelzService::GetSocket(ServerContext* unused,
const channelz::v1::GetSocketRequest* request,
channelz::v1::GetSocketResponse* response) {
char* json_str = grpc_channelz_get_socket(request->socket_id());
gpr_log(GPR_ERROR, "%s", json_str);
if (json_str == nullptr) {
return Status(NOT_FOUND, "No object found for that SocketId");
}
google::protobuf::util::Status s =
google::protobuf::util::JsonStringToMessage(json_str, response);
gpr_free(json_str);
if (s != google::protobuf::util::Status::OK) {
return Status(INTERNAL, s.ToString());
}
return Status::OK;
}
} // namespace grpc

@ -44,6 +44,10 @@ class ChannelzService final : public channelz::v1::Channelz::Service {
Status GetSubchannel(ServerContext* unused,
const channelz::v1::GetSubchannelRequest* request,
channelz::v1::GetSubchannelResponse* response) override;
// implementation of GetSocket rpc
Status GetSocket(ServerContext* unused,
const channelz::v1::GetSocketRequest* request,
channelz::v1::GetSocketResponse* response) override;
};
} // namespace grpc

@ -43,6 +43,8 @@ using grpc::channelz::v1::GetChannelRequest;
using grpc::channelz::v1::GetChannelResponse;
using grpc::channelz::v1::GetServersRequest;
using grpc::channelz::v1::GetServersResponse;
using grpc::channelz::v1::GetSocketRequest;
using grpc::channelz::v1::GetSocketResponse;
using grpc::channelz::v1::GetSubchannelRequest;
using grpc::channelz::v1::GetSubchannelResponse;
using grpc::channelz::v1::GetTopChannelsRequest;
@ -71,6 +73,26 @@ class Proxy : public ::grpc::testing::EchoTestService::Service {
return stubs_[idx]->Echo(client_context.get(), *request, response);
}
Status BidiStream(ServerContext* server_context,
ServerReaderWriter<EchoResponse, EchoRequest>*
stream_from_client) override {
EchoRequest request;
EchoResponse response;
std::unique_ptr<ClientContext> client_context =
ClientContext::FromServerContext(*server_context);
// always use the first proxy for streaming
auto stream_to_backend = stubs_[0]->BidiStream(client_context.get());
while (stream_from_client->Read(&request)) {
stream_to_backend->Write(request);
stream_to_backend->Read(&response);
stream_from_client->Write(response);
}
stream_to_backend->WritesDone();
return stream_to_backend->Finish();
}
private:
std::vector<std::unique_ptr<::grpc::testing::EchoTestService::Stub>> stubs_;
};
@ -149,6 +171,21 @@ class ChannelzServerTest : public ::testing::Test {
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
}
void SendSuccessfulStream(int num_messages) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello channelz");
ClientContext context;
auto stream_to_proxy = echo_stub_->BidiStream(&context);
for (int i = 0; i < num_messages; ++i) {
EXPECT_TRUE(stream_to_proxy->Write(request));
EXPECT_TRUE(stream_to_proxy->Read(&response));
}
stream_to_proxy->WritesDone();
Status s = stream_to_proxy->Finish();
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
}
void SendFailedEcho(int channel_idx) {
EchoRequest request;
EchoResponse response;
@ -448,6 +485,121 @@ TEST_F(ChannelzServerTest, ServerCallTest) {
kNumSuccess + kNumFailed + 1);
}
TEST_F(ChannelzServerTest, ManySubchannelsAndSockets) {
ResetStubs();
const int kNumChannels = 4;
ConfigureProxy(kNumChannels);
const int kNumSuccess = 10;
const int kNumFailed = 11;
for (int i = 0; i < kNumSuccess; ++i) {
SendSuccessfulEcho(0);
SendSuccessfulEcho(2);
}
for (int i = 0; i < kNumFailed; ++i) {
SendFailedEcho(1);
SendFailedEcho(2);
}
GetTopChannelsRequest gtc_request;
GetTopChannelsResponse gtc_response;
gtc_request.set_start_channel_id(0);
ClientContext context;
Status s =
channelz_stub_->GetTopChannels(&context, gtc_request, &gtc_response);
EXPECT_TRUE(s.ok()) << s.error_message();
EXPECT_EQ(gtc_response.channel_size(), kNumChannels);
for (int i = 0; i < gtc_response.channel_size(); ++i) {
// if the channel sent no RPCs, then expect no subchannels to have been
// created.
if (gtc_response.channel(i).data().calls_started() == 0) {
EXPECT_EQ(gtc_response.channel(i).subchannel_ref_size(), 0);
continue;
}
// The resolver must return at least one address.
ASSERT_GT(gtc_response.channel(i).subchannel_ref_size(), 0);
// First grab the subchannel
GetSubchannelRequest get_subchannel_req;
GetSubchannelResponse get_subchannel_resp;
get_subchannel_req.set_subchannel_id(
gtc_response.channel(i).subchannel_ref(0).subchannel_id());
ClientContext get_subchannel_ctx;
Status s = channelz_stub_->GetSubchannel(
&get_subchannel_ctx, get_subchannel_req, &get_subchannel_resp);
EXPECT_TRUE(s.ok()) << s.error_message();
EXPECT_EQ(get_subchannel_resp.subchannel().socket_ref_size(), 1);
// Now grab the socket.
GetSocketRequest get_socket_req;
GetSocketResponse get_socket_resp;
ClientContext get_socket_ctx;
get_socket_req.set_socket_id(
get_subchannel_resp.subchannel().socket_ref(0).socket_id());
s = channelz_stub_->GetSocket(&get_socket_ctx, get_socket_req,
&get_socket_resp);
EXPECT_TRUE(s.ok()) << s.error_message();
// calls started == streams started AND stream succeeded. Since none of
// these RPCs were canceled, all of the streams will succeeded even though
// the RPCs they represent might have failed.
EXPECT_EQ(get_subchannel_resp.subchannel().data().calls_started(),
get_socket_resp.socket().data().streams_started());
EXPECT_EQ(get_subchannel_resp.subchannel().data().calls_started(),
get_socket_resp.socket().data().streams_succeeded());
// All of the calls were unary, so calls started == messages sent.
EXPECT_EQ(get_subchannel_resp.subchannel().data().calls_started(),
get_socket_resp.socket().data().messages_sent());
// We only get responses when the RPC was successful, so
// calls succeeded == messages received.
EXPECT_EQ(get_subchannel_resp.subchannel().data().calls_succeeded(),
get_socket_resp.socket().data().messages_received());
}
}
TEST_F(ChannelzServerTest, StreamingRPC) {
ResetStubs();
ConfigureProxy(1);
const int kNumMessages = 5;
SendSuccessfulStream(kNumMessages);
// Get the channel
GetChannelRequest get_channel_request;
GetChannelResponse get_channel_response;
get_channel_request.set_channel_id(GetChannelId(0));
ClientContext get_channel_context;
Status s = channelz_stub_->GetChannel(
&get_channel_context, get_channel_request, &get_channel_response);
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
EXPECT_EQ(get_channel_response.channel().data().calls_started(), 1);
EXPECT_EQ(get_channel_response.channel().data().calls_succeeded(), 1);
EXPECT_EQ(get_channel_response.channel().data().calls_failed(), 0);
// Get the subchannel
ASSERT_GT(get_channel_response.channel().subchannel_ref_size(), 0);
GetSubchannelRequest get_subchannel_request;
GetSubchannelResponse get_subchannel_response;
ClientContext get_subchannel_context;
get_subchannel_request.set_subchannel_id(
get_channel_response.channel().subchannel_ref(0).subchannel_id());
s = channelz_stub_->GetSubchannel(&get_subchannel_context,
get_subchannel_request,
&get_subchannel_response);
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
EXPECT_EQ(get_subchannel_response.subchannel().data().calls_started(), 1);
EXPECT_EQ(get_subchannel_response.subchannel().data().calls_succeeded(), 1);
EXPECT_EQ(get_subchannel_response.subchannel().data().calls_failed(), 0);
// Get the socket
ASSERT_GT(get_subchannel_response.subchannel().socket_ref_size(), 0);
GetSocketRequest get_socket_request;
GetSocketResponse get_socket_response;
ClientContext get_socket_context;
get_socket_request.set_socket_id(
get_subchannel_response.subchannel().socket_ref(0).socket_id());
s = channelz_stub_->GetSocket(&get_socket_context, get_socket_request,
&get_socket_response);
EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
EXPECT_EQ(get_socket_response.socket().data().streams_started(), 1);
EXPECT_EQ(get_socket_response.socket().data().streams_succeeded(), 1);
EXPECT_EQ(get_socket_response.socket().data().streams_failed(), 0);
EXPECT_EQ(get_socket_response.socket().data().messages_sent(), kNumMessages);
EXPECT_EQ(get_socket_response.socket().data().messages_received(),
kNumMessages);
}
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save