Reviewer feedback

reviewable/pr15343/r6
ncteisen 7 years ago
parent c845ba66f3
commit d23739eda3
  1. 4
      include/grpc/grpc.h
  2. 1
      src/core/lib/channel/channel_trace.h
  3. 23
      src/core/lib/channel/channelz.cc
  4. 15
      src/core/lib/channel/channelz.h
  5. 30
      src/core/lib/surface/call.cc
  6. 10
      src/core/lib/surface/channel.cc
  7. 3
      src/core/lib/surface/channel.h
  8. 7
      test/core/channel/channelz_test.cc
  9. 6
      test/core/end2end/tests/simple_request.cc

@ -286,10 +286,6 @@ GRPCAPI grpc_channel* grpc_lame_client_channel_create(
/** Close and destroy a grpc channel */ /** Close and destroy a grpc channel */
GRPCAPI void grpc_channel_destroy(grpc_channel* channel); GRPCAPI void grpc_channel_destroy(grpc_channel* channel);
/** Returns the JSON formatted channel trace for this channel. The caller
owns the returned string and is responsible for freeing it. */
GRPCAPI char* grpc_channel_get_trace(grpc_channel* channel);
/** Returns the channel uuid, which can be used to look up its trace at a /** Returns the channel uuid, which can be used to look up its trace at a
later time. */ later time. */
GRPCAPI intptr_t grpc_channel_get_uuid(grpc_channel* channel); GRPCAPI intptr_t grpc_channel_get_uuid(grpc_channel* channel);

@ -22,7 +22,6 @@
#include <grpc/impl/codegen/port_platform.h> #include <grpc/impl/codegen/port_platform.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
// #include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"

@ -42,9 +42,10 @@
namespace grpc_core { namespace grpc_core {
namespace channelz { namespace channelz {
// TODO(ncteisen): more this functions to a loc where it can be used
namespace { namespace {
// TODO(ncteisen): move this function to a common helper location.
//
// returns an allocated string that represents tm according to RFC-3339, and, // returns an allocated string that represents tm according to RFC-3339, and,
// more specifically, follows: // more specifically, follows:
// https://developers.google.com/protocol-buffers/docs/proto3#json // https://developers.google.com/protocol-buffers/docs/proto3#json
@ -89,14 +90,12 @@ grpc_json* add_num_str(grpc_json* parent, grpc_json* it, const char* name,
} // namespace } // namespace
Channel::Channel(grpc_channel* channel, size_t channel_tracer_max_nodes) Channel::Channel(grpc_channel* channel, size_t channel_tracer_max_nodes)
: channel_(channel) { : channel_(channel), target_(UniquePtr<char>(grpc_channel_get_target(channel_))), channel_uuid_(ChannelzRegistry::Register(this)) {
trace_.Init(channel_tracer_max_nodes); trace_.Init(channel_tracer_max_nodes);
target_ = grpc_channel_get_target(channel_); gpr_atm_no_barrier_store(&last_call_started_millis_, (gpr_atm)ExecCtx::Get()->Now());
channel_uuid_ = ChannelzRegistry::Register(this);
} }
Channel::~Channel() { Channel::~Channel() {
gpr_free(const_cast<char*>(target_));
trace_.Destroy(); trace_.Destroy();
ChannelzRegistry::Unregister(channel_uuid_); ChannelzRegistry::Unregister(channel_uuid_);
} }
@ -107,7 +106,7 @@ void Channel::RecordCallStarted() {
} }
grpc_connectivity_state Channel::GetConnectivityState() { grpc_connectivity_state Channel::GetConnectivityState() {
if (channel_destroyed_) { if (channel_ == nullptr) {
return GRPC_CHANNEL_SHUTDOWN; return GRPC_CHANNEL_SHUTDOWN;
} else { } else {
return grpc_channel_check_connectivity_state(channel_, false); return grpc_channel_check_connectivity_state(channel_, false);
@ -119,25 +118,20 @@ char* Channel::RenderJSON() {
grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT);
grpc_json* json = top_level_json; grpc_json* json = top_level_json;
grpc_json* json_iterator = nullptr; grpc_json* json_iterator = nullptr;
// create and fill the ref child // create and fill the ref child
json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr,
GRPC_JSON_OBJECT, false); GRPC_JSON_OBJECT, false);
json = json_iterator; json = json_iterator;
json_iterator = nullptr; json_iterator = nullptr;
json_iterator = add_num_str(json, json_iterator, "channelId", channel_uuid_); json_iterator = add_num_str(json, json_iterator, "channelId", channel_uuid_);
// reset json iterators to top level object // reset json iterators to top level object
json = top_level_json; json = top_level_json;
json_iterator = nullptr; json_iterator = nullptr;
// create and fill the data child. // create and fill the data child.
grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr,
GRPC_JSON_OBJECT, false); GRPC_JSON_OBJECT, false);
json = data; json = data;
json_iterator = nullptr; json_iterator = nullptr;
// create and fill the connectivity state child. // create and fill the connectivity state child.
grpc_connectivity_state connectivity_state = GetConnectivityState(); grpc_connectivity_state connectivity_state = GetConnectivityState();
json_iterator = grpc_json_create_child(json_iterator, json, "state", nullptr, json_iterator = grpc_json_create_child(json_iterator, json, "state", nullptr,
@ -146,12 +140,10 @@ char* Channel::RenderJSON() {
grpc_json_create_child(nullptr, json, "state", grpc_json_create_child(nullptr, json, "state",
grpc_connectivity_state_name(connectivity_state), grpc_connectivity_state_name(connectivity_state),
GRPC_JSON_STRING, false); GRPC_JSON_STRING, false);
// reset the parent to be the data object. // reset the parent to be the data object.
json = data; json = data;
json_iterator = grpc_json_create_child(json_iterator, json, "target", target_, json_iterator = grpc_json_create_child(json_iterator, json, "target", target_.get(),
GRPC_JSON_STRING, false); GRPC_JSON_STRING, false);
// fill in the channel trace if applicable // fill in the channel trace if applicable
grpc_json* trace = trace_->RenderJSON(); grpc_json* trace = trace_->RenderJSON();
if (trace != nullptr) { if (trace != nullptr) {
@ -163,11 +155,9 @@ char* Channel::RenderJSON() {
trace->key = "trace"; trace->key = "trace";
trace->owns_value = false; trace->owns_value = false;
} }
// reset the parent to be the data object. // reset the parent to be the data object.
json = data; json = data;
json_iterator = nullptr; json_iterator = nullptr;
// We use -1 as sentinel values since proto default value for integers is // We use -1 as sentinel values since proto default value for integers is
// zero, and the confuses the parser into thinking the value weren't present // zero, and the confuses the parser into thinking the value weren't present
json_iterator = add_num_str(json, json_iterator, "callsStarted", json_iterator = add_num_str(json, json_iterator, "callsStarted",
@ -180,7 +170,6 @@ char* Channel::RenderJSON() {
json_iterator = grpc_json_create_child( json_iterator = grpc_json_create_child(
json_iterator, json, "lastCallStartedTimestamp", json_iterator, json, "lastCallStartedTimestamp",
fmt_time(ts), GRPC_JSON_STRING, true); fmt_time(ts), GRPC_JSON_STRING, true);
// render and return the over json object // render and return the over json object
char* json_str = grpc_json_dump_to_string(top_level_json, 0); char* json_str = grpc_json_dump_to_string(top_level_json, 0);
grpc_json_destroy(top_level_json); grpc_json_destroy(top_level_json);

@ -56,8 +56,8 @@ class Channel : public RefCounted<Channel> {
ChannelTrace* trace() { return trace_.get(); } ChannelTrace* trace() { return trace_.get(); }
void set_channel_destroyed() { void set_channel_destroyed() {
GPR_ASSERT(!channel_destroyed_); GPR_ASSERT(channel_ != nullptr);
channel_destroyed_ = true; channel_ = nullptr;
} }
intptr_t channel_uuid() { return channel_uuid_; } intptr_t channel_uuid() { return channel_uuid_; }
@ -66,17 +66,18 @@ class Channel : public RefCounted<Channel> {
// testing peer friend. // testing peer friend.
friend class testing::ChannelPeer; friend class testing::ChannelPeer;
bool channel_destroyed_ = false; // helper for getting connectivity state.
grpc_connectivity_state GetConnectivityState();
// Not owned. Will be set to nullptr when the channel is destroyed.
grpc_channel* channel_; grpc_channel* channel_;
const char* target_; UniquePtr<char> target_;
gpr_atm calls_started_ = 0; gpr_atm calls_started_ = 0;
gpr_atm calls_succeeded_ = 0; gpr_atm calls_succeeded_ = 0;
gpr_atm calls_failed_ = 0; gpr_atm calls_failed_ = 0;
gpr_atm last_call_started_millis_; gpr_atm last_call_started_millis_;
intptr_t channel_uuid_; const intptr_t channel_uuid_;
ManualConstructor<ChannelTrace> trace_; ManualConstructor<ChannelTrace> trace_;
grpc_connectivity_state GetConnectivityState();
}; };
} // namespace channelz } // namespace channelz

@ -478,6 +478,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
&call->pollent); &call->pollent);
} }
grpc_core::channelz::Channel* channelz_channel =
grpc_channel_get_channelz_channel_node(call->channel);
channelz_channel->RecordCallStarted();
grpc_slice_unref_internal(path); grpc_slice_unref_internal(path);
return error; return error;
@ -1078,18 +1082,7 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
grpc_status_code status_code = grpc_status_code status_code =
grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md); grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
grpc_core::channelz::Channel* channelz_channel = if (status_code != GRPC_STATUS_OK) {
call->channel != nullptr
? grpc_channel_get_channelz_channel(call->channel)
: nullptr;
if (status_code == GRPC_STATUS_OK) {
if (channelz_channel != nullptr) {
channelz_channel->RecordCallSucceeded();
}
} else {
if (channelz_channel != nullptr) {
channelz_channel->RecordCallFailed();
}
error = grpc_error_set_int( error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error received from peer"), GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error received from peer"),
GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code)); GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
@ -1268,6 +1261,16 @@ static void post_batch_completion(batch_control* bctl) {
call->final_op.server.cancelled, nullptr, nullptr); call->final_op.server.cancelled, nullptr, nullptr);
} }
if (call->channel != nullptr) {
grpc_core::channelz::Channel* channelz_channel = grpc_channel_get_channelz_channel_node(call->channel);
if (*call->final_op.client.status != GRPC_STATUS_OK) {
channelz_channel->RecordCallFailed();
} else {
channelz_channel->RecordCallSucceeded();
}
}
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE; error = GRPC_ERROR_NONE;
} }
@ -1675,9 +1678,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->send_initial_metadata.peer_string = stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string; &call->peer_string;
} }
grpc_core::channelz::Channel* channelz_channel =
grpc_channel_get_channelz_channel(call->channel);
channelz_channel->RecordCallStarted();
break; break;
} }
case GRPC_OP_SEND_MESSAGE: { case GRPC_OP_SEND_MESSAGE: {

@ -186,18 +186,11 @@ static grpc_channel_args* build_channel_args(
return grpc_channel_args_copy_and_add(input_args, new_args, num_new_args); return grpc_channel_args_copy_and_add(input_args, new_args, num_new_args);
} }
char* grpc_channel_get_trace(grpc_channel* channel) {
grpc_json* json = channel->channelz_channel->trace()->RenderJSON();
char* json_str = grpc_json_dump_to_string(json, 0);
grpc_json_destroy(json);
return json_str;
}
char* grpc_channel_render_channelz(grpc_channel* channel) { char* grpc_channel_render_channelz(grpc_channel* channel) {
return channel->channelz_channel->RenderJSON(); return channel->channelz_channel->RenderJSON();
} }
grpc_core::channelz::Channel* grpc_channel_get_channelz_channel( grpc_core::channelz::Channel* grpc_channel_get_channelz_channel_node(
grpc_channel* channel) { grpc_channel* channel) {
return channel->channelz_channel.get(); return channel->channelz_channel.get();
} }
@ -417,7 +410,6 @@ static void destroy_channel(void* arg, grpc_error* error) {
GRPC_MDELEM_UNREF(rc->authority); GRPC_MDELEM_UNREF(rc->authority);
gpr_free(rc); gpr_free(rc);
} }
channel->channelz_channel.reset();
gpr_mu_destroy(&channel->registered_call_mu); gpr_mu_destroy(&channel->registered_call_mu);
gpr_free(channel->target); gpr_free(channel->target);
gpr_free(channel); gpr_free(channel);

