Merge pull request #16904 from ncteisen/more-channel-tracing

Channel Tracing Name Resolution Support
pull/16959/head
Noah Eisen 6 years ago committed by GitHub
commit 14a09c4849
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 92
      src/core/ext/filters/client_channel/client_channel.cc
  2. 8
      src/core/lib/channel/channel_trace.cc
  3. 2
      test/core/end2end/tests/channelz.cc
  4. 64
      test/core/end2end/tests/retry_streaming.cc

@ -131,6 +131,8 @@ typedef struct client_channel_channel_data {
grpc_core::UniquePtr<char> info_service_config_json;
/* backpointer to grpc_channel's channelz node */
grpc_core::channelz::ClientChannelNode* channelz_channel;
/* caches if the last resolution event contained addresses */
bool previous_resolution_contained_addresses;
} channel_data;
typedef struct {
@ -401,6 +403,8 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}
// Collects the C-string fragments (created with gpr_strdup()/gpr_asprintf()
// in this file) that describe a single resolution event, so that they can be
// joined into one channel trace entry.
// NOTE(review): the template argument 3 is presumably the inline capacity of
// grpc_core::InlinedVector -- confirm against its definition.
using TraceStringVector = grpc_core::InlinedVector<char*, 3>;
// Creates a new LB policy, replacing any previous one.
// If the new policy is created successfully, sets *connectivity_state and
// *connectivity_error to its initial connectivity state; otherwise,
@ -408,7 +412,7 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
static void create_new_lb_policy_locked(
channel_data* chand, char* lb_policy_name,
grpc_connectivity_state* connectivity_state,
grpc_error** connectivity_error) {
grpc_error** connectivity_error, TraceStringVector* trace_strings) {
grpc_core::LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = chand->combiner;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
@ -418,11 +422,21 @@ static void create_new_lb_policy_locked(
lb_policy_name, lb_policy_args);
if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
if (chand->channelz_channel != nullptr) {
char* str;
gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name);
trace_strings->push_back(str);
}
} else {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
lb_policy_name, new_lb_policy.get());
}
if (chand->channelz_channel != nullptr) {
char* str;
gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name);
trace_strings->push_back(str);
}
// Swap out the LB policy and update the fds in
// chand->interested_parties.
if (chand->lb_policy != nullptr) {
@ -497,6 +511,51 @@ get_service_config_from_resolver_result_locked(channel_data* chand) {
return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
}
// Appends a trace message to trace_strings when the resolver's address list
// switches between empty and non-empty.
//
// chand:         channel whose resolver_result is inspected; its
//                previous_resolution_contained_addresses flag is read and
//                then overwritten with the state of the current resolution.
// trace_strings: receives a gpr_strdup()'d message on a transition; left
//                untouched when the empty/non-empty state did not change.
static void maybe_add_trace_message_for_address_changes_locked(
    channel_data* chand, TraceStringVector* trace_strings) {
  // A plain bool: the value is only ever true/false and is stored into a
  // bool member below.
  bool resolution_contains_addresses = false;
  const grpc_arg* channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
  if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
    const grpc_lb_addresses* addresses =
        static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
    // Guard against a pointer arg that carries no payload before reading it.
    resolution_contains_addresses =
        addresses != nullptr && addresses->num_addresses > 0;
  }
  const bool previously_contained_addresses =
      chand->previous_resolution_contained_addresses;
  if (!resolution_contains_addresses && previously_contained_addresses) {
    trace_strings->push_back(gpr_strdup("Address list became empty"));
  } else if (resolution_contains_addresses &&
             !previously_contained_addresses) {
    trace_strings->push_back(gpr_strdup("Address list became non-empty"));
  }
  // Remember the current state so the next resolution is compared against it.
  chand->previous_resolution_contained_addresses =
      resolution_contains_addresses;
}
// Joins the collected fragments into a single "Resolution event: a, b, c"
// message and records it as an Info-severity event on the channel's channelz
// node. Does nothing when trace_strings is empty.
//
// chand:         channel whose channelz_channel receives the event. It is
//                dereferenced unconditionally when there is something to
//                trace, so callers must only invoke this with a non-null
//                channelz_channel.
// trace_strings: fragments to join. The raw pointers are handed to the
//                gpr_strvec and released by gpr_strvec_destroy() below
//                (NOTE(review): assumes gpr_strvec_add() stores the pointer
//                rather than a copy -- confirm), so the entries must not be
//                used or freed by the caller afterwards.
static void concatenate_and_add_channel_trace_locked(
    channel_data* chand, TraceStringVector* trace_strings) {
  if (trace_strings->empty()) return;
  gpr_strvec v;
  gpr_strvec_init(&v);
  gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
  for (size_t i = 0; i < trace_strings->size(); ++i) {
    // Separator goes between fragments, i.e. before every one but the first.
    if (i > 0) gpr_strvec_add(&v, gpr_strdup(", "));
    gpr_strvec_add(&v, (*trace_strings)[i]);
  }
  size_t flat_len = 0;
  char* flat = gpr_strvec_flatten(&v, &flat_len);
  // The slice is created with gpr_free as its destroy function, so the
  // flattened buffer is not freed here.
  chand->channelz_channel->AddTraceEvent(
      grpc_core::channelz::ChannelTrace::Severity::Info,
      grpc_slice_new(flat, flat_len, gpr_free));
  gpr_strvec_destroy(&v);
}
// Callback invoked when a resolver result is available.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg);
@ -518,6 +577,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
}
// Data used to set the channel's connectivity state.
bool set_connectivity_state = true;
// We only want to trace the address resolution in the following cases:
// (a) Address resolution resulted in service config change.
// (b) Address resolution that causes number of backends to go from
// zero to non-zero.
// (c) Address resolution that causes number of backends to go from
// non-zero to zero.
// (d) Address resolution that causes a new LB policy to be created.
//
// we track a list of strings to eventually be concatenated and traced.
TraceStringVector trace_strings;
grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_error* connectivity_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
@ -552,11 +621,29 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
} else {
// Instantiate new LB policy.
create_new_lb_policy_locked(chand, lb_policy_name.get(),
&connectivity_state, &connectivity_error);
&connectivity_state, &connectivity_error,
&trace_strings);
}
// Find service config.
grpc_core::UniquePtr<char> service_config_json =
get_service_config_from_resolver_result_locked(chand);
// Note: It's safe to use chand->info_service_config_json here without
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked
// once at any given time.
if (chand->channelz_channel != nullptr) {
if (((service_config_json == nullptr) !=
(chand->info_service_config_json == nullptr)) ||
(service_config_json != nullptr &&
strcmp(service_config_json.get(),
chand->info_service_config_json.get()) != 0)) {
// TODO(ncteisen): might be worth somehow including a snippet of the
// config in the trace, at the risk of bloating the trace logs.
trace_strings.push_back(gpr_strdup("Service config changed"));
}
maybe_add_trace_message_for_address_changes_locked(chand, &trace_strings);
concatenate_and_add_channel_trace_locked(chand, &trace_strings);
}
// Swap out the data used by cc_get_channel_info().
gpr_mu_lock(&chand->info_mu);
chand->info_lb_policy_name = std::move(lb_policy_name);
@ -725,6 +812,7 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
chand->channelz_channel = nullptr;
chand->previous_resolution_contained_addresses = false;
// Record client channel factory.
arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);