@ -51,9 +51,8 @@ grpc_call* grpc_channel_create_pollset_set_call(
/** Get a (borrowed) pointer to this channels underlying channel stack */ /** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel); grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel);
grpc_core::channelz::Channel* grpc_channel_get_channelz_channel( grpc_core::channelz::Channel* grpc_channel_get_channelz_channel_node(
grpc_channel* channel); grpc_channel* channel);
char* grpc_channel_render_channelz(grpc_channel* channel);
/** Get a grpc_mdelem of grpc-status: X where X is the numeric value of /** Get a grpc_mdelem of grpc-status: X where X is the numeric value of
status_code. status_code.

@ -151,32 +151,27 @@ TEST_P(ChannelzChannelTest, BasicChannelAPIFunctionality) {
ValidateChannel(channelz_channel, {3, 3, 3}); ValidateChannel(channelz_channel, {3, 3, 3});
} }
TEST_P(ChannelzChannelTest, LastCallStartedTimestamp) { TEST_P(ChannelzChannelTest, LastCallStartedMillis) {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
ChannelFixture channel(GetParam()); ChannelFixture channel(GetParam());
intptr_t uuid = grpc_channel_get_uuid(channel.channel()); intptr_t uuid = grpc_channel_get_uuid(channel.channel());
Channel* channelz_channel = ChannelzRegistry::Get<Channel>(uuid); Channel* channelz_channel = ChannelzRegistry::Get<Channel>(uuid);
// start a call to set the last call started timestamp // start a call to set the last call started timestamp
channelz_channel->RecordCallStarted(); channelz_channel->RecordCallStarted();
grpc_millis millis1 = GetLastCallStartedMillis(channelz_channel); grpc_millis millis1 = GetLastCallStartedMillis(channelz_channel);
// time gone by should not affect the timestamp // time gone by should not affect the timestamp
ChannelzSleep(100); ChannelzSleep(100);
grpc_millis millis2 = GetLastCallStartedMillis(channelz_channel); grpc_millis millis2 = GetLastCallStartedMillis(channelz_channel);
EXPECT_EQ(millis1, millis2); EXPECT_EQ(millis1, millis2);
// calls succeeded or failed should not affect the timestamp // calls succeeded or failed should not affect the timestamp
ChannelzSleep(100); ChannelzSleep(100);
channelz_channel->RecordCallFailed(); channelz_channel->RecordCallFailed();
channelz_channel->RecordCallSucceeded(); channelz_channel->RecordCallSucceeded();
grpc_millis millis3 = GetLastCallStartedMillis(channelz_channel); grpc_millis millis3 = GetLastCallStartedMillis(channelz_channel);
EXPECT_EQ(millis1, millis3); EXPECT_EQ(millis1, millis3);
// another call started should affect the timestamp // another call started should affect the timestamp
// sleep for extra long to avoid flakes (since we cache Now()) // sleep for extra long to avoid flakes (since we cache Now())
ChannelzSleep(5000); ChannelzSleep(5000);
grpc_core::ExecCtx::Get()->InvalidateNow();
channelz_channel->RecordCallStarted(); channelz_channel->RecordCallStarted();
grpc_millis millis4 = GetLastCallStartedMillis(channelz_channel); grpc_millis millis4 = GetLastCallStartedMillis(channelz_channel);
EXPECT_NE(millis1, millis4); EXPECT_NE(millis1, millis4);

@ -258,7 +258,8 @@ static void test_invoke_simple_request(grpc_end2end_test_config config) {
// The following is a quick sanity check on channelz functionality. It // The following is a quick sanity check on channelz functionality. It
// ensures that core properly tracked the one call that occurred in this // ensures that core properly tracked the one call that occurred in this
// simple end2end test. // simple end2end test.
char* json = grpc_channel_render_channelz(f.client); grpc_core::channelz::Channel* channelz_channel = grpc_channel_get_channelz_channel_node(f.client);
char* json = channelz_channel->RenderJSON();
GPR_ASSERT(nullptr != strstr(json, "\"callsStarted\":\"1\"")); GPR_ASSERT(nullptr != strstr(json, "\"callsStarted\":\"1\""));
GPR_ASSERT(nullptr != strstr(json, "\"callsFailed\":\"1\"")); GPR_ASSERT(nullptr != strstr(json, "\"callsFailed\":\"1\""));
GPR_ASSERT(nullptr != strstr(json, "\"callsSucceeded\":\"-1\"")); GPR_ASSERT(nullptr != strstr(json, "\"callsSucceeded\":\"-1\""));
@ -279,7 +280,8 @@ static void test_invoke_10_simple_requests(grpc_end2end_test_config config) {
// The following is a quick sanity check on channelz functionality. It // The following is a quick sanity check on channelz functionality. It
// ensures that core properly tracked the ten calls that occurred. // ensures that core properly tracked the ten calls that occurred.
char* json = grpc_channel_render_channelz(f.client); grpc_core::channelz::Channel* channelz_channel = grpc_channel_get_channelz_channel_node(f.client);
char* json = channelz_channel->RenderJSON();
GPR_ASSERT(nullptr != strstr(json, "\"callsStarted\":\"10\"")); GPR_ASSERT(nullptr != strstr(json, "\"callsStarted\":\"10\""));
GPR_ASSERT(nullptr != strstr(json, "\"callsFailed\":\"10\"")); GPR_ASSERT(nullptr != strstr(json, "\"callsFailed\":\"10\""));
GPR_ASSERT(nullptr != strstr(json, "\"callsSucceeded\":\"-1\"")); GPR_ASSERT(nullptr != strstr(json, "\"callsSucceeded\":\"-1\""));

Loading…
Cancel
Save