@ -108,16 +108,20 @@ void ChannelTrace::AddTraceEventHelper(TraceEvent* new_trace_event) {
}
// Records a new trace event with the given severity and description.
//
// severity: severity attached to the new event.
// data:     slice describing the event. When tracing is disabled the slice
//           is unreffed here, since no TraceEvent is created to receive it;
//           otherwise it is passed on to the new TraceEvent.
void ChannelTrace::AddTraceEvent(Severity severity, grpc_slice data) {
  if (max_event_memory_ == 0) {
    grpc_slice_unref_internal(data);
    return;  // tracing is disabled if max_event_memory_ == 0
  }
  AddTraceEventHelper(New<TraceEvent>(severity, data));
}
void ChannelTrace::AddTraceEventWithReference(
Severity severity, grpc_slice data,
RefCountedPtr<BaseNode> referenced_entity) {
if (max_event_memory_ == 0)
if (max_event_memory_ == 0) {
grpc_slice_unref_internal(data);
return; // tracing is disabled if max_event_memory_ == 0
}
// create and fill up the new event
AddTraceEventHelper(
New<TraceEvent>(severity, data, std::move(referenced_entity)));

@ -288,6 +288,8 @@ static void test_channelz_with_channel_trace(grpc_end2end_test_config config) {
grpc_server_get_channelz_node(f.server);
GPR_ASSERT(channelz_server != nullptr);
run_one_request(config, f, true);
char* json = channelz_channel->RenderJsonString();
GPR_ASSERT(json != nullptr);
gpr_log(GPR_INFO, "%s", json);

@ -28,6 +28,9 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
@ -133,25 +136,30 @@ static void test_retry_streaming(grpc_end2end_test_config config) {
int was_cancelled = 2;
char* peer;
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
arg.key = const_cast<char*>(GRPC_ARG_SERVICE_CONFIG);
arg.value.string = const_cast<char*>(
"{\n"
" \"methodConfig\": [ {\n"
" \"name\": [\n"
" { \"service\": \"service\", \"method\": \"method\" }\n"
" ],\n"
" \"retryPolicy\": {\n"
" \"maxAttempts\": 3,\n"
" \"initialBackoff\": \"1s\",\n"
" \"maxBackoff\": \"120s\",\n"
" \"backoffMultiplier\": 1.6,\n"
" \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
" }\n"
" } ]\n"
"}");
grpc_channel_args client_args = {1, &arg};
grpc_arg args[] = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE),
1024 * 8),
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_CHANNELZ), true),
grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
const_cast<char*>(
"{\n"
" \"methodConfig\": [ {\n"
" \"name\": [\n"
" { \"service\": \"service\", \"method\": \"method\" }\n"
" ],\n"
" \"retryPolicy\": {\n"
" \"maxAttempts\": 3,\n"
" \"initialBackoff\": \"1s\",\n"
" \"maxBackoff\": \"120s\",\n"
" \"backoffMultiplier\": 1.6,\n"
" \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
" }\n"
" } ]\n"
"}"))};
grpc_channel_args client_args = {GPR_ARRAY_SIZE(args), args};
grpc_end2end_test_fixture f =
begin_test(config, "retry_streaming", &client_args, nullptr);
@ -161,6 +169,9 @@ static void test_retry_streaming(grpc_end2end_test_config config) {
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, deadline, nullptr);
grpc_core::channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(f.client);
GPR_ASSERT(c);
peer = grpc_call_get_peer(c);
@ -384,6 +395,20 @@ static void test_retry_streaming(grpc_end2end_test_config config) {
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 1);
GPR_ASSERT(channelz_channel != nullptr);
char* json = channelz_channel->RenderJsonString();
GPR_ASSERT(json != nullptr);
gpr_log(GPR_INFO, "%s", json);
GPR_ASSERT(nullptr != strstr(json, "\"trace\""));
GPR_ASSERT(nullptr != strstr(json, "\"description\":\"Channel created\""));
GPR_ASSERT(nullptr != strstr(json, "\"severity\":\"CT_INFO\""));
GPR_ASSERT(nullptr != strstr(json, "Resolution event"));
GPR_ASSERT(nullptr != strstr(json, "Created new LB policy"));
GPR_ASSERT(nullptr != strstr(json, "Service config changed"));
GPR_ASSERT(nullptr != strstr(json, "Address list became non-empty"));
GPR_ASSERT(nullptr != strstr(json, "Channel state change to CONNECTING"));
gpr_free(json);
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
@ -414,6 +439,7 @@ static void test_retry_streaming(grpc_end2end_test_config config) {
// Entry point for the retry_streaming end2end test. Asserts that the supplied
// test configuration advertises FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL and then
// runs test_retry_streaming() with it.
void retry_streaming(grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
test_retry_streaming(config);
}

Loading…
Cancel
Save