Merge master

pull/37387/head
tanvi-jagtap 9 months ago
commit 8205b30d2e
52 changed files, with changed line counts:
  1. grpc.def (3)
  2. include/grpc/support/log.h (19)
  3. src/core/ext/transport/binder/wire_format/wire_writer.cc (2)
  4. src/core/lib/channel/promise_based_filter.cc (89)
  5. src/core/lib/gprpp/work_serializer.cc (36)
  6. src/core/lib/iomgr/ev_apple.cc (9)
  7. src/core/lib/iomgr/ev_epoll1_linux.cc (110)
  8. src/core/lib/iomgr/ev_poll_posix.cc (8)
  9. src/core/lib/iomgr/tcp_posix.cc (49)
  10. src/core/lib/iomgr/tcp_server_posix.cc (5)
  11. src/core/lib/iomgr/tcp_windows.cc (16)
  12. src/core/lib/iomgr/timer_manager.cc (9)
  13. src/core/lib/promise/for_each.h (23)
  14. src/core/lib/promise/inter_activity_latch.h (20)
  15. src/core/lib/promise/latch.h (40)
  16. src/core/lib/resource_quota/memory_quota.cc (23)
  17. src/core/lib/security/authorization/grpc_authorization_policy_provider.cc (9)
  18. src/core/lib/security/credentials/oauth2/oauth2_credentials.cc (9)
  19. src/core/lib/transport/bdp_estimator.cc (14)
  20. src/core/lib/transport/call_filters.cc (8)
  21. src/core/load_balancing/grpclb/grpclb.cc (69)
  22. src/core/load_balancing/health_check_client.cc (48)
  23. src/core/load_balancing/outlier_detection/outlier_detection.cc (63)
  24. src/core/load_balancing/pick_first/pick_first.cc (151)
  25. src/core/load_balancing/priority/priority.cc (72)
  26. src/core/load_balancing/ring_hash/ring_hash.cc (26)
  27. src/core/load_balancing/rls/rls.cc (105)
  28. src/core/load_balancing/round_robin/round_robin.cc (34)
  29. src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc (49)
  30. src/core/load_balancing/weighted_target/weighted_target.cc (57)
  31. src/core/load_balancing/xds/cds.cc (47)
  32. src/core/load_balancing/xds/xds_cluster_impl.cc (19)
  33. src/core/load_balancing/xds/xds_cluster_manager.cc (52)
  34. src/core/load_balancing/xds/xds_override_host.cc (56)
  35. src/core/load_balancing/xds/xds_wrr_locality.cc (23)
  36. src/core/util/android/log.cc (1)
  37. src/core/util/linux/log.cc (1)
  38. src/core/util/log.cc (9)
  39. src/core/util/posix/log.cc (1)
  40. src/core/util/windows/log.cc (1)
  41. src/python/grpcio_reflection/grpc_reflection/v1alpha/_async.py (12)
  42. src/python/grpcio_reflection/grpc_reflection/v1alpha/_base.py (40)
  43. src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py (12)
  44. src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py (9)
  45. src/python/grpcio_tests/tests_aio/reflection/reflection_servicer_test.py (9)
  46. src/ruby/ext/grpc/rb_call.c (2)
  47. src/ruby/ext/grpc/rb_call_credentials.c (4)
  48. src/ruby/ext/grpc/rb_grpc.c (2)
  49. src/ruby/ext/grpc/rb_grpc_imports.generated.c (6)
  50. src/ruby/ext/grpc/rb_grpc_imports.generated.h (9)
  51. src/ruby/ext/grpc/rb_server.c (6)
  52. tools/run_tests/sanity/banned_functions.py (17)

grpc.def (3 changes, generated)

@@ -231,9 +231,8 @@ EXPORTS
gpr_cpu_num_cores
gpr_cpu_current_cpu
gpr_log
gpr_should_log
absl_vlog2_enabled
gpr_log_verbosity_init
gpr_set_log_function
gpr_format_message
gpr_strdup
gpr_asprintf

@@ -51,26 +51,11 @@ typedef enum gpr_log_severity {
GPRAPI void gpr_log(const char* file, int line, gpr_log_severity severity,
const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);
GPRAPI int gpr_should_log(gpr_log_severity severity);
/** Deprecated. **/
GPRAPI int absl_vlog2_enabled();
GPRAPI void gpr_log_verbosity_init(void);
/** Log overrides: applications can use this API to intercept logging calls
and use their own implementations */
struct gpr_log_func_args {
const char* file;
int line;
gpr_log_severity severity;
const char* message;
};
typedef struct gpr_log_func_args gpr_log_func_args;
typedef void (*gpr_log_func)(gpr_log_func_args* args);
GPRAPI void gpr_set_log_function(gpr_log_func deprecated_setting);
#ifdef __cplusplus
}
#endif
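
Note: the block deleted above was the only public hook for intercepting gpr logging. A minimal sketch of the usage pattern this removal retires is below; the callback fields come straight from the deleted struct, while the sink body itself is hypothetical.

#include <stdio.h>
#include <grpc/support/log.h>

/* Hypothetical pre-removal usage: redirect gRPC logs into an application
   sink via the (now deleted) gpr_set_log_function hook. */
static void my_log_sink(gpr_log_func_args* args) {
  /* file, line, severity, and message were the only exposed fields. */
  fprintf(stderr, "[grpc %s:%d] %s\n", args->file, args->line, args->message);
}

void install_sink(void) {
  gpr_set_log_function(my_log_sink);  /* deprecated setter removed here */
}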

@@ -377,7 +377,7 @@ void WireWriterImpl::TryScheduleTransaction() {
} else {
// It is common to fill `kFlowControlWindowSize` completely because
// transactions are send at faster rate than the other end of transport
// can handle it, so here we use `GPR_DEBUG` log level.
// can handle it, so here we use VLOG(2).
VLOG(2) << "Some work cannot be scheduled yet due to slow ack from the "
"other end of transport. This transport might be blocked if "
"this number don't go down. pending_outgoing_tx_.size() = "

@@ -398,11 +398,9 @@ bool BaseCallData::SendMessage::IsIdle() const {
void BaseCallData::SendMessage::OnComplete(absl::Status status) {
Flusher flusher(base_);
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << base_->LogTag()
<< " SendMessage.OnComplete st=" << StateString(state_)
<< " status=" << status;
}
GRPC_TRACE_LOG(channel, INFO)
<< base_->LogTag() << " SendMessage.OnComplete st=" << StateString(state_)
<< " status=" << status;
switch (state_) {
case State::kInitial:
case State::kIdle:
@@ -429,11 +427,9 @@ void BaseCallData::SendMessage::OnComplete(absl::Status status) {
void BaseCallData::SendMessage::Done(const ServerMetadata& metadata,
Flusher* flusher) {
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << base_->LogTag()
<< " SendMessage.Done st=" << StateString(state_)
<< " md=" << metadata.DebugString();
}
GRPC_TRACE_LOG(channel, INFO)
<< base_->LogTag() << " SendMessage.Done st=" << StateString(state_)
<< " md=" << metadata.DebugString();
switch (state_) {
case State::kCancelled:
case State::kCancelledButNotYetPolled:
@@ -681,11 +677,10 @@ void BaseCallData::ReceiveMessage::GotPipe(T* pipe_end) {
}
void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) {
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << base_->LogTag()
<< " ReceiveMessage.OnComplete st=" << StateString(state_)
<< " status=" << status;
}
GRPC_TRACE_LOG(channel, INFO)
<< base_->LogTag()
<< " ReceiveMessage.OnComplete st=" << StateString(state_)
<< " status=" << status;
switch (state_) {
case State::kInitial:
case State::kIdle:
@@ -722,11 +717,9 @@ void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) {
void BaseCallData::ReceiveMessage::Done(const ServerMetadata& metadata,
Flusher* flusher) {
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << base_->LogTag()
<< " ReceiveMessage.Done st=" << StateString(state_)
<< " md=" << metadata.DebugString();
}
GRPC_TRACE_LOG(channel, INFO)
<< base_->LogTag() << " ReceiveMessage.Done st=" << StateString(state_)
<< " md=" << metadata.DebugString();
switch (state_) {
case State::kInitial:
state_ = State::kCancelled;
@@ -842,11 +835,10 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher,
CHECK(push_.has_value());
auto r_push = (*push_)();
if (auto* p = r_push.value_if_ready()) {
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << base_->LogTag()
<< " ReceiveMessage.WakeInsideCombiner push complete: "
<< (*p ? "true" : "false");
}
GRPC_TRACE_LOG(channel, INFO)
<< base_->LogTag()
<< " ReceiveMessage.WakeInsideCombiner push complete: "
<< (*p ? "true" : "false");
// We haven't pulled through yet, so this certainly shouldn't succeed.
CHECK(!*p);
state_ = State::kCancelled;
@@ -1366,9 +1358,7 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) {
CapturedBatch batch(b);
Flusher flusher(this);
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << LogTag() << " StartBatch " << DebugString();
}
GRPC_TRACE_LOG(channel, INFO) << LogTag() << " StartBatch " << DebugString();
// If this is a cancel stream, cancel anything we have pending and propagate
// the cancellation.
@@ -1489,9 +1479,8 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* b) {
// Handle cancellation.
void ClientCallData::Cancel(grpc_error_handle error, Flusher* flusher) {
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << LogTag() << " Cancel error=" << error.ToString();
}
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << " Cancel error=" << error.ToString();
// Track the latest reason for cancellation.
cancelled_error_ = error;
// Stop running the promise.
@@ -1568,11 +1557,10 @@ void ClientCallData::StartPromise(Flusher* flusher) {
}
void ClientCallData::RecvInitialMetadataReady(grpc_error_handle error) {
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << LogTag() << " ClientCallData.RecvInitialMetadataReady "
<< DebugString() << " error:" << error.ToString()
<< " md:" << recv_initial_metadata_->metadata->DebugString();
}
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << " ClientCallData.RecvInitialMetadataReady "
<< DebugString() << " error:" << error.ToString()
<< " md:" << recv_initial_metadata_->metadata->DebugString();
ScopedContext context(this);
Flusher flusher(this);
if (!error.ok()) {
@@ -1974,9 +1962,8 @@ ServerCallData::ServerCallData(grpc_call_element* elem,
}
ServerCallData::~ServerCallData() {
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << LogTag() << " ~ServerCallData " << DebugString();
}
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << " ~ServerCallData " << DebugString();
if (send_initial_metadata_ != nullptr) {
send_initial_metadata_->~SendInitialMetadata();
}
@@ -2001,9 +1988,7 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
Flusher flusher(this);
bool wake = false;
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << LogTag() << " StartBatch: " << DebugString();
}
GRPC_TRACE_LOG(channel, INFO) << LogTag() << " StartBatch: " << DebugString();
// If this is a cancel stream, cancel anything we have pending and
// propagate the cancellation.
@@ -2306,9 +2291,8 @@ void ServerCallData::RecvInitialMetadataReadyCallback(void* arg,
void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) {
Flusher flusher(this);
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << LogTag() << ": RecvInitialMetadataReady " << error;
}
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << ": RecvInitialMetadataReady " << error;
CHECK(recv_initial_state_ == RecvInitialState::kForwarded);
// If there was an error we just propagate that through
if (!error.ok()) {
@@ -2370,9 +2354,8 @@ std::string ServerCallData::DebugString() const {
// Wakeup and poll the promise if appropriate.
void ServerCallData::WakeInsideCombiner(Flusher* flusher) {
PollContext poll_ctx(this, flusher);
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << LogTag() << ": WakeInsideCombiner " << DebugString();
}
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << ": WakeInsideCombiner " << DebugString();
poll_ctx.ClearRepoll();
if (send_initial_metadata_ != nullptr) {
if (send_initial_metadata_->state ==
@@ -2392,12 +2375,12 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) {
}
if (send_initial_metadata_->metadata_push_.has_value()) {
if ((*send_initial_metadata_->metadata_push_)().ready()) {
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << LogTag() << ": WakeInsideCombiner: metadata_push done";
}
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << ": WakeInsideCombiner: metadata_push done";
send_initial_metadata_->metadata_push_.reset();
} else if (GRPC_TRACE_FLAG_ENABLED(channel)) {
LOG(INFO) << LogTag() << ": WakeInsideCombiner: metadata_push pending";
} else {
GRPC_TRACE_LOG(channel, INFO)
<< LogTag() << ": WakeInsideCombiner: metadata_push pending";
}
}
}
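
The edits in this file are all instances of the same mechanical rewrite: the two-line guarded-log idiom collapses into a single GRPC_TRACE_LOG statement. A before/after sketch, assuming GRPC_TRACE_LOG(tag, severity) expands to a LOG_IF-style stream gated on GRPC_TRACE_FLAG_ENABLED(tag):

// Before: explicit guard around an unconditional LOG.
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
  LOG(INFO) << base_->LogTag() << " SendMessage.OnComplete st=" << StateString(state_);
}

// After: one statement; stream operands are only evaluated when the
// `channel` trace flag is enabled.
GRPC_TRACE_LOG(channel, INFO)
    << base_->LogTag() << " SendMessage.OnComplete st=" << StateString(state_);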

@@ -161,9 +161,8 @@ void WorkSerializer::LegacyWorkSerializer::Run(std::function<void()> callback,
refs_.fetch_sub(MakeRefPair(1, 0), std::memory_order_acq_rel);
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
LOG(INFO) << " Scheduling on queue : item " << cb_wrapper;
}
GRPC_TRACE_LOG(work_serializer, INFO)
<< " Scheduling on queue : item " << cb_wrapper;
queue_.Push(&cb_wrapper->mpscq_node);
}
}
@@ -172,19 +171,15 @@ void WorkSerializer::LegacyWorkSerializer::Schedule(
std::function<void()> callback, const DebugLocation& location) {
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
LOG(INFO) << "WorkSerializer::Schedule() " << this
<< " Scheduling callback " << cb_wrapper << " ["
<< location.file() << ":" << location.line() << "]";
}
GRPC_TRACE_LOG(work_serializer, INFO)
<< "WorkSerializer::Schedule() " << this << " Scheduling callback "
<< cb_wrapper << " [" << location.file() << ":" << location.line() << "]";
refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_acq_rel);
queue_.Push(&cb_wrapper->mpscq_node);
}
void WorkSerializer::LegacyWorkSerializer::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
LOG(INFO) << "WorkSerializer::Orphan() " << this;
}
GRPC_TRACE_LOG(work_serializer, INFO) << "WorkSerializer::Orphan() " << this;
const uint64_t prev_ref_pair =
refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
if (GetOwners(prev_ref_pair) == 0 && GetSize(prev_ref_pair) == 1) {
@@ -196,9 +191,8 @@ void WorkSerializer::LegacyWorkSerializer::Orphan() {
// The thread that calls this loans itself to the work serializer so as to
// execute all the scheduled callbacks.
void WorkSerializer::LegacyWorkSerializer::DrainQueue() {
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
LOG(INFO) << "WorkSerializer::DrainQueue() " << this;
}
GRPC_TRACE_LOG(work_serializer, INFO)
<< "WorkSerializer::DrainQueue() " << this;
// Attempt to take ownership of the WorkSerializer. Also increment the queue
// size as required by `DrainQueueOwned()`.
const uint64_t prev_ref_pair =
@@ -217,9 +211,8 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueue() {
}
void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
LOG(INFO) << "WorkSerializer::DrainQueueOwned() " << this;
}
GRPC_TRACE_LOG(work_serializer, INFO)
<< "WorkSerializer::DrainQueueOwned() " << this;
while (true) {
auto prev_ref_pair = refs_.fetch_sub(MakeRefPair(0, 1));
// It is possible that while draining the queue, the last callback ended
@@ -264,11 +257,10 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
GRPC_TRACE_LOG(work_serializer, INFO)
<< " Queue returned nullptr, trying again";
}
if (GRPC_TRACE_FLAG_ENABLED(work_serializer)) {
LOG(INFO) << " Running item " << cb_wrapper
<< " : callback scheduled at [" << cb_wrapper->location.file()
<< ":" << cb_wrapper->location.line() << "]";
}
GRPC_TRACE_LOG(work_serializer, INFO)
<< " Running item " << cb_wrapper << " : callback scheduled at ["
<< cb_wrapper->location.file() << ":" << cb_wrapper->location.line()
<< "]";
cb_wrapper->callback();
delete cb_wrapper;
}
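
For orientation, the fetch_add/fetch_sub calls above manipulate a single 64-bit atomic that packs an owner count together with the queue size, which is what lets DrainQueue claim ownership and bump the size in one RMW step. A sketch of that packing, assuming owners live in the high 32 bits and size in the low 32 (the actual bit layout is private to work_serializer.cc):

#include <atomic>
#include <cstdint>

// Sketch: two 32-bit counters packed into one atomic word.
constexpr uint64_t MakeRefPair(uint32_t owners, uint32_t size) {
  return (static_cast<uint64_t>(owners) << 32) | size;
}
constexpr uint32_t GetOwners(uint64_t pair) {
  return static_cast<uint32_t>(pair >> 32);
}
constexpr uint32_t GetSize(uint64_t pair) {
  return static_cast<uint32_t>(pair & 0xffffffffu);
}

std::atomic<uint64_t> refs{MakeRefPair(0, 0)};
// Schedule():   refs.fetch_add(MakeRefPair(0, 1));  // enqueue: size += 1
// DrainQueue(): refs.fetch_add(MakeRefPair(1, 1));  // claim owner + size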

@@ -39,15 +39,6 @@
#include "src/core/lib/gprpp/time_util.h"
#include "src/core/lib/iomgr/ev_apple.h"
#ifndef NDEBUG
#define GRPC_POLLING_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(apple_polling)) { \
VLOG(2) << "(polling) " << absl::StrFormat(format, __VA_ARGS__); \
}
#else
#define GRPC_POLLING_TRACE(...)
#endif // NDEBUG
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
struct GlobalRunLoopContext {

@@ -727,9 +727,8 @@ static grpc_error_handle do_epoll_wait(grpc_pollset* ps,
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << "ps: " << ps << " poll got " << r << " events";
}
GRPC_TRACE_LOG(polling, INFO)
<< "ps: " << ps << " poll got " << r << " events";
gpr_atm_rel_store(&g_epoll_set.num_events, r);
gpr_atm_rel_store(&g_epoll_set.cursor, 0);
@@ -746,9 +745,8 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
pollset->begin_refs++;
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << "PS:" << pollset << " BEGIN_STARTS:" << worker;
}
GRPC_TRACE_LOG(polling, INFO)
<< "PS:" << pollset << " BEGIN_STARTS:" << worker;
if (pollset->seen_inactive) {
// pollset has been observed to be inactive, we need to move back to the
@@ -765,11 +763,10 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
retry_lock_neighborhood:
gpr_mu_lock(&neighborhood->mu);
gpr_mu_lock(&pollset->mu);
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << "PS:" << pollset << " BEGIN_REORG:" << worker
<< " kick_state=" << kick_state_string(worker->state)
<< " is_reassigning=" << is_reassigning;
}
GRPC_TRACE_LOG(polling, INFO)
<< "PS:" << pollset << " BEGIN_REORG:" << worker
<< " kick_state=" << kick_state_string(worker->state)
<< " is_reassigning=" << is_reassigning;
if (pollset->seen_inactive) {
if (neighborhood != pollset->neighborhood) {
gpr_mu_unlock(&neighborhood->mu);
@@ -818,11 +815,10 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
while (worker->state == UNKICKED && !pollset->shutting_down) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << "PS:" << pollset << " BEGIN_WAIT:" << worker
<< " kick_state=" << kick_state_string(worker->state)
<< " shutdown=" << pollset->shutting_down;
}
GRPC_TRACE_LOG(polling, INFO)
<< "PS:" << pollset << " BEGIN_WAIT:" << worker
<< " kick_state=" << kick_state_string(worker->state)
<< " shutdown=" << pollset->shutting_down;
if (gpr_cv_wait(&worker->cv, &pollset->mu,
deadline.as_timespec(GPR_CLOCK_MONOTONIC)) &&
@@ -877,17 +873,15 @@ static bool check_neighborhood_for_available_poller(
if (gpr_atm_no_barrier_cas(
&g_active_poller, 0,
reinterpret_cast<gpr_atm>(inspect_worker))) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. choose next poller to be " << inspect_worker;
}
GRPC_TRACE_LOG(polling, INFO)
<< " .. choose next poller to be " << inspect_worker;
SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
if (inspect_worker->initialized_cv) {
gpr_cv_signal(&inspect_worker->cv);
}
} else {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. beaten to choose next poller";
}
GRPC_TRACE_LOG(polling, INFO)
<< " .. beaten to choose next poller";
}
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true;
@@ -903,9 +897,8 @@ static bool check_neighborhood_for_available_poller(
} while (!found_worker && inspect_worker != inspect->root_worker);
}
if (!found_worker) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. mark pollset " << inspect << " inactive";
}
GRPC_TRACE_LOG(polling, INFO)
<< " .. mark pollset " << inspect << " inactive";
inspect->seen_inactive = true;
if (inspect == neighborhood->active_root) {
neighborhood->active_root =
@@ -922,9 +915,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
grpc_pollset_worker** worker_hdl) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << "PS:" << pollset << " END_WORKER:" << worker;
}
GRPC_TRACE_LOG(polling, INFO) << "PS:" << pollset << " END_WORKER:" << worker;
if (worker_hdl != nullptr) *worker_hdl = nullptr;
// Make sure we appear kicked
SET_KICK_STATE(worker, KICKED);
@@ -933,9 +924,8 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
if (gpr_atm_no_barrier_load(&g_active_poller) ==
reinterpret_cast<gpr_atm>(worker)) {
if (worker->next != worker && worker->next->state == UNKICKED) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. choose next poller to be peer " << worker;
}
GRPC_TRACE_LOG(polling, INFO)
<< " .. choose next poller to be peer " << worker;
CHECK(worker->next->initialized_cv);
gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
@@ -984,9 +974,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. remove worker";
}
GRPC_TRACE_LOG(polling, INFO) << " .. remove worker";
if (EMPTIED == worker_remove(pollset, worker)) {
pollset_maybe_finish_shutdown(pollset);
}
@@ -1075,22 +1063,16 @@ static grpc_error_handle pollset_kick(grpc_pollset* pollset,
grpc_pollset_worker* root_worker = pollset->root_worker;
if (root_worker == nullptr) {
pollset->kicked_without_poller = true;
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. kicked_without_poller";
}
GRPC_TRACE_LOG(polling, INFO) << " .. kicked_without_poller";
goto done;
}
grpc_pollset_worker* next_worker = root_worker->next;
if (root_worker->state == KICKED) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. already kicked " << root_worker;
}
GRPC_TRACE_LOG(polling, INFO) << " .. already kicked " << root_worker;
SET_KICK_STATE(root_worker, KICKED);
goto done;
} else if (next_worker->state == KICKED) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. already kicked " << next_worker;
}
GRPC_TRACE_LOG(polling, INFO) << " .. already kicked " << next_worker;
SET_KICK_STATE(next_worker, KICKED);
goto done;
} else if (root_worker == next_worker && // only try and wake up a poller
@@ -1098,27 +1080,22 @@ static grpc_error_handle pollset_kick(grpc_pollset* pollset,
root_worker ==
reinterpret_cast<grpc_pollset_worker*>(
gpr_atm_no_barrier_load(&g_active_poller))) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. kicked " << root_worker;
}
GRPC_TRACE_LOG(polling, INFO) << " .. kicked " << root_worker;
SET_KICK_STATE(root_worker, KICKED);
ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
goto done;
} else if (next_worker->state == UNKICKED) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. kicked " << next_worker;
}
GRPC_TRACE_LOG(polling, INFO) << " .. kicked " << next_worker;
CHECK(next_worker->initialized_cv);
SET_KICK_STATE(next_worker, KICKED);
gpr_cv_signal(&next_worker->cv);
goto done;
} else if (next_worker->state == DESIGNATED_POLLER) {
if (root_worker->state != DESIGNATED_POLLER) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. kicked root non-poller " << root_worker
<< " (initialized_cv=" << root_worker->initialized_cv
<< ") (poller=" << next_worker << ")";
}
GRPC_TRACE_LOG(polling, INFO)
<< " .. kicked root non-poller " << root_worker
<< " (initialized_cv=" << root_worker->initialized_cv
<< ") (poller=" << next_worker << ")";
SET_KICK_STATE(root_worker, KICKED);
if (root_worker->initialized_cv) {
gpr_cv_signal(&root_worker->cv);
@@ -1137,9 +1114,7 @@ static grpc_error_handle pollset_kick(grpc_pollset* pollset,
goto done;
}
} else {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. kicked while waking up";
}
GRPC_TRACE_LOG(polling, INFO) << " .. kicked while waking up";
goto done;
}
@@ -1147,36 +1122,27 @@ static grpc_error_handle pollset_kick(grpc_pollset* pollset,
}
if (specific_worker->state == KICKED) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. specific worker already kicked";
}
GRPC_TRACE_LOG(polling, INFO) << " .. specific worker already kicked";
goto done;
} else if (g_current_thread_worker == specific_worker) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. mark " << specific_worker << " kicked";
}
GRPC_TRACE_LOG(polling, INFO)
<< " .. mark " << specific_worker << " kicked";
SET_KICK_STATE(specific_worker, KICKED);
goto done;
} else if (specific_worker ==
reinterpret_cast<grpc_pollset_worker*>(
gpr_atm_no_barrier_load(&g_active_poller))) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. kick active poller";
}
GRPC_TRACE_LOG(polling, INFO) << " .. kick active poller";
SET_KICK_STATE(specific_worker, KICKED);
ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
goto done;
} else if (specific_worker->initialized_cv) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. kick waiting worker";
}
GRPC_TRACE_LOG(polling, INFO) << " .. kick waiting worker";
SET_KICK_STATE(specific_worker, KICKED);
gpr_cv_signal(&specific_worker->cv);
goto done;
} else {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << " .. kick non-waiting worker";
}
GRPC_TRACE_LOG(polling, INFO) << " .. kick non-waiting worker";
SET_KICK_STATE(specific_worker, KICKED);
goto done;
}

@@ -1028,9 +1028,7 @@ static grpc_error_handle pollset_work(grpc_pollset* pollset,
r = grpc_poll_function(pfds, pfd_count, timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION;
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << pollset << " poll=" << r;
}
GRPC_TRACE_LOG(polling, INFO) << pollset << " poll=" << r;
if (r < 0) {
if (errno != EINTR) {
@@ -1052,9 +1050,7 @@ static grpc_error_handle pollset_work(grpc_pollset* pollset,
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
LOG(INFO) << pollset << ": got_wakeup";
}
GRPC_TRACE_LOG(polling, INFO) << pollset << ": got_wakeup";
work_combine_error(
&error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
}

@@ -618,18 +618,14 @@ static void tcp_drop_uncovered_then_handle_write(void* arg /* grpc_tcp */,
static void done_poller(void* bp, grpc_error_handle /*error_ignored*/) {
backup_poller* p = static_cast<backup_poller*>(bp);
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "BACKUP_POLLER:" << p << " destroy";
}
GRPC_TRACE_LOG(tcp, INFO) << "BACKUP_POLLER:" << p << " destroy";
grpc_pollset_destroy(BACKUP_POLLER_POLLSET(p));
gpr_free(p);
}
static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {
backup_poller* p = static_cast<backup_poller*>(bp);
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "BACKUP_POLLER:" << p << " run";
}
GRPC_TRACE_LOG(tcp, INFO) << "BACKUP_POLLER:" << p << " run";
gpr_mu_lock(p->pollset_mu);
grpc_core::Timestamp deadline =
grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(10);
@@ -644,17 +640,13 @@ static void run_poller(void* bp, grpc_error_handle /*error_ignored*/) {
g_backup_poller = nullptr;
g_uncovered_notifications_pending = 0;
g_backup_poller_mu->Unlock();
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "BACKUP_POLLER:" << p << " shutdown";
}
GRPC_TRACE_LOG(tcp, INFO) << "BACKUP_POLLER:" << p << " shutdown";
grpc_pollset_shutdown(BACKUP_POLLER_POLLSET(p),
GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p,
grpc_schedule_on_exec_ctx));
} else {
g_backup_poller_mu->Unlock();
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "BACKUP_POLLER:" << p << " reschedule";
}
GRPC_TRACE_LOG(tcp, INFO) << "BACKUP_POLLER:" << p << " reschedule";
grpc_core::Executor::Run(&p->run_poller, absl::OkStatus(),
grpc_core::ExecutorType::DEFAULT,
grpc_core::ExecutorJobType::LONG);
@@ -691,9 +683,7 @@ static void cover_self(grpc_tcp* tcp) {
g_backup_poller = p;
grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
g_backup_poller_mu->Unlock();
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "BACKUP_POLLER:" << p << " create";
}
GRPC_TRACE_LOG(tcp, INFO) << "BACKUP_POLLER:" << p << " create";
grpc_core::Executor::Run(
GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, nullptr),
absl::OkStatus(), grpc_core::ExecutorType::DEFAULT,
@@ -709,16 +699,12 @@ static void cover_self(grpc_tcp* tcp) {
}
static void notify_on_read(grpc_tcp* tcp) {
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "TCP:" << tcp << " notify_on_read";
}
GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " notify_on_read";
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_done_closure);
}
static void notify_on_write(grpc_tcp* tcp) {
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "TCP:" << tcp << " notify_on_write";
}
GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " notify_on_write";
if (!grpc_event_engine_run_in_background()) {
cover_self(tcp);
}
@@ -815,9 +801,8 @@ static void tcp_destroy(grpc_endpoint* ep) {
static void perform_reclamation(grpc_tcp* tcp)
ABSL_LOCKS_EXCLUDED(tcp->read_mu) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
LOG(INFO) << "TCP: benign reclamation to free memory";
}
GRPC_TRACE_LOG(resource_quota, INFO)
<< "TCP: benign reclamation to free memory";
tcp->read_mu.Lock();
if (tcp->incoming_buffer != nullptr) {
grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
@@ -910,9 +895,7 @@ static void update_rcvlowat(grpc_tcp* tcp)
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
GRPC_LATENT_SEE_INNER_SCOPE("tcp_do_read");
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "TCP:" << tcp << " do_read";
}
GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " do_read";
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
@@ -1479,9 +1462,7 @@ static bool process_errors(grpc_tcp* tcp) {
static void tcp_handle_error(void* arg /* grpc_tcp */,
grpc_error_handle error) {
grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "TCP:" << tcp << " got_error: " << error;
}
GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " got_error: " << error;
if (!error.ok() ||
static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
@@ -1809,9 +1790,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */,
? tcp_flush_zerocopy(tcp, tcp->current_zerocopy_send, &error)
: tcp_flush(tcp, &error);
if (!flush_result) {
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "write: delayed";
}
GRPC_TRACE_LOG(tcp, INFO) << "write: delayed";
notify_on_write(tcp);
// tcp_flush does not populate error if it has returned false.
DCHECK(error.ok());
@@ -1880,9 +1859,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
TCP_REF(tcp, "write");
tcp->write_cb = cb;
tcp->current_zerocopy_send = zerocopy_send_record;
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "write: delayed";
}
GRPC_TRACE_LOG(tcp, INFO) << "write: delayed";
notify_on_write(tcp);
} else {
GRPC_TRACE_LOG(tcp, INFO) << "write: " << grpc_core::StatusToString(error);

@@ -459,9 +459,8 @@ static void on_read(void* arg, grpc_error_handle err) {
LOG(ERROR) << "Invalid address: " << addr_uri.status();
goto error;
}
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "SERVER_CONNECT: incoming connection: " << *addr_uri;
}
GRPC_TRACE_LOG(tcp, INFO)
<< "SERVER_CONNECT: incoming connection: " << *addr_uri;
std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value());
grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);

@@ -176,9 +176,7 @@ static void on_read(void* tcpp, grpc_error_handle error) {
grpc_winsocket* socket = tcp->socket;
grpc_winsocket_callback_info* info = &socket->read_info;
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "TCP:" << tcp << " on_read";
}
GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " on_read";
if (error.ok()) {
if (info->wsa_error != 0 && !tcp->shutting_down) {
@@ -208,9 +206,7 @@ static void on_read(void* tcpp, grpc_error_handle error) {
}
}
} else {
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "TCP:" << tcp << " unref read_slice";
}
GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " unref read_slice";
grpc_slice_buffer_reset_and_unref(tcp->read_slices);
error = grpc_error_set_int(
tcp->shutting_down ? GRPC_ERROR_CREATE("TCP stream shutting down")
@@ -239,9 +235,7 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
WSABUF buffers[MAX_WSABUF_COUNT];
size_t i;
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "TCP:" << tcp << " win_read";
}
GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " win_read";
if (tcp->shutting_down) {
grpc_core::ExecCtx::Run(
@@ -310,9 +304,7 @@ static void on_write(void* tcpp, grpc_error_handle error) {
grpc_winsocket_callback_info* info = &handle->write_info;
grpc_closure* cb;
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "TCP:" << tcp << " on_write";
}
GRPC_TRACE_LOG(tcp, INFO) << "TCP:" << tcp << " on_write";
gpr_mu_lock(&tcp->mu);
cb = tcp->write_cb;

@@ -195,11 +195,10 @@ static bool wait_until(grpc_core::Timestamp next) {
gpr_cv_wait(&g_cv_wait, &g_mu, next.as_timespec(GPR_CLOCK_MONOTONIC));
if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
LOG(INFO) << "wait ended: was_timed:"
<< (my_timed_waiter_generation == g_timed_waiter_generation)
<< " kicked:" << g_kicked;
}
GRPC_TRACE_LOG(timer_check, INFO)
<< "wait ended: was_timed:"
<< (my_timed_waiter_generation == g_timed_waiter_generation)
<< " kicked:" << g_kicked;
// if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if
// there's work to do after checking timers (code above)
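
The was_timed comparison in this log line relies on a generation counter captured before sleeping. A compressed sketch of the idiom, not the exact code, with names mirroring the globals used here:

// Under g_mu: become the timed waiter and remember the generation.
uint64_t my_timed_waiter_generation = ++g_timed_waiter_generation;
gpr_cv_wait(&g_cv_wait, &g_mu, next.as_timespec(GPR_CLOCK_MONOTONIC));
// After waking: we were "the" timed waiter only if nobody replaced us.
bool was_timed = (my_timed_waiter_generation == g_timed_waiter_generation);
// g_kicked (set by a kicker thread under g_mu) distinguishes an explicit
// kick from a plain timeout.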

@@ -172,16 +172,13 @@ class ForEach {
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Poll<Result> PollReaderNext() {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << " PollReaderNext";
}
GRPC_TRACE_LOG(promise_primitives, INFO) << DebugTag() << " PollReaderNext";
auto r = reader_next_();
if (auto* p = r.value_if_ready()) {
switch (NextValueTraits<ReaderResult>::Type(*p)) {
case NextValueType::kValue: {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << " PollReaderNext: got value";
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << " PollReaderNext: got value";
Destruct(&reader_next_);
auto action = action_factory_.Make(
std::move(NextValueTraits<ReaderResult>::MutableValue(*p)));
@@ -190,15 +187,13 @@ class ForEach {
return PollAction();
}
case NextValueType::kEndOfStream: {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << " PollReaderNext: got end of stream";
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << " PollReaderNext: got end of stream";
return Done<Result>::Make(false);
}
case NextValueType::kError: {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << " PollReaderNext: got error";
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << " PollReaderNext: got error";
return Done<Result>::Make(true);
}
}
@@ -207,9 +202,7 @@ class ForEach {
}
Poll<Result> PollAction() {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << " PollAction";
}
GRPC_TRACE_LOG(promise_primitives, INFO) << DebugTag() << " PollAction";
auto r = in_action_.promise();
if (auto* p = r.value_if_ready()) {
if (IsStatusOk(*p)) {

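For reference, the PollReaderNext/PollAction pair above is the core of the ForEach combinator: poll the reader for the next value, run the action on it, and finish on end-of-stream or error. A hedged usage sketch (the pipe and handler names are illustrative, not from this diff):

// Illustrative: drain a pipe, running the action once per received message.
auto loop = ForEach(std::move(pipe.receiver),
                    [](MessageHandle message) {
                      HandleMessage(std::move(message));  // hypothetical
                      return absl::OkStatus();  // non-OK takes the kError path
                    });
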
@@ -45,9 +45,8 @@ class InterActivityLatch {
auto Wait() {
return [this]() -> Poll<T> {
MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "PollWait " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "PollWait " << StateString();
if (is_set_) {
return std::move(value_);
} else {
@@ -60,9 +59,8 @@ class InterActivityLatch {
// Set the latch.
void Set(T value) {
MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "Set " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "Set " << StateString();
is_set_ = true;
value_ = std::move(value);
waiters_.WakeupAsync();
@@ -102,9 +100,8 @@ class InterActivityLatch<void> {
auto Wait() {
return [this]() -> Poll<Empty> {
MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "PollWait " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "PollWait " << StateString();
if (is_set_) {
return Empty{};
} else {
@@ -117,9 +114,8 @@ class InterActivityLatch<void> {
// Set the latch.
void Set() {
MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "Set " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "Set " << StateString();
is_set_ = true;
waiters_.WakeupAsync();
}

@@ -67,9 +67,8 @@ class Latch {
has_had_waiters_ = true;
#endif
return [this]() -> Poll<T> {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "Wait " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "Wait " << StateString();
if (has_value_) {
return std::move(value_);
} else {
@@ -85,9 +84,8 @@ class Latch {
has_had_waiters_ = true;
#endif
return [this]() -> Poll<T> {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "WaitAndCopy " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "WaitAndCopy " << StateString();
if (has_value_) {
return value_;
} else {
@@ -98,9 +96,8 @@ class Latch {
// Set the value of the latch. Can only be called once.
void Set(T value) {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "Set " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "Set " << StateString();
DCHECK(!has_value_);
value_ = std::move(value);
has_value_ = true;
@@ -161,9 +158,8 @@ class Latch<void> {
has_had_waiters_ = true;
#endif
return [this]() -> Poll<Empty> {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "PollWait " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "PollWait " << StateString();
if (is_set_) {
return Empty{};
} else {
@@ -174,9 +170,8 @@ class Latch<void> {
// Set the latch. Can only be called once.
void Set() {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "Set " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "Set " << StateString();
DCHECK(!is_set_);
is_set_ = true;
waiter_.Wake();
@@ -223,9 +218,8 @@ class ExternallyObservableLatch<void> {
// Produce a promise to wait for this latch.
auto Wait() {
return [this]() -> Poll<Empty> {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "PollWait " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "PollWait " << StateString();
if (IsSet()) {
return Empty{};
} else {
@@ -236,9 +230,8 @@ class ExternallyObservableLatch<void> {
// Set the latch.
void Set() {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "Set " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "Set " << StateString();
is_set_.store(true, std::memory_order_relaxed);
waiter_.Wake();
}
@@ -246,9 +239,8 @@ class ExternallyObservableLatch<void> {
bool IsSet() const { return is_set_.load(std::memory_order_relaxed); }
void Reset() {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
LOG(INFO) << DebugTag() << "Reset " << StateString();
}
GRPC_TRACE_LOG(promise_primitives, INFO)
<< DebugTag() << "Reset " << StateString();
is_set_.store(false, std::memory_order_relaxed);
}
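
The Wait/Set pairs above all follow the same latch contract: Wait returns a poll-able promise that stays pending until Set is called exactly once (DCHECK-enforced). A minimal usage sketch under that contract, with activity plumbing omitted:

Latch<int> latch;
auto wait = latch.Wait();  // polls Pending until the latch is set
latch.Set(42);             // single allowed call; wakes the waiter
// The next poll of `wait` resolves to 42. Wait() moves the value out;
// WaitAndCopy() returns a copy, so several waiters can observe it.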

@@ -355,9 +355,8 @@ void GrpcMemoryAllocatorImpl::MaybeDonateBack() {
if (free_bytes_.compare_exchange_weak(free, new_free,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
LOG(INFO) << "[" << this << "] Early return " << ret << " bytes";
}
GRPC_TRACE_LOG(resource_quota, INFO)
<< "[" << this << "] Early return " << ret << " bytes";
CHECK(taken_bytes_.fetch_sub(ret, std::memory_order_relaxed) >= ret);
memory_quota_->Return(ret);
return;
@@ -548,9 +547,7 @@ void BasicMemoryQuota::Return(size_t amount) {
}
void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
LOG(INFO) << "Adding allocator " << allocator;
}
GRPC_TRACE_LOG(resource_quota, INFO) << "Adding allocator " << allocator;
AllocatorBucket::Shard& shard = small_allocators_.SelectShard(allocator);
@@ -561,9 +558,7 @@ void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
}
void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
LOG(INFO) << "Removing allocator " << allocator;
}
GRPC_TRACE_LOG(resource_quota, INFO) << "Removing allocator " << allocator;
AllocatorBucket::Shard& small_shard =
small_allocators_.SelectShard(allocator);
@@ -608,9 +603,8 @@ void BasicMemoryQuota::MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
GrpcMemoryAllocatorImpl* allocator) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
LOG(INFO) << "Moving allocator " << allocator << " to small";
}
GRPC_TRACE_LOG(resource_quota, INFO)
<< "Moving allocator " << allocator << " to small";
AllocatorBucket::Shard& old_shard = big_allocators_.SelectShard(allocator);
@@ -629,9 +623,8 @@ void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
void BasicMemoryQuota::MaybeMoveAllocatorSmallToBig(
GrpcMemoryAllocatorImpl* allocator) {
if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
LOG(INFO) << "Moving allocator " << allocator << " to big";
}
GRPC_TRACE_LOG(resource_quota, INFO)
<< "Moving allocator " << allocator << " to big";
AllocatorBucket::Shard& old_shard = small_allocators_.SelectShard(allocator);

@@ -167,11 +167,10 @@ absl::Status FileWatcherAuthorizationPolicyProvider::ForceUpdate() {
if (cb_ != nullptr) {
cb_(contents_changed, absl::OkStatus());
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_authz_api)) {
LOG(INFO) << "authorization policy reload status: successfully loaded new "
"policy\n"
<< file_contents_;
}
GRPC_TRACE_LOG(grpc_authz_api, INFO)
<< "authorization policy reload status: successfully loaded new "
"policy\n"
<< file_contents_;
return absl::OkStatus();
}

@@ -511,11 +511,10 @@ grpc_call_credentials* grpc_google_refresh_token_credentials_create(
const char* json_refresh_token, void* reserved) {
grpc_auth_refresh_token token =
grpc_auth_refresh_token_create_from_string(json_refresh_token);
if (GRPC_TRACE_FLAG_ENABLED(api)) {
LOG(INFO) << "grpc_refresh_token_credentials_create(json_refresh_token="
<< create_loggable_refresh_token(&token)
<< ", reserved=" << reserved << ")";
}
GRPC_TRACE_LOG(api, INFO)
<< "grpc_refresh_token_credentials_create(json_refresh_token="
<< create_loggable_refresh_token(&token) << ", reserved=" << reserved
<< ")";
CHECK_EQ(reserved, nullptr);
return grpc_refresh_token_credentials_create_from_auth_refresh_token(token)
.release();

@@ -47,18 +47,16 @@ Timestamp BdpEstimator::CompletePing() {
1e-9 * static_cast<double>(dt_ts.tv_nsec);
double bw = dt > 0 ? (static_cast<double>(accumulator_) / dt) : 0;
Duration start_inter_ping_delay = inter_ping_delay_;
if (GRPC_TRACE_FLAG_ENABLED(bdp_estimator)) {
LOG(INFO) << "bdp[" << name_ << "]:complete acc=" << accumulator_
<< " est=" << estimate_ << " dt=" << dt << " bw=" << bw / 125000.0
<< "Mbs bw_est=" << bw_est_ / 125000.0 << "Mbs";
}
GRPC_TRACE_LOG(bdp_estimator, INFO)
<< "bdp[" << name_ << "]:complete acc=" << accumulator_
<< " est=" << estimate_ << " dt=" << dt << " bw=" << bw / 125000.0
<< "Mbs bw_est=" << bw_est_ / 125000.0 << "Mbs";
CHECK(ping_state_ == PingState::STARTED);
if (accumulator_ > 2 * estimate_ / 3 && bw > bw_est_) {
estimate_ = std::max(accumulator_, estimate_ * 2);
bw_est_ = bw;
if (GRPC_TRACE_FLAG_ENABLED(bdp_estimator)) {
LOG(INFO) << "bdp[" << name_ << "]: estimate increased to " << estimate_;
}
GRPC_TRACE_LOG(bdp_estimator, INFO)
<< "bdp[" << name_ << "]: estimate increased to " << estimate_;
inter_ping_delay_ /= 2; // if the ping estimate changes,
// exponentially get faster at probing
} else if (inter_ping_delay_ < Duration::Seconds(10)) {
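
A quick reading of the arithmetic in this hunk: bw is measured in bytes per second, and dividing by 125000 converts it to megabits per second (1 Mbps = 125,000 B/s). The estimate only grows when a ping accumulated more than two thirds of the current estimate while beating the best observed bandwidth; illustrative numbers:

// Illustrative values, not from this diff:
//   estimate_    = 65536 B, accumulator_ = 50000 B  (> 2/3 * 65536 ~ 43691)
//   bw           = 2,000,000 B/s = 16 Mbps, above the previous bw_est_
// => estimate_ = std::max(accumulator_, estimate_ * 2) = 131072,
//    and inter_ping_delay_ halves, probing faster after a change.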

@@ -200,11 +200,9 @@ void CallFilters::CancelDueToFailedPipeOperation(SourceLocation but_where) {
void CallFilters::PushServerTrailingMetadata(ServerMetadataHandle md) {
CHECK(md != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(call)) {
LOG(INFO) << GetContext<Activity>()->DebugTag()
<< " PushServerTrailingMetadata[" << this
<< "]: " << md->DebugString() << " into " << DebugString();
}
GRPC_TRACE_LOG(call, INFO)
<< GetContext<Activity>()->DebugTag() << " PushServerTrailingMetadata["
<< this << "]: " << md->DebugString() << " into " << DebugString();
CHECK(md != nullptr);
if (call_state_.PushServerTrailingMetadata(
md->get(GrpcCallWasCancelled()).value_or(false))) {

@@ -1174,17 +1174,16 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
if (response.client_stats_report_interval != Duration::Zero()) {
client_stats_report_interval_ = std::max(
Duration::Seconds(1), response.client_stats_report_interval);
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Received initial LB response message; client load "
"reporting interval = "
<< client_stats_report_interval_.millis()
<< " milliseconds";
}
} else if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Received initial LB response message; client load "
"reporting NOT enabled";
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Received initial LB response message; client load "
"reporting interval = "
<< client_stats_report_interval_.millis() << " milliseconds";
} else {
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Received initial LB response message; client load "
"reporting NOT enabled";
}
seen_initial_response_ = true;
break;
@@ -1193,13 +1192,11 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
CHECK_NE(lb_call_, nullptr);
auto serverlist_wrapper =
MakeRefCounted<Serverlist>(std::move(response.serverlist));
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Serverlist with "
<< serverlist_wrapper->serverlist().size()
<< " servers received:\n"
<< serverlist_wrapper->AsText();
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Serverlist with " << serverlist_wrapper->serverlist().size()
<< " servers received:\n"
<< serverlist_wrapper->AsText();
seen_serverlist_ = true;
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
@@ -1213,11 +1210,10 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
// Check if the serverlist differs from the previous one.
if (grpclb_policy()->serverlist_ != nullptr &&
*grpclb_policy()->serverlist_ == *serverlist_wrapper) {
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Incoming server list identical to current, "
"ignoring.";
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Incoming server list identical to current, "
"ignoring.";
} else { // New serverlist.
// Dispose of the fallback.
// TODO(roth): Ideally, we should stay in fallback mode until we
@@ -1457,11 +1453,10 @@ GrpcLb::GrpcLb(Args args)
GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS)
.value_or(Duration::Milliseconds(
GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS)))) {
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << this << "] Will use '"
<< std::string(channel_control_helper()->GetAuthority())
<< "' as the server name for LB request.";
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << this << "] Will use '"
<< std::string(channel_control_helper()->GetAuthority())
<< "' as the server name for LB request.";
}
void GrpcLb::ShutdownLocked() {
@@ -1542,9 +1537,7 @@ class GrpcLb::NullLbTokenEndpointIterator final
};
absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << this << "] received update";
}
GRPC_TRACE_LOG(glb, INFO) << "[grpclb " << this << "] received update";
const bool is_initial_update = lb_channel_ == nullptr;
config_ = args.config.TakeAsSubclass<GrpcLbConfig>();
CHECK(config_ != nullptr);
@@ -1656,11 +1649,10 @@ void GrpcLb::StartBalancerCallLocked() {
// Init the LB call data.
CHECK(lb_calld_ == nullptr);
lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << this
<< "] Query for backends (lb_channel: " << lb_channel_.get()
<< ", lb_calld: " << lb_calld_.get() << ")";
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << this
<< "] Query for backends (lb_channel: " << lb_channel_.get()
<< ", lb_calld: " << lb_calld_.get() << ")";
lb_calld_->StartQuery();
}
@@ -1695,9 +1687,8 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
void GrpcLb::OnBalancerCallRetryTimerLocked() {
lb_call_retry_timer_handle_.reset();
if (!shutting_down_ && lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << this << "] Restarting call to LB server";
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << this << "] Restarting call to LB server";
StartBalancerCallLocked();
}
}

@@ -146,11 +146,9 @@ void HealthProducer::HealthChecker::OnConnectivityStateChangeLocked(
void HealthProducer::HealthChecker::NotifyWatchersLocked(
grpc_connectivity_state state, absl::Status status) {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthProducer " << producer_.get() << " HealthChecker "
<< this << ": reporting state " << ConnectivityStateName(state)
<< " to watchers";
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthProducer " << producer_.get() << " HealthChecker " << this
<< ": reporting state " << ConnectivityStateName(state) << " to watchers";
work_serializer_->Schedule(
[self = Ref(), state, status = std::move(status)]() {
MutexLock lock(&self->producer_->mu_);
@@ -285,11 +283,10 @@ class HealthProducer::HealthChecker::HealthStreamEventHandler final
void SetHealthStatusLocked(SubchannelStreamClient* client,
grpc_connectivity_state state,
const char* reason) {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthCheckClient " << client
<< ": setting state=" << ConnectivityStateName(state)
<< " reason=" << reason;
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthCheckClient " << client
<< ": setting state=" << ConnectivityStateName(state)
<< " reason=" << reason;
health_checker_->OnHealthWatchStatusChange(
state, state == GRPC_CHANNEL_TRANSIENT_FAILURE
? absl::UnavailableError(reason)
@@ -300,11 +297,9 @@ class HealthProducer::HealthChecker::HealthStreamEventHandler final
};
void HealthProducer::HealthChecker::StartHealthStreamLocked() {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthProducer " << producer_.get() << " HealthChecker "
<< this << ": creating HealthClient for \""
<< health_check_service_name_ << "\"";
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthProducer " << producer_.get() << " HealthChecker " << this
<< ": creating HealthClient for \"" << health_check_service_name_ << "\"";
stream_client_ = MakeOrphanable<SubchannelStreamClient>(
producer_->connected_subchannel_, producer_->subchannel_->pollset_set(),
std::make_unique<HealthStreamEventHandler>(Ref()),
@@ -356,9 +351,8 @@ void HealthProducer::Start(RefCountedPtr<Subchannel> subchannel) {
}
void HealthProducer::Orphaned() {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthProducer " << this << ": shutting down";
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthProducer " << this << ": shutting down";
{
MutexLock lock(&mu_);
health_checkers_.clear();
@@ -406,11 +400,10 @@ void HealthProducer::RemoveWatcher(
void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
const absl::Status& status) {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthProducer " << this
<< ": subchannel state update: state="
<< ConnectivityStateName(state) << " status=" << status;
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthProducer " << this
<< ": subchannel state update: state=" << ConnectivityStateName(state)
<< " status=" << status;
MutexLock lock(&mu_);
state_ = state;
status_ = status;
@@ -432,11 +425,10 @@ void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
//
HealthWatcher::~HealthWatcher() {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthWatcher " << this << ": unregistering from producer "
<< producer_.get() << " (health_check_service_name=\""
<< health_check_service_name_.value_or("N/A") << "\")";
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthWatcher " << this << ": unregistering from producer "
<< producer_.get() << " (health_check_service_name=\""
<< health_check_service_name_.value_or("N/A") << "\")";
if (producer_ != nullptr) {
producer_->RemoveWatcher(this, health_check_service_name_);
}

@@ -532,11 +532,10 @@ OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb,
RefCountedPtr<SubchannelPicker> picker,
bool counting_enabled)
: picker_(std::move(picker)), counting_enabled_(counting_enabled) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << outlier_detection_lb
<< "] constructed new picker " << this << " and counting "
<< "is " << (counting_enabled ? "enabled" : "disabled");
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << outlier_detection_lb
<< "] constructed new picker " << this << " and counting "
<< "is " << (counting_enabled ? "enabled" : "disabled");
}
LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
@@ -574,9 +573,8 @@ LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
OutlierDetectionLb::OutlierDetectionLb(Args args)
: LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << this << "] created";
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this << "] created";
}
OutlierDetectionLb::~OutlierDetectionLb() {
@@ -586,9 +584,8 @@ OutlierDetectionLb::~OutlierDetectionLb() {
}
void OutlierDetectionLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << this << "] shutting down";
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this << "] shutting down";
ejection_timer_.reset();
shutting_down_ = true;
// Remove the child policy's interested_parties pollset_set from the
@@ -612,9 +609,8 @@ void OutlierDetectionLb::ResetBackoffLocked() {
}
absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << this << "] Received update";
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this << "] Received update";
auto old_config = std::move(config_);
// Update config.
config_ = args.config.TakeAsSubclass<OutlierDetectionLbConfig>();
@@ -627,9 +623,8 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
ejection_timer_.reset();
} else if (ejection_timer_ == nullptr) {
// No timer running. Start it now.
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << this << "] starting timer";
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this << "] starting timer";
ejection_timer_ = MakeOrphanable<EjectionTimer>(
RefAsSubclass<OutlierDetectionLb>(), Timestamp::Now());
for (const auto& p : endpoint_state_map_) {
@@ -687,11 +682,9 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
key, MakeRefCounted<EndpointState>(std::move(subchannels)));
} else if (!config_->CountingEnabled()) {
// If counting is not enabled, reset state.
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << this
<< "] counting disabled; disabling ejection for "
<< key.ToString();
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this
<< "] counting disabled; disabling ejection for " << key.ToString();
it->second->DisableEjection();
}
});
@@ -931,17 +924,14 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
const double success_rate_stdev_factor =
static_cast<double>(config.success_rate_ejection->stdev_factor) / 1000;
double ejection_threshold = mean - stdev * success_rate_stdev_factor;
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << parent_.get()
<< "] stdev=" << stdev
<< ", ejection_threshold=" << ejection_threshold;
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << parent_.get() << "] stdev=" << stdev
<< ", ejection_threshold=" << ejection_threshold;
for (auto& candidate : success_rate_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << parent_.get()
<< "] checking candidate " << candidate.first
<< ": success_rate=" << candidate.second;
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << parent_.get()
<< "] checking candidate " << candidate.first
<< ": success_rate=" << candidate.second;
if (candidate.second < ejection_threshold) {
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
double current_percent =
@@ -979,11 +969,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
<< config.failure_percentage_ejection->enforcement_percentage;
}
for (auto& candidate : failure_percentage_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << parent_.get()
<< "] checking candidate " << candidate.first
<< ": success_rate=" << candidate.second;
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << parent_.get()
<< "] checking candidate " << candidate.first
<< ": success_rate=" << candidate.second;
// Extra check to make sure success rate algorithm didn't already
// eject this backend.
if (candidate.first->ejection_time().has_value()) continue;

@@ -420,22 +420,16 @@ PickFirst::PickFirst(Args args)
.GetInt(GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS)
.value_or(250),
100, 2000))) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Pick First " << this << " created.";
}
GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " created.";
}
PickFirst::~PickFirst() {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Destroying Pick First " << this;
}
GRPC_TRACE_LOG(pick_first, INFO) << "Destroying Pick First " << this;
CHECK(subchannel_list_ == nullptr);
}
void PickFirst::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Pick First " << this << " Shutting down";
}
GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " Shutting down";
shutdown_ = true;
UnsetSelectedSubchannel();
subchannel_list_.reset();
@ -444,9 +438,8 @@ void PickFirst::ShutdownLocked() {
void PickFirst::ExitIdleLocked() {
if (shutdown_) return;
if (IsIdle()) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Pick First " << this << " exiting idle";
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "Pick First " << this << " exiting idle";
AttemptToConnectUsingLatestUpdateArgsLocked();
}
}
@ -681,11 +674,10 @@ PickFirst::SubchannelList::SubchannelData::SubchannelState::SubchannelState(
}
void PickFirst::SubchannelList::SubchannelData::SubchannelState::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << pick_first_.get() << "] subchannel state " << this
<< " (subchannel " << subchannel_.get()
<< "): cancelling watch and unreffing subchannel";
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << pick_first_.get() << "] subchannel state " << this
<< " (subchannel " << subchannel_.get()
<< "): cancelling watch and unreffing subchannel";
subchannel_data_ = nullptr;
subchannel_->CancelConnectivityStateWatch(watcher_);
watcher_ = nullptr;
@ -706,9 +698,8 @@ void PickFirst::SubchannelList::SubchannelData::SubchannelState::Select() {
// for the health status notification.
// If health checking is NOT enabled, report READY.
if (pick_first_->enable_health_watch_) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << pick_first_.get() << "] starting health watch";
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << pick_first_.get() << "] starting health watch";
auto watcher = std::make_unique<HealthWatcher>(
pick_first_.Ref(DEBUG_LOCATION, "HealthWatcher"));
pick_first_->health_watcher_ = watcher.get();
@ -767,11 +758,10 @@ void PickFirst::SubchannelList::SubchannelData::SubchannelState::
}
// We aren't trying to connect, so we must be the selected subchannel.
CHECK(pick_first_->selected_.get() == this);
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Pick First " << pick_first_.get()
<< " selected subchannel connectivity changed to "
<< ConnectivityStateName(new_state);
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "Pick First " << pick_first_.get()
<< " selected subchannel connectivity changed to "
<< ConnectivityStateName(new_state);
// Any state change is considered to be a failure of the existing
// connection. Report the failure.
auto& stats_plugins =
@ -791,11 +781,10 @@ PickFirst::SubchannelList::SubchannelData::SubchannelData(
SubchannelList* subchannel_list, size_t index,
RefCountedPtr<SubchannelInterface> subchannel)
: subchannel_list_(subchannel_list), index_(index) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << subchannel_list_->policy_.get()
<< "] subchannel list " << subchannel_list_ << " index " << index_
<< ": creating subchannel data";
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << subchannel_list_->policy_.get() << "] subchannel list "
<< subchannel_list_ << " index " << index_
<< ": creating subchannel data";
subchannel_state_ =
MakeOrphanable<SubchannelState>(this, std::move(subchannel));
}
@ -856,11 +845,10 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
// is not in the new list. In that case, we drop the current
// connection and report IDLE.
if (p->selected_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << p << "] subchannel list " << subchannel_list_
<< ": new update has no subchannels in "
<< "state READY; dropping existing connection and going IDLE";
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << p << "] subchannel list " << subchannel_list_
<< ": new update has no subchannels in state READY; dropping "
"existing connection and going IDLE";
p->GoIdle();
} else {
// Start trying to connect, starting with the first subchannel.
@ -1016,18 +1004,16 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
address.address(), address.args(), args_);
if (subchannel == nullptr) {
// Subchannel could not be created.
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << policy_.get()
<< "] could not create subchannel for address "
<< address.ToString() << ", ignoring";
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << policy_.get()
<< "] could not create subchannel for address " << address.ToString()
<< ", ignoring";
return;
}
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << policy_.get() << "] subchannel list " << this
<< " index " << subchannels_.size() << ": Created subchannel "
<< subchannel.get() << " for address " << address.ToString();
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << policy_.get() << "] subchannel list " << this << " index "
<< subchannels_.size() << ": Created subchannel " << subchannel.get()
<< " for address " << address.ToString();
subchannels_.emplace_back(std::make_unique<SubchannelData>(
this, subchannels_.size(), std::move(subchannel)));
});
@ -1374,23 +1360,17 @@ OldPickFirst::OldPickFirst(Args args)
.GetInt(GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS)
.value_or(250),
100, 2000))) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Pick First " << this << " created.";
}
GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " created.";
}
OldPickFirst::~OldPickFirst() {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Destroying Pick First " << this;
}
GRPC_TRACE_LOG(pick_first, INFO) << "Destroying Pick First " << this;
CHECK(subchannel_list_ == nullptr);
CHECK(latest_pending_subchannel_list_ == nullptr);
}
void OldPickFirst::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Pick First " << this << " Shutting down";
}
GRPC_TRACE_LOG(pick_first, INFO) << "Pick First " << this << " Shutting down";
shutdown_ = true;
UnsetSelectedSubchannel();
subchannel_list_.reset();
@ -1400,9 +1380,8 @@ void OldPickFirst::ShutdownLocked() {
void OldPickFirst::ExitIdleLocked() {
if (shutdown_) return;
if (IsIdle()) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Pick First " << this << " exiting idle";
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "Pick First " << this << " exiting idle";
AttemptToConnectUsingLatestUpdateArgsLocked();
}
}
@ -1597,11 +1576,10 @@ OldPickFirst::SubchannelList::SubchannelData::SubchannelData(
: subchannel_list_(subchannel_list),
index_(index),
subchannel_(std::move(subchannel)) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << subchannel_list_->policy_.get()
<< "] subchannel list " << subchannel_list_ << " index " << index_
<< " (subchannel " << subchannel_.get() << "): starting watch";
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << subchannel_list_->policy_.get() << "] subchannel list "
<< subchannel_list_ << " index " << index_ << " (subchannel "
<< subchannel_.get() << "): starting watch";
auto watcher = std::make_unique<Watcher>(
subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"), index_);
pending_watcher_ = watcher.get();
@ -1658,11 +1636,9 @@ void OldPickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
// Handle updates for the currently selected subchannel.
if (p->selected_ == this) {
CHECK(subchannel_list_ == p->subchannel_list_.get());
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Pick First " << p
<< " selected subchannel connectivity changed to "
<< ConnectivityStateName(new_state);
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "Pick First " << p << " selected subchannel connectivity changed to "
<< ConnectivityStateName(new_state);
// Any state change is considered to be a failure of the existing
// connection.
stats_plugins.AddCounter(
@ -1677,11 +1653,10 @@ void OldPickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
p->channel_control_helper()->RequestReresolution();
// If there is a pending update, switch to the pending update.
if (p->latest_pending_subchannel_list_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Pick First " << p << " promoting pending subchannel "
<< "list " << p->latest_pending_subchannel_list_.get()
<< " to replace " << p->subchannel_list_.get();
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "Pick First " << p << " promoting pending subchannel "
<< "list " << p->latest_pending_subchannel_list_.get()
<< " to replace " << p->subchannel_list_.get();
p->UnsetSelectedSubchannel();
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
@ -1895,11 +1870,10 @@ void OldPickFirst::SubchannelList::SubchannelData::
subchannel_list_ == p->latest_pending_subchannel_list_.get());
// Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "Pick First " << p << " promoting pending subchannel list "
<< p->latest_pending_subchannel_list_.get() << " to replace "
<< p->subchannel_list_.get();
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "Pick First " << p << " promoting pending subchannel list "
<< p->latest_pending_subchannel_list_.get() << " to replace "
<< p->subchannel_list_.get();
p->UnsetSelectedSubchannel();
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
@ -1912,9 +1886,8 @@ void OldPickFirst::SubchannelList::SubchannelData::
// for the health status notification.
// If health checking is NOT enabled, report READY.
if (p->enable_health_watch_) {
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << p << "] starting health watch";
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << p << "] starting health watch";
auto watcher = std::make_unique<HealthWatcher>(
p->RefAsSubclass<OldPickFirst>(DEBUG_LOCATION, "HealthWatcher"));
p->health_watcher_ = watcher.get();
@ -1959,18 +1932,16 @@ OldPickFirst::SubchannelList::SubchannelList(
address.address(), address.args(), args_);
if (subchannel == nullptr) {
// Subchannel could not be created.
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << policy_.get()
<< "] could not create subchannel for address "
<< address.ToString() << ", ignoring";
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << policy_.get()
<< "] could not create subchannel for address " << address.ToString()
<< ", ignoring";
return;
}
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << policy_.get() << "] subchannel list " << this
<< " index " << subchannels_.size() << ": Created subchannel "
<< subchannel.get() << " for address " << address.ToString();
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << policy_.get() << "] subchannel list " << this << " index "
<< subchannels_.size() << ": Created subchannel " << subchannel.get()
<< " for address " << address.ToString();
subchannels_.emplace_back(this, subchannels_.size(), std::move(subchannel));
});
}
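The PickFirst and OldPickFirst constructors above read the happy-eyeballs attempt delay from GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS, default it to 250 ms, and clamp it into [100 ms, 2000 ms]. A minimal C++17 sketch of that arg handling (ConnectionAttemptDelayMs is an illustrative name, not a gRPC function):

#include <algorithm>
#include <optional>

// Missing channel arg -> 250 ms default, then clamp into [100, 2000] ms,
// mirroring the constructor logic above.
constexpr int ConnectionAttemptDelayMs(std::optional<int> arg_ms) {
  return std::clamp(arg_ms.value_or(250), 100, 2000);
}

static_assert(ConnectionAttemptDelayMs(std::nullopt) == 250);
static_assert(ConnectionAttemptDelayMs(5) == 100);       // clamped up
static_assert(ConnectionAttemptDelayMs(10000) == 2000);  // clamped down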

src/core/load_balancing/priority/priority.cc
@ -294,21 +294,17 @@ PriorityLb::PriorityLb(Args args)
channel_args()
.GetDurationFromIntMillis(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS)
.value_or(kDefaultChildFailoverTimeout))) {
if (GRPC_TRACE_FLAG_ENABLED(priority_lb)) {
LOG(INFO) << "[priority_lb " << this << "] created";
}
GRPC_TRACE_LOG(priority_lb, INFO) << "[priority_lb " << this << "] created";
}
PriorityLb::~PriorityLb() {
if (GRPC_TRACE_FLAG_ENABLED(priority_lb)) {
LOG(INFO) << "[priority_lb " << this << "] destroying priority LB policy";
}
GRPC_TRACE_LOG(priority_lb, INFO)
<< "[priority_lb " << this << "] destroying priority LB policy";
}
void PriorityLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(priority_lb)) {
LOG(INFO) << "[priority_lb " << this << "] shutting down";
}
GRPC_TRACE_LOG(priority_lb, INFO)
<< "[priority_lb " << this << "] shutting down";
shutting_down_ = true;
children_.clear();
}
@ -316,11 +312,9 @@ void PriorityLb::ShutdownLocked() {
void PriorityLb::ExitIdleLocked() {
if (current_priority_ != UINT32_MAX) {
const std::string& child_name = config_->priorities()[current_priority_];
if (GRPC_TRACE_FLAG_ENABLED(priority_lb)) {
LOG(INFO) << "[priority_lb " << this
<< "] exiting IDLE for current priority " << current_priority_
<< " child " << child_name;
}
GRPC_TRACE_LOG(priority_lb, INFO)
<< "[priority_lb " << this << "] exiting IDLE for current priority "
<< current_priority_ << " child " << child_name;
children_[child_name]->ExitIdleLocked();
}
}
@ -330,9 +324,8 @@ void PriorityLb::ResetBackoffLocked() {
}
absl::Status PriorityLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(priority_lb)) {
LOG(INFO) << "[priority_lb " << this << "] received update";
}
GRPC_TRACE_LOG(priority_lb, INFO)
<< "[priority_lb " << this << "] received update";
// Update config.
config_ = args.config.TakeAsSubclass<PriorityLbConfig>();
// Update args.
@ -451,11 +444,10 @@ void PriorityLb::ChoosePriorityLocked() {
}
// If we didn't find any priority to try, pick the first one in state
// CONNECTING.
if (GRPC_TRACE_FLAG_ENABLED(priority_lb)) {
LOG(INFO) << "[priority_lb " << this
<< "] no priority reachable, checking for CONNECTING priority to "
"delegate to";
}
GRPC_TRACE_LOG(priority_lb, INFO)
<< "[priority_lb " << this
<< "] no priority reachable, checking for CONNECTING priority to "
"delegate to";
for (uint32_t priority = 0; priority < config_->priorities().size();
++priority) {
// If the child for the priority does not exist yet, create it.
@ -530,11 +522,10 @@ PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
void PriorityLb::ChildPriority::DeactivationTimer::Orphan() {
if (timer_handle_.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(priority_lb)) {
LOG(INFO) << "[priority_lb " << child_priority_->priority_policy_.get()
<< "] child " << child_priority_->name_ << " ("
<< child_priority_.get() << "): reactivating";
}
GRPC_TRACE_LOG(priority_lb, INFO)
<< "[priority_lb " << child_priority_->priority_policy_.get()
<< "] child " << child_priority_->name_ << " (" << child_priority_.get()
<< "): reactivating";
child_priority_->priority_policy_->channel_control_helper()
->GetEventEngine()
->Cancel(*timer_handle_);
@ -588,11 +579,10 @@ PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
void PriorityLb::ChildPriority::FailoverTimer::Orphan() {
if (timer_handle_.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(priority_lb)) {
LOG(INFO) << "[priority_lb " << child_priority_->priority_policy_.get()
<< "] child " << child_priority_->name_ << " ("
<< child_priority_.get() << "): cancelling failover timer";
}
GRPC_TRACE_LOG(priority_lb, INFO)
<< "[priority_lb " << child_priority_->priority_policy_.get()
<< "] child " << child_priority_->name_ << " (" << child_priority_.get()
<< "): cancelling failover timer";
child_priority_->priority_policy_->channel_control_helper()
->GetEventEngine()
->Cancel(*timer_handle_);
@ -686,11 +676,10 @@ absl::Status PriorityLb::ChildPriority::UpdateLocked(
update_args.resolution_note = priority_policy_->resolution_note_;
update_args.args = priority_policy_->args_;
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(priority_lb)) {
LOG(INFO) << "[priority_lb " << priority_policy_.get() << "] child "
<< name_ << " (" << this << "): updating child policy handler "
<< child_policy_.get();
}
GRPC_TRACE_LOG(priority_lb, INFO)
<< "[priority_lb " << priority_policy_.get() << "] child " << name_
<< " (" << this << "): updating child policy handler "
<< child_policy_.get();
return child_policy_->UpdateLocked(std::move(update_args));
}
@ -704,11 +693,10 @@ PriorityLb::ChildPriority::CreateChildPolicyLocked(const ChannelArgs& args) {
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&priority_lb_trace);
if (GRPC_TRACE_FLAG_ENABLED(priority_lb)) {
LOG(INFO) << "[priority_lb " << priority_policy_.get() << "] child "
<< name_ << " (" << this << "): created new child policy handler "
<< lb_policy.get();
}
GRPC_TRACE_LOG(priority_lb, INFO)
<< "[priority_lb " << priority_policy_.get() << "] child " << name_
<< " (" << this << "): created new child policy handler "
<< lb_policy.get();
// Add the parent's interested_parties pollset_set to that of the newly
// created child policy. This will make the child policy progress upon
// activity on the parent LB, which in turn is tied to the application's call.
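The trace in ChoosePriorityLocked above notes the fallback: when no priority is reachable, the policy delegates to the first priority still in CONNECTING. A deliberately simplified sketch of that two-pass selection (hypothetical State type; the real code also creates missing children and runs failover timers):

#include <cstddef>
#include <optional>
#include <vector>

enum class State { kReady, kConnecting, kTransientFailure, kIdle };

// Pass 1: prefer the first usable (here: READY) priority. Pass 2: fall
// back to the first CONNECTING priority so the channel keeps making
// progress instead of reporting failure prematurely.
std::optional<size_t> ChoosePriority(const std::vector<State>& children) {
  for (size_t i = 0; i < children.size(); ++i) {
    if (children[i] == State::kReady) return i;
  }
  for (size_t i = 0; i < children.size(); ++i) {
    if (children[i] == State::kConnecting) return i;
  }
  return std::nullopt;  // caller reports TRANSIENT_FAILURE
}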

src/core/load_balancing/ring_hash/ring_hash.cc
@ -597,21 +597,16 @@ void RingHash::RingHashEndpoint::OnStateUpdate(
//
RingHash::RingHash(Args args) : LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(ring_hash_lb)) {
LOG(INFO) << "[RH " << this << "] Created";
}
GRPC_TRACE_LOG(ring_hash_lb, INFO) << "[RH " << this << "] Created";
}
RingHash::~RingHash() {
if (GRPC_TRACE_FLAG_ENABLED(ring_hash_lb)) {
LOG(INFO) << "[RH " << this << "] Destroying Ring Hash policy";
}
GRPC_TRACE_LOG(ring_hash_lb, INFO)
<< "[RH " << this << "] Destroying Ring Hash policy";
}
void RingHash::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(ring_hash_lb)) {
LOG(INFO) << "[RH " << this << "] Shutting down";
}
GRPC_TRACE_LOG(ring_hash_lb, INFO) << "[RH " << this << "] Shutting down";
shutdown_ = true;
endpoint_map_.clear();
}
@ -625,9 +620,7 @@ void RingHash::ResetBackoffLocked() {
absl::Status RingHash::UpdateLocked(UpdateArgs args) {
// Check address list.
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(ring_hash_lb)) {
LOG(INFO) << "[RH " << this << "] received update";
}
GRPC_TRACE_LOG(ring_hash_lb, INFO) << "[RH " << this << "] received update";
// De-dup endpoints, taking weight into account.
endpoints_.clear();
std::map<EndpointAddressSet, size_t> endpoint_indices;
@ -641,11 +634,10 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
int prev_weight_arg =
prev_endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
if (GRPC_TRACE_FLAG_ENABLED(ring_hash_lb)) {
LOG(INFO) << "[RH " << this << "] merging duplicate endpoint for "
<< key.ToString() << ", combined weight "
<< weight_arg + prev_weight_arg;
}
GRPC_TRACE_LOG(ring_hash_lb, INFO)
<< "[RH " << this << "] merging duplicate endpoint for "
<< key.ToString() << ", combined weight "
<< weight_arg + prev_weight_arg;
prev_endpoint = EndpointAddresses(
prev_endpoint.addresses(),
prev_endpoint.args().Set(GRPC_ARG_ADDRESS_WEIGHT,
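The RingHash::UpdateLocked hunk above de-dups endpoints and merges duplicates by summing their GRPC_ARG_ADDRESS_WEIGHT values (default 1). A self-contained sketch of that merge step, with a simplified Endpoint stand-in for EndpointAddresses/EndpointAddressSet:

#include <cstddef>
#include <map>
#include <string>
#include <vector>

struct Endpoint {
  std::string key;  // stand-in for the endpoint's address set
  int weight = 1;   // stand-in for GRPC_ARG_ADDRESS_WEIGHT, default 1
};

// First occurrence keeps its slot; later duplicates fold their weight in,
// matching the "combined weight" trace above.
std::vector<Endpoint> DedupEndpoints(const std::vector<Endpoint>& in) {
  std::vector<Endpoint> out;
  std::map<std::string, size_t> index;  // key -> position in `out`
  for (const auto& ep : in) {
    auto [it, inserted] = index.emplace(ep.key, out.size());
    if (inserted) {
      out.push_back(ep);
    } else {
      out[it->second].weight += ep.weight;
    }
  }
  return out;
}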

src/core/load_balancing/rls/rls.cc
@ -805,11 +805,9 @@ RlsLb::ChildPolicyWrapper::ChildPolicyWrapper(RefCountedPtr<RlsLb> lb_policy,
}
void RlsLb::ChildPolicyWrapper::Orphaned() {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << lb_policy_.get()
<< "] ChildPolicyWrapper=" << this << " [" << target_
<< "]: shutdown";
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
<< " [" << target_ << "]: shutdown";
is_shutdown_ = true;
lb_policy_->child_policy_map_.erase(target_);
if (child_policy_ != nullptr) {
@ -876,11 +874,9 @@ void RlsLb::ChildPolicyWrapper::StartUpdate() {
*child_policy_config);
// Returned RLS target fails the validation.
if (!config.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << lb_policy_.get()
<< "] ChildPolicyWrapper=" << this << " [" << target_
<< "]: config failed to parse: " << config.status();
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
<< " [" << target_ << "]: config failed to parse: " << config.status();
pending_config_.reset();
picker_ = MakeRefCounted<TransientFailurePicker>(
absl::UnavailableError(config.status().message()));
@ -913,11 +909,10 @@ absl::Status RlsLb::ChildPolicyWrapper::MaybeFinishUpdate() {
lb_policy_->interested_parties());
}
// Send the child the updated config.
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << lb_policy_.get()
<< "] ChildPolicyWrapper=" << this << " [" << target_
<< "], updating child policy handler " << child_policy_.get();
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_.get() << "] ChildPolicyWrapper=" << this
<< " [" << target_ << "], updating child policy handler "
<< child_policy_.get();
UpdateArgs update_args;
update_args.config = std::move(pending_config_);
update_args.addresses = lb_policy_->addresses_;
@ -1430,9 +1425,8 @@ void RlsLb::Cache::Shutdown() {
if (cleanup_timer_handle_.has_value() &&
lb_policy_->channel_control_helper()->GetEventEngine()->Cancel(
*cleanup_timer_handle_)) {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << lb_policy_ << "] cache cleanup timer canceled";
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_ << "] cache cleanup timer canceled";
}
cleanup_timer_handle_.reset();
}
@ -1468,9 +1462,8 @@ void RlsLb::Cache::StartCleanupTimer() {
}
void RlsLb::Cache::OnCleanupTimer() {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << lb_policy_ << "] cache cleanup timer fired";
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_ << "] cache cleanup timer fired";
MutexLock lock(&lb_policy_->mu_);
if (!cleanup_timer_handle_.has_value()) return;
if (lb_policy_->is_shutdown_) return;
@ -1503,11 +1496,9 @@ void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
size_ -= map_it->second->Size();
map_.erase(map_it);
}
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << lb_policy_
<< "] LRU pass complete: desired size=" << bytes
<< " size=" << size_;
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_
<< "] LRU pass complete: desired size=" << bytes << " size=" << size_;
}
//
@ -1517,11 +1508,10 @@ void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
grpc_connectivity_state new_state, const absl::Status& status) {
auto* lb_policy = rls_channel_->lb_policy_.get();
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << lb_policy << "] RlsChannel=" << rls_channel_.get()
<< " StateWatcher=" << this << ": state changed to "
<< ConnectivityStateName(new_state) << " (" << status << ")";
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy << "] RlsChannel=" << rls_channel_.get()
<< " StateWatcher=" << this << ": state changed to "
<< ConnectivityStateName(new_state) << " (" << status << ")";
if (rls_channel_->is_shutdown_) return;
MutexLock lock(&lb_policy->mu_);
if (new_state == GRPC_CHANNEL_READY && was_transient_failure_) {
@ -1614,11 +1604,10 @@ RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
channel_.reset(Channel::FromC(
grpc_channel_create(lb_policy_->config_->lookup_service().c_str(),
creds.get(), args.ToC().get())));
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << lb_policy_.get() << "] RlsChannel=" << this
<< ": created channel " << channel_.get() << " for "
<< lb_policy_->config_->lookup_service();
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_.get() << "] RlsChannel=" << this
<< ": created channel " << channel_.get() << " for "
<< lb_policy_->config_->lookup_service();
if (channel_ != nullptr) {
// Set up channelz linkage.
channelz::ChannelNode* child_channelz_node = channel_->channelz_node();
@ -1821,11 +1810,9 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
grpc_call_unref(call_);
call_ = nullptr;
// Return result to cache.
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << lb_policy_.get() << "] rls_request=" << this
<< " " << key_.ToString()
<< ": response info: " << response.ToString();
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " "
<< key_.ToString() << ": response info: " << response.ToString();
std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
{
MutexLock lock(&lb_policy_->mu_);
@ -1940,9 +1927,7 @@ RlsLb::RlsLb(Args args)
cache_.ReportMetricsLocked(reporter);
},
Duration::Seconds(5), kMetricCacheSize, kMetricCacheEntries)) {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << this << "] policy created";
}
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy created";
}
bool EndpointsEqual(
@ -1967,9 +1952,7 @@ bool EndpointsEqual(
}
absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << this << "] policy updated";
}
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy updated";
update_in_progress_ = true;
// Swap out config.
RefCountedPtr<RlsLbConfig> old_config = std::move(config_);
@ -2002,16 +1985,14 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
if (old_config == nullptr ||
config_->default_target() != old_config->default_target()) {
if (config_->default_target().empty()) {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << this << "] unsetting default target";
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] unsetting default target";
default_child_policy_.reset();
} else {
auto it = child_policy_map_.find(config_->default_target());
if (it == child_policy_map_.end()) {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << this << "] creating new default target";
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] creating new default target";
default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>(
RefAsSubclass<RlsLb>(DEBUG_LOCATION, "ChildPolicyWrapper"),
config_->default_target());
@ -2040,9 +2021,8 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
}
// Start update of child policies if needed.
if (update_child_policies) {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << this << "] starting child policy updates";
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] starting child policy updates";
for (auto& p : child_policy_map_) {
p.second->StartUpdate();
}
@ -2055,9 +2035,8 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
// Now that we've released the lock, finish update of child policies.
std::vector<std::string> errors;
if (update_child_policies) {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << this << "] finishing child policy updates";
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] finishing child policy updates";
for (auto& p : child_policy_map_) {
absl::Status status = p.second->MaybeFinishUpdate();
if (!status.ok()) {
@ -2109,9 +2088,7 @@ void RlsLb::ResetBackoffLocked() {
}
void RlsLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << this << "] policy shutdown";
}
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy shutdown";
registered_metric_callback_.reset();
MutexLock lock(&mu_);
is_shutdown_ = true;
@ -2153,9 +2130,7 @@ void RlsLb::UpdatePickerLocked() {
// all children. This avoids unnecessary picker churn while an update
// is being propagated to our children.
if (update_in_progress_) return;
if (GRPC_TRACE_FLAG_ENABLED(rls_lb)) {
LOG(INFO) << "[rlslb " << this << "] updating picker";
}
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] updating picker";
grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
if (!child_policy_map_.empty()) {
state = GRPC_CHANNEL_TRANSIENT_FAILURE;
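RlsLb::Cache::MaybeShrinkSize above evicts least-recently-used entries until the cache fits the requested byte budget, then traces the resulting size. A minimal sketch of that eviction loop with a std::list-based LRU (hypothetical layout; the real cache keys RequestKey objects and tracks per-entry Size()):

#include <cstddef>
#include <list>
#include <string>
#include <unordered_map>

struct Entry {
  std::string value;
  size_t Size() const { return value.size(); }
};

class LruCache {
 public:
  void Insert(std::string key, Entry entry) {  // duplicate keys ignored for brevity
    size_ += entry.Size();
    lru_.push_front(key);
    map_.emplace(std::move(key), std::move(entry));
  }
  // Evict from the least-recently-used end until we fit the budget.
  void MaybeShrinkSize(size_t budget) {
    while (size_ > budget && !lru_.empty()) {
      auto it = map_.find(lru_.back());
      size_ -= it->second.Size();
      map_.erase(it);
      lru_.pop_back();
    }
    // The real code traces "LRU pass complete: desired size=... size=..." here.
  }

 private:
  std::list<std::string> lru_;  // front = most recently used
  std::unordered_map<std::string, Entry> map_;
  size_t size_ = 0;
};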

src/core/load_balancing/round_robin/round_robin.cc
@ -196,11 +196,9 @@ RoundRobin::Picker::Picker(
RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) %
pickers_.size();
if (GRPC_TRACE_FLAG_ENABLED(round_robin)) {
LOG(INFO) << "[RR " << parent_ << " picker " << this
<< "] using picker index " << index
<< ", picker=" << pickers_[index].get();
}
GRPC_TRACE_LOG(round_robin, INFO)
<< "[RR " << parent_ << " picker " << this << "] using picker index "
<< index << ", picker=" << pickers_[index].get();
return pickers_[index]->Pick(args);
}
@ -209,23 +207,18 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
//
RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(round_robin)) {
LOG(INFO) << "[RR " << this << "] Created";
}
GRPC_TRACE_LOG(round_robin, INFO) << "[RR " << this << "] Created";
}
RoundRobin::~RoundRobin() {
if (GRPC_TRACE_FLAG_ENABLED(round_robin)) {
LOG(INFO) << "[RR " << this << "] Destroying Round Robin policy";
}
GRPC_TRACE_LOG(round_robin, INFO)
<< "[RR " << this << "] Destroying Round Robin policy";
CHECK(endpoint_list_ == nullptr);
CHECK(latest_pending_endpoint_list_ == nullptr);
}
void RoundRobin::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(round_robin)) {
LOG(INFO) << "[RR " << this << "] Shutting down";
}
GRPC_TRACE_LOG(round_robin, INFO) << "[RR " << this << "] Shutting down";
shutdown_ = true;
endpoint_list_.reset();
latest_pending_endpoint_list_.reset();
@ -241,9 +234,7 @@ void RoundRobin::ResetBackoffLocked() {
absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
EndpointAddressesIterator* addresses = nullptr;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(round_robin)) {
LOG(INFO) << "[RR " << this << "] received update";
}
GRPC_TRACE_LOG(round_robin, INFO) << "[RR " << this << "] received update";
addresses = args.addresses->get();
} else {
GRPC_TRACE_LOG(round_robin, INFO)
@ -416,11 +407,10 @@ void RoundRobin::RoundRobinEndpointList::
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(nullptr));
} else if (num_transient_failure_ == size()) {
if (GRPC_TRACE_FLAG_ENABLED(round_robin)) {
LOG(INFO) << "[RR " << round_robin
<< "] reporting TRANSIENT_FAILURE with child list " << this
<< ": " << status_for_tf;
}
GRPC_TRACE_LOG(round_robin, INFO)
<< "[RR " << round_robin
<< "] reporting TRANSIENT_FAILURE with child list " << this << ": "
<< status_for_tf;
if (!status_for_tf.ok()) {
last_failure_ = absl::UnavailableError(
absl::StrCat("connections to all backends failing; last error: ",
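Picker::Pick above advances last_picked_index_ with std::memory_order_relaxed; relaxed ordering suffices because the counter only needs to hand out distinct tickets to concurrent picks, not publish any other memory. A standalone sketch of that lock-free rotation (RoundRobinPicker here is an illustrative class, not the gRPC one):

#include <atomic>
#include <cstddef>

class RoundRobinPicker {
 public:
  // Precondition: num_children > 0, as the real policy guarantees.
  explicit RoundRobinPicker(size_t num_children) : num_children_(num_children) {}

  // Each concurrent caller gets a distinct ticket; no other data is
  // synchronized through this counter, so relaxed ordering is enough.
  size_t PickIndex() {
    return last_picked_index_.fetch_add(1, std::memory_order_relaxed) %
           num_children_;
  }

 private:
  std::atomic<size_t> last_picked_index_{0};
  const size_t num_children_;
};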

src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc
@ -555,11 +555,10 @@ WeightedRoundRobin::Picker::Picker(RefCountedPtr<WeightedRoundRobin> wrr,
}
global_stats().IncrementWrrSubchannelListSize(endpoint_list->size());
global_stats().IncrementWrrSubchannelReadySize(endpoints_.size());
if (GRPC_TRACE_FLAG_ENABLED(weighted_round_robin_lb)) {
LOG(INFO) << "[WRR " << wrr_.get() << " picker " << this
<< "] created picker from endpoint_list=" << endpoint_list
<< " with " << endpoints_.size() << " subchannels";
}
GRPC_TRACE_LOG(weighted_round_robin_lb, INFO)
<< "[WRR " << wrr_.get() << " picker " << this
<< "] created picker from endpoint_list=" << endpoint_list << " with "
<< endpoints_.size() << " subchannels";
// Note: BuildSchedulerAndStartTimerLocked() passes out pointers to `this`,
// so we need to ensure that we really hold timer_mu_.
MutexLock lock(&timer_mu_);
@ -584,11 +583,9 @@ WeightedRoundRobin::PickResult WeightedRoundRobin::Picker::Pick(PickArgs args) {
size_t index = PickIndex();
CHECK(index < endpoints_.size());
auto& endpoint_info = endpoints_[index];
if (GRPC_TRACE_FLAG_ENABLED(weighted_round_robin_lb)) {
LOG(INFO) << "[WRR " << wrr_.get() << " picker " << this
<< "] returning index " << index
<< ", picker=" << endpoint_info.picker.get();
}
GRPC_TRACE_LOG(weighted_round_robin_lb, INFO)
<< "[WRR " << wrr_.get() << " picker " << this << "] returning index "
<< index << ", picker=" << endpoint_info.picker.get();
auto result = endpoint_info.picker->Pick(args);
// Collect per-call utilization data if needed.
if (!config_->enable_oob_load_report()) {
@ -665,11 +662,10 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
scheduler_ = std::move(scheduler);
}
// Start timer.
if (GRPC_TRACE_FLAG_ENABLED(weighted_round_robin_lb)) {
LOG(INFO) << "[WRR " << wrr_.get() << " picker " << this
<< "] scheduling timer for "
<< config_->weight_update_period().ToString();
}
GRPC_TRACE_LOG(weighted_round_robin_lb, INFO)
<< "[WRR " << wrr_.get() << " picker " << this
<< "] scheduling timer for "
<< config_->weight_update_period().ToString();
  // It's insufficient to hold the implicit constructor lock here; a real lock
  // over timer_mu_ is needed: we update timer_handle_ after the timer is
  // scheduled, but it may run on another thread before that occurs, causing a
@ -713,17 +709,15 @@ WeightedRoundRobin::WeightedRoundRobin(Args args)
}
WeightedRoundRobin::~WeightedRoundRobin() {
if (GRPC_TRACE_FLAG_ENABLED(weighted_round_robin_lb)) {
LOG(INFO) << "[WRR " << this << "] Destroying Round Robin policy";
}
GRPC_TRACE_LOG(weighted_round_robin_lb, INFO)
<< "[WRR " << this << "] Destroying Round Robin policy";
CHECK(endpoint_list_ == nullptr);
CHECK(latest_pending_endpoint_list_ == nullptr);
}
void WeightedRoundRobin::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(weighted_round_robin_lb)) {
LOG(INFO) << "[WRR " << this << "] Shutting down";
}
GRPC_TRACE_LOG(weighted_round_robin_lb, INFO)
<< "[WRR " << this << "] Shutting down";
shutdown_ = true;
endpoint_list_.reset();
latest_pending_endpoint_list_.reset();
@ -741,9 +735,8 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
config_ = args.config.TakeAsSubclass<WeightedRoundRobinConfig>();
std::shared_ptr<EndpointAddressesIterator> addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(weighted_round_robin_lb)) {
LOG(INFO) << "[WRR " << this << "] received update";
}
GRPC_TRACE_LOG(weighted_round_robin_lb, INFO)
<< "[WRR " << this << "] received update";
// Weed out duplicate endpoints. Also sort the endpoints so that if
// the set of endpoints doesn't change, their indexes in the endpoint
// list don't change, since this avoids unnecessary churn in the
@ -992,11 +985,9 @@ void WeightedRoundRobin::WrrEndpointList::
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(nullptr));
} else if (num_transient_failure_ == size()) {
if (GRPC_TRACE_FLAG_ENABLED(weighted_round_robin_lb)) {
LOG(INFO) << "[WRR " << wrr
<< "] reporting TRANSIENT_FAILURE with endpoint list " << this
<< ": " << status_for_tf;
}
GRPC_TRACE_LOG(weighted_round_robin_lb, INFO)
<< "[WRR " << wrr << "] reporting TRANSIENT_FAILURE with endpoint list "
<< this << ": " << status_for_tf;
if (!status_for_tf.ok()) {
last_failure_ = absl::UnavailableError(
absl::StrCat("connections to all backends failing; last error: ",
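The comment in BuildSchedulerAndStartTimerLocked above explains why timer_mu_ must be held across scheduling: the callback can fire on another thread before timer_handle_ is assigned. A reduced sketch of the race and the lock that closes it (ScheduleTimer is a hypothetical event-engine stand-in; object lifetime management is elided):

#include <functional>
#include <mutex>
#include <optional>
#include <thread>

// May run the closure on another thread immediately.
int ScheduleTimer(std::function<void()> cb) {
  std::thread(std::move(cb)).detach();
  return 42;  // timer handle
}

class Picker {
 public:
  void StartTimer() {
    // Hold timer_mu_ across both the schedule and the handle assignment;
    // otherwise OnTimer() could observe timer_handle_ before it is set.
    std::lock_guard<std::mutex> lock(timer_mu_);
    timer_handle_ = ScheduleTimer([this] { OnTimer(); });
  }

 private:
  void OnTimer() {
    std::lock_guard<std::mutex> lock(timer_mu_);  // blocks until handle is set
    timer_handle_.reset();
  }
  std::mutex timer_mu_;
  std::optional<int> timer_handle_;
};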

src/core/load_balancing/weighted_target/weighted_target.cc
@ -284,9 +284,8 @@ WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
WeightedTargetLb::WeightedTargetLb(Args args)
: LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(weighted_target_lb)) {
LOG(INFO) << "[weighted_target_lb " << this << "] created";
}
GRPC_TRACE_LOG(weighted_target_lb, INFO)
<< "[weighted_target_lb " << this << "] created";
}
WeightedTargetLb::~WeightedTargetLb() {
@ -296,9 +295,8 @@ WeightedTargetLb::~WeightedTargetLb() {
}
void WeightedTargetLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(weighted_target_lb)) {
LOG(INFO) << "[weighted_target_lb " << this << "] shutting down";
}
GRPC_TRACE_LOG(weighted_target_lb, INFO)
<< "[weighted_target_lb " << this << "] shutting down";
shutting_down_ = true;
targets_.clear();
}
@ -309,9 +307,8 @@ void WeightedTargetLb::ResetBackoffLocked() {
absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
if (shutting_down_) return absl::OkStatus();
if (GRPC_TRACE_FLAG_ENABLED(weighted_target_lb)) {
LOG(INFO) << "[weighted_target_lb " << this << "] received update";
}
GRPC_TRACE_LOG(weighted_target_lb, INFO)
<< "[weighted_target_lb " << this << "] received update";
update_in_progress_ = true;
// Update config.
config_ = args.config.TakeAsSubclass<WeightedTargetLbConfig>();
@ -528,20 +525,16 @@ WeightedTargetLb::WeightedChild::WeightedChild(
}
WeightedTargetLb::WeightedChild::~WeightedChild() {
if (GRPC_TRACE_FLAG_ENABLED(weighted_target_lb)) {
LOG(INFO) << "[weighted_target_lb " << weighted_target_policy_.get()
<< "] WeightedChild " << this << " " << name_
<< ": destroying child";
}
GRPC_TRACE_LOG(weighted_target_lb, INFO)
<< "[weighted_target_lb " << weighted_target_policy_.get()
<< "] WeightedChild " << this << " " << name_ << ": destroying child";
weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild");
}
void WeightedTargetLb::WeightedChild::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(weighted_target_lb)) {
LOG(INFO) << "[weighted_target_lb " << weighted_target_policy_.get()
<< "] WeightedChild " << this << " " << name_
<< ": shutting down child";
}
GRPC_TRACE_LOG(weighted_target_lb, INFO)
<< "[weighted_target_lb " << weighted_target_policy_.get()
<< "] WeightedChild " << this << " " << name_ << ": shutting down child";
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
grpc_pollset_set_del_pollset_set(
@ -566,11 +559,10 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&weighted_target_lb_trace);
if (GRPC_TRACE_FLAG_ENABLED(weighted_target_lb)) {
LOG(INFO) << "[weighted_target_lb " << weighted_target_policy_.get()
<< "] WeightedChild " << this << " " << name_
<< ": created new child policy handler " << lb_policy.get();
}
GRPC_TRACE_LOG(weighted_target_lb, INFO)
<< "[weighted_target_lb " << weighted_target_policy_.get()
<< "] WeightedChild " << this << " " << name_
<< ": created new child policy handler " << lb_policy.get();
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call.
@ -594,11 +586,9 @@ absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
weight_ = config.weight;
// Reactivate if needed.
if (delayed_removal_timer_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(weighted_target_lb)) {
LOG(INFO) << "[weighted_target_lb " << weighted_target_policy_.get()
<< "] WeightedChild " << this << " " << name_
<< ": reactivating";
}
GRPC_TRACE_LOG(weighted_target_lb, INFO)
<< "[weighted_target_lb " << weighted_target_policy_.get()
<< "] WeightedChild " << this << " " << name_ << ": reactivating";
delayed_removal_timer_.reset();
}
// Create child policy if needed.
@ -613,11 +603,10 @@ absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
update_args.resolution_note = resolution_note;
update_args.args = std::move(args);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(weighted_target_lb)) {
LOG(INFO) << "[weighted_target_lb " << weighted_target_policy_.get()
<< "] WeightedChild " << this << " " << name_
<< ": updating child policy handler " << child_policy_.get();
}
GRPC_TRACE_LOG(weighted_target_lb, INFO)
<< "[weighted_target_lb " << weighted_target_policy_.get()
<< "] WeightedChild " << this << " " << name_
<< ": updating child policy handler " << child_policy_.get();
return child_policy_->UpdateLocked(std::move(update_args));
}

src/core/load_balancing/xds/cds.cc
@ -183,21 +183,16 @@ class CdsLb final : public LoadBalancingPolicy {
//
CdsLb::CdsLb(Args args) : LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(cds_lb)) {
LOG(INFO) << "[cdslb " << this << "] created";
}
GRPC_TRACE_LOG(cds_lb, INFO) << "[cdslb " << this << "] created";
}
CdsLb::~CdsLb() {
if (GRPC_TRACE_FLAG_ENABLED(cds_lb)) {
LOG(INFO) << "[cdslb " << this << "] destroying cds LB policy";
}
GRPC_TRACE_LOG(cds_lb, INFO)
<< "[cdslb " << this << "] destroying cds LB policy";
}
void CdsLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(cds_lb)) {
LOG(INFO) << "[cdslb " << this << "] shutting down";
}
GRPC_TRACE_LOG(cds_lb, INFO) << "[cdslb " << this << "] shutting down";
shutting_down_ = true;
ResetState();
}
@ -280,11 +275,10 @@ class PriorityEndpointIterator final : public EndpointAddressesIterator {
absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
// Get new config.
auto new_config = args.config.TakeAsSubclass<CdsLbConfig>();
if (GRPC_TRACE_FLAG_ENABLED(cds_lb)) {
LOG(INFO) << "[cdslb " << this
<< "] received update: cluster=" << new_config->cluster()
<< " is_dynamic=" << new_config->is_dynamic();
}
GRPC_TRACE_LOG(cds_lb, INFO)
<< "[cdslb " << this
<< "] received update: cluster=" << new_config->cluster()
<< " is_dynamic=" << new_config->is_dynamic();
CHECK(new_config != nullptr);
// Cluster name should never change, because we should use a different
// child name in xds_cluster_manager in that case.
@ -295,11 +289,9 @@ absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
}
// Start dynamic subscription if needed.
if (new_config->is_dynamic() && subscription_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(cds_lb)) {
LOG(INFO) << "[cdslb " << this
<< "] obtaining dynamic subscription for cluster "
<< cluster_name_;
}
GRPC_TRACE_LOG(cds_lb, INFO)
<< "[cdslb " << this << "] obtaining dynamic subscription for cluster "
<< cluster_name_;
auto* dependency_mgr = args.args.GetObject<XdsDependencyManager>();
if (dependency_mgr == nullptr) {
// Should never happen.
@ -326,11 +318,10 @@ absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
// If we are already subscribed, it's possible that we just
// recently subscribed but another update came through before we
// got the new cluster, in which case it will still be missing.
if (GRPC_TRACE_FLAG_ENABLED(cds_lb)) {
LOG(INFO) << "[cdslb " << this
<< "] xDS config has no entry for dynamic cluster "
<< cluster_name_ << ", waiting for subsequent update";
}
GRPC_TRACE_LOG(cds_lb, INFO)
<< "[cdslb " << this
<< "] xDS config has no entry for dynamic cluster " << cluster_name_
<< ", waiting for subsequent update";
// Stay in CONNECTING until we get an update that has the cluster.
return absl::OkStatus();
}
@ -452,11 +443,9 @@ absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
}
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
interested_parties());
if (GRPC_TRACE_FLAG_ENABLED(cds_lb)) {
LOG(INFO) << "[cdslb " << this << "] created child policy "
<< (*child_config)->name() << " (" << child_policy_.get()
<< ")";
}
GRPC_TRACE_LOG(cds_lb, INFO)
<< "[cdslb " << this << "] created child policy "
<< (*child_config)->name() << " (" << child_policy_.get() << ")";
}
// Update child policy.
update_args.config = std::move(*child_config);

src/core/load_balancing/xds/xds_cluster_impl.cc
@ -511,9 +511,8 @@ XdsClusterImplLb::~XdsClusterImplLb() {
}
void XdsClusterImplLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(xds_cluster_impl_lb)) {
LOG(INFO) << "[xds_cluster_impl_lb " << this << "] shutting down";
}
GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
<< "[xds_cluster_impl_lb " << this << "] shutting down";
shutting_down_ = true;
ResetState();
xds_client_.reset(DEBUG_LOCATION, "XdsClusterImpl");
@ -560,9 +559,8 @@ std::string GetEdsResourceName(const XdsClusterResource& cluster_resource) {
}
absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(xds_cluster_impl_lb)) {
LOG(INFO) << "[xds_cluster_impl_lb " << this << "] Received update";
}
GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
<< "[xds_cluster_impl_lb " << this << "] Received update";
// Grab new LB policy config.
auto new_config = args.config.TakeAsSubclass<XdsClusterImplLbConfig>();
// Cluster name should never change, because the cds policy will assign a
@ -732,11 +730,10 @@ void XdsClusterImplLb::MaybeUpdatePickerLocked() {
// whether) the child has reported.
if (drop_config_ != nullptr && drop_config_->drop_all()) {
auto drop_picker = MakeRefCounted<Picker>(this, picker_);
if (GRPC_TRACE_FLAG_ENABLED(xds_cluster_impl_lb)) {
LOG(INFO) << "[xds_cluster_impl_lb " << this
<< "] updating connectivity (drop all): state=READY picker="
<< drop_picker.get();
}
GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
<< "[xds_cluster_impl_lb " << this
<< "] updating connectivity (drop all): state=READY picker="
<< drop_picker.get();
channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
std::move(drop_picker));
return;

src/core/load_balancing/xds/xds_cluster_manager.cc
@ -255,9 +255,8 @@ XdsClusterManagerLb::~XdsClusterManagerLb() {
}
void XdsClusterManagerLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(xds_cluster_manager_lb)) {
LOG(INFO) << "[xds_cluster_manager_lb " << this << "] shutting down";
}
GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
<< "[xds_cluster_manager_lb " << this << "] shutting down";
shutting_down_ = true;
children_.clear();
}
@ -272,9 +271,8 @@ void XdsClusterManagerLb::ResetBackoffLocked() {
absl::Status XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
if (shutting_down_) return absl::OkStatus();
if (GRPC_TRACE_FLAG_ENABLED(xds_cluster_manager_lb)) {
LOG(INFO) << "[xds_cluster_manager_lb " << this << "] Received update";
}
GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
<< "[xds_cluster_manager_lb " << this << "] Received update";
update_in_progress_ = true;
// Update config.
config_ = args.config.TakeAsSubclass<XdsClusterManagerLbConfig>();
@ -365,22 +363,18 @@ void XdsClusterManagerLb::UpdateStateLocked() {
} else {
connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
}
if (GRPC_TRACE_FLAG_ENABLED(xds_cluster_manager_lb)) {
LOG(INFO) << "[xds_cluster_manager_lb " << this
<< "] connectivity changed to "
<< ConnectivityStateName(connectivity_state);
}
GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
<< "[xds_cluster_manager_lb " << this << "] connectivity changed to "
<< ConnectivityStateName(connectivity_state);
ClusterPicker::ClusterMap cluster_map;
for (const auto& p : config_->cluster_map()) {
const std::string& cluster_name = p.first;
RefCountedPtr<SubchannelPicker>& child_picker = cluster_map[cluster_name];
child_picker = children_[cluster_name]->picker();
if (child_picker == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(xds_cluster_manager_lb)) {
LOG(INFO) << "[xds_cluster_manager_lb " << this << "] child "
<< cluster_name
<< " has not yet returned a picker; creating a QueuePicker.";
}
GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
<< "[xds_cluster_manager_lb " << this << "] child " << cluster_name
<< " has not yet returned a picker; creating a QueuePicker.";
child_picker =
MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
}
@ -418,11 +412,9 @@ XdsClusterManagerLb::ClusterChild::~ClusterChild() {
}
void XdsClusterManagerLb::ClusterChild::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(xds_cluster_manager_lb)) {
LOG(INFO) << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
<< "] ClusterChild " << this << " " << name_
<< ": shutting down child";
}
GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
<< "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
<< "] ClusterChild " << this << " " << name_ << ": shutting down child";
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
grpc_pollset_set_del_pollset_set(
@ -453,11 +445,10 @@ XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&xds_cluster_manager_lb_trace);
if (GRPC_TRACE_FLAG_ENABLED(xds_cluster_manager_lb)) {
LOG(INFO) << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
<< "] ClusterChild " << this << " " << name_
<< ": Created new child policy handler " << lb_policy.get();
}
GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
<< "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
<< "] ClusterChild " << this << " " << name_
<< ": Created new child policy handler " << lb_policy.get();
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call.
@ -490,11 +481,10 @@ absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
update_args.addresses = addresses;
update_args.args = args;
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(xds_cluster_manager_lb)) {
LOG(INFO) << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
<< "] ClusterChild " << this << " " << name_
<< ": Updating child policy handler " << child_policy_.get();
}
GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
<< "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
<< "] ClusterChild " << this << " " << name_
<< ": Updating child policy handler " << child_policy_.get();
return child_policy_->UpdateLocked(std::move(update_args));
}

src/core/load_balancing/xds/xds_override_host.cc
@ -486,18 +486,15 @@ XdsOverrideHostLb::Picker::PickOverridenHost(
if (it == policy_->subchannel_map_.end()) continue;
if (!override_host_health_status_set_.Contains(
it->second->eds_health_status())) {
if (GRPC_TRACE_FLAG_ENABLED(xds_override_host_lb)) {
LOG(INFO) << "Subchannel " << address
<< " health status is not overridden ("
<< it->second->eds_health_status().ToString() << ")";
}
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "Subchannel " << address << " health status is not overridden ("
<< it->second->eds_health_status().ToString() << ")";
continue;
}
auto subchannel = it->second->GetSubchannelRef();
if (subchannel == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(xds_override_host_lb)) {
LOG(INFO) << "No subchannel for " << address;
}
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "No subchannel for " << address;
if (address_with_no_subchannel.empty()) {
address_with_no_subchannel = it->first;
}
@ -507,9 +504,8 @@ XdsOverrideHostLb::Picker::PickOverridenHost(
if (connectivity_state == GRPC_CHANNEL_READY) {
// Found a READY subchannel. Pass back the actual address list
// and return the subchannel.
if (GRPC_TRACE_FLAG_ENABLED(xds_override_host_lb)) {
LOG(INFO) << "Picker override found READY subchannel " << address;
}
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "Picker override found READY subchannel " << address;
it->second->set_last_used_time();
override_host_attr->set_actual_address_list(it->second->address_list());
return PickResult::Complete(subchannel->wrapped_subchannel());
@ -523,9 +519,8 @@ XdsOverrideHostLb::Picker::PickOverridenHost(
// No READY subchannel found. If we found an IDLE subchannel, trigger
// a connection attempt and queue the pick until that attempt completes.
if (idle_subchannel != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(xds_override_host_lb)) {
LOG(INFO) << "Picker override found IDLE subchannel";
}
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "Picker override found IDLE subchannel";
// Deletes itself after the connection is requested.
new SubchannelConnectionRequester(std::move(idle_subchannel));
return PickResult::Queue();
@ -533,18 +528,16 @@ XdsOverrideHostLb::Picker::PickOverridenHost(
// No READY or IDLE subchannels. If we found a CONNECTING subchannel,
// queue the pick and wait for the connection attempt to complete.
if (found_connecting) {
if (GRPC_TRACE_FLAG_ENABLED(xds_override_host_lb)) {
LOG(INFO) << "Picker override found CONNECTING subchannel";
}
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "Picker override found CONNECTING subchannel";
return PickResult::Queue();
}
// No READY, IDLE, or CONNECTING subchannels found. If we found an
// entry that has no subchannel, then queue the pick and trigger
// creation of a subchannel for that entry.
if (!address_with_no_subchannel.empty()) {
if (GRPC_TRACE_FLAG_ENABLED(xds_override_host_lb)) {
LOG(INFO) << "Picker override found entry with no subchannel";
}
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "Picker override found entry with no subchannel";
if (!IsWorkSerializerDispatchEnabled()) {
new SubchannelCreationRequester(policy_, address_with_no_subchannel);
} else {
@ -645,9 +638,8 @@ void XdsOverrideHostLb::IdleTimer::OnTimerLocked() {
XdsOverrideHostLb::XdsOverrideHostLb(Args args)
: LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(xds_override_host_lb)) {
LOG(INFO) << "[xds_override_host_lb " << this << "] created";
}
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "[xds_override_host_lb " << this << "] created";
}
XdsOverrideHostLb::~XdsOverrideHostLb() {
@ -657,9 +649,8 @@ XdsOverrideHostLb::~XdsOverrideHostLb() {
}
void XdsOverrideHostLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(xds_override_host_lb)) {
LOG(INFO) << "[xds_override_host_lb " << this << "] shutting down";
}
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "[xds_override_host_lb " << this << "] shutting down";
shutting_down_ = true;
ResetState();
}
@ -726,11 +717,9 @@ class ChildEndpointIterator final : public EndpointAddressesIterator {
parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
if (status.status() != XdsHealthStatus::kDraining) {
if (GRPC_TRACE_FLAG_ENABLED(xds_override_host_lb)) {
LOG(INFO) << "[xds_override_host_lb " << this << "] endpoint "
<< endpoint.ToString()
<< ": not draining, passing to child";
}
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "[xds_override_host_lb " << this << "] endpoint "
<< endpoint.ToString() << ": not draining, passing to child";
callback(endpoint);
}
});
@ -741,9 +730,8 @@ class ChildEndpointIterator final : public EndpointAddressesIterator {
};
absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(xds_override_host_lb)) {
LOG(INFO) << "[xds_override_host_lb " << this << "] Received update";
}
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "[xds_override_host_lb " << this << "] Received update";
// Grab new LB policy config.
if (args.config == nullptr) {
return absl::InvalidArgumentError("Missing policy config");
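PickOverridenHost above walks the override addresses and acts on the best connectivity state it finds: a READY subchannel wins immediately, an IDLE one triggers a connect and queues the pick, CONNECTING just queues, and an entry with no subchannel queues while requesting creation. A compressed sketch of that precedence (hypothetical State/Action types; the real code also checks health-status overrides and records the actual address list):

#include <vector>

enum class State { kReady, kIdle, kConnecting, kNoSubchannel };
enum class Action { kPick, kConnectAndQueue, kQueue, kCreateAndQueue, kFallThrough };

// Any READY -> pick it; else any IDLE -> connect + queue; else any
// CONNECTING -> queue; else any missing subchannel -> create + queue;
// else fall through to the child policy's normal pick path.
Action PickOverriddenHost(const std::vector<State>& candidates) {
  bool saw_idle = false, saw_connecting = false, saw_missing = false;
  for (State s : candidates) {
    switch (s) {
      case State::kReady:        return Action::kPick;
      case State::kIdle:         saw_idle = true; break;
      case State::kConnecting:   saw_connecting = true; break;
      case State::kNoSubchannel: saw_missing = true; break;
    }
  }
  if (saw_idle) return Action::kConnectAndQueue;
  if (saw_connecting) return Action::kQueue;
  if (saw_missing) return Action::kCreateAndQueue;
  return Action::kFallThrough;
}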

src/core/load_balancing/xds/xds_wrr_locality.cc
@ -136,15 +136,13 @@ XdsWrrLocalityLb::XdsWrrLocalityLb(Args args)
: LoadBalancingPolicy(std::move(args)) {}
XdsWrrLocalityLb::~XdsWrrLocalityLb() {
if (GRPC_TRACE_FLAG_ENABLED(xds_wrr_locality_lb)) {
LOG(INFO) << "[xds_wrr_locality_lb " << this << "] destroying";
}
GRPC_TRACE_LOG(xds_wrr_locality_lb, INFO)
<< "[xds_wrr_locality_lb " << this << "] destroying";
}
void XdsWrrLocalityLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(xds_wrr_locality_lb)) {
LOG(INFO) << "[xds_wrr_locality_lb " << this << "] shutting down";
}
GRPC_TRACE_LOG(xds_wrr_locality_lb, INFO)
<< "[xds_wrr_locality_lb " << this << "] shutting down";
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
@ -161,9 +159,8 @@ void XdsWrrLocalityLb::ResetBackoffLocked() {
}
absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(xds_wrr_locality_lb)) {
LOG(INFO) << "[xds_wrr_locality_lb " << this << "] Received update";
}
GRPC_TRACE_LOG(xds_wrr_locality_lb, INFO)
<< "[xds_wrr_locality_lb " << this << "] Received update";
auto config = args.config.TakeAsSubclass<XdsWrrLocalityLbConfig>();
// Scan the addresses to find the weight for each locality.
std::map<RefCountedStringValue, uint32_t> locality_weights;
@ -203,11 +200,9 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
})},
}),
});
if (GRPC_TRACE_FLAG_ENABLED(xds_wrr_locality_lb)) {
LOG(INFO) << "[xds_wrr_locality_lb " << this
<< "] generated child policy config: "
<< JsonDump(child_config_json, /*indent=*/1);
}
GRPC_TRACE_LOG(xds_wrr_locality_lb, INFO)
<< "[xds_wrr_locality_lb " << this << "] generated child policy config: "
<< JsonDump(child_config_json, /*indent=*/1);
// Parse config.
auto child_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(

src/core/util/android/log.cc
@ -30,6 +30,7 @@
#include "src/core/lib/gprpp/crash.h"
extern int gpr_should_log(gpr_log_severity severity);
extern void gpr_log_message(const char* file, int line,
gpr_log_severity severity, const char* message);

src/core/util/linux/log.cc
@ -47,6 +47,7 @@
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/examine_stack.h"
extern int gpr_should_log(gpr_log_severity severity);
extern void gpr_log_message(const char* file, int line,
gpr_log_severity severity, const char* message);

src/core/util/log.cc
@ -40,6 +40,8 @@ void gpr_unreachable_code(const char* reason, const char* file, int line) {
grpc_core::SourceLocation(file, line));
}
int absl_vlog2_enabled() { return ABSL_VLOG_IS_ON(2); }
int gpr_should_log(gpr_log_severity severity) {
switch (severity) {
case GPR_LOG_SEVERITY_ERROR:
@ -119,10 +121,3 @@ void gpr_log_verbosity_init(void) {
}
#endif // GRPC_VERBOSITY_MACRO
}
void gpr_set_log_function([[maybe_unused]] gpr_log_func deprecated_setting) {
LOG(ERROR)
<< "This function is deprecated. This function will be deleted in the "
"next gRPC release. You may create a new absl LogSink with similar "
"functionality. gRFC: https://github.com/grpc/proposal/pull/425 ";
}
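The util/log.cc hunks above add absl_vlog2_enabled() as a C-callable one-liner over ABSL_VLOG_IS_ON(2) (exported via grpc.def and the Ruby import tables) and delete the deprecated gpr_set_log_function. A hedged migration sketch for callers, assuming Abseil's logging headers; mapping DEBUG-severity gating onto verbosity level 2 follows the shim's definition:

#include "absl/log/absl_vlog_is_on.h"
#include "absl/log/log.h"

// Old gate (deleted in this commit):
//   if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { ... }
void MaybeTraceDetail(int fd, int bytes_read) {
  // Plain messages can rely on VLOG's built-in verbosity check:
  VLOG(2) << "fd=" << fd << " read " << bytes_read << " bytes";
  // Guards around expensive pre-formatting work use the same predicate
  // the new shim exports:
  if (ABSL_VLOG_IS_ON(2)) {
    // ... compute and log a costly summary here ...
  }
}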

src/core/util/posix/log.cc
@ -38,6 +38,7 @@
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/examine_stack.h"
extern int gpr_should_log(gpr_log_severity severity);
extern void gpr_log_message(const char* file, int line,
gpr_log_severity severity, const char* message);

src/core/util/windows/log.cc
@ -33,6 +33,7 @@
#include "src/core/lib/gprpp/examine_stack.h"
#include "src/core/util/string.h"
extern int gpr_should_log(gpr_log_severity severity);
extern void gpr_log_message(const char* file, int line,
gpr_log_severity severity, const char* message);

src/python/grpcio_reflection/grpc_reflection/v1alpha/_async.py
@ -32,22 +32,23 @@ class ReflectionServicer(BaseReflectionServicer):
) -> AsyncIterable[_reflection_pb2.ServerReflectionResponse]:
async for request in request_iterator:
if request.HasField("file_by_filename"):
yield self._file_by_filename(request.file_by_filename)
yield self._file_by_filename(request, request.file_by_filename)
elif request.HasField("file_containing_symbol"):
yield self._file_containing_symbol(
request.file_containing_symbol
request, request.file_containing_symbol
)
elif request.HasField("file_containing_extension"):
yield self._file_containing_extension(
request,
request.file_containing_extension.containing_type,
request.file_containing_extension.extension_number,
)
elif request.HasField("all_extension_numbers_of_type"):
yield self._all_extension_numbers_of_type(
request.all_extension_numbers_of_type
request, request.all_extension_numbers_of_type
)
elif request.HasField("list_services"):
yield self._list_services()
yield self._list_services(request)
else:
yield _reflection_pb2.ServerReflectionResponse(
error_response=_reflection_pb2.ErrorResponse(
@ -55,7 +56,8 @@ class ReflectionServicer(BaseReflectionServicer):
error_message=grpc.StatusCode.INVALID_ARGUMENT.value[
1
].encode(),
)
),
original_request=request,
)

@ -22,12 +22,13 @@ from grpc_reflection.v1alpha import reflection_pb2_grpc as _reflection_pb2_grpc
_POOL = descriptor_pool.Default()
def _not_found_error():
def _not_found_error(original_request):
return _reflection_pb2.ServerReflectionResponse(
error_response=_reflection_pb2.ErrorResponse(
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
)
),
original_request=original_request,
)
@ -39,7 +40,7 @@ def _collect_transitive_dependencies(descriptor, seen_files):
_collect_transitive_dependencies(dependency, seen_files)
def _file_descriptor_response(descriptor):
def _file_descriptor_response(descriptor, original_request):
# collect all dependencies
descriptors = {}
_collect_transitive_dependencies(descriptor, descriptors)
@ -55,6 +56,7 @@ def _file_descriptor_response(descriptor):
file_descriptor_response=_reflection_pb2.FileDescriptorResponse(
file_descriptor_proto=(serialized_proto_list)
),
original_request=original_request,
)
@ -71,25 +73,27 @@ class BaseReflectionServicer(_reflection_pb2_grpc.ServerReflectionServicer):
self._service_names = tuple(sorted(service_names))
self._pool = _POOL if pool is None else pool
def _file_by_filename(self, filename):
def _file_by_filename(self, request, filename):
try:
descriptor = self._pool.FindFileByName(filename)
except KeyError:
return _not_found_error()
return _not_found_error(request)
else:
return _file_descriptor_response(descriptor)
return _file_descriptor_response(descriptor, request)
def _file_containing_symbol(self, fully_qualified_name):
def _file_containing_symbol(self, request, fully_qualified_name):
try:
descriptor = self._pool.FindFileContainingSymbol(
fully_qualified_name
)
except KeyError:
return _not_found_error()
return _not_found_error(request)
else:
return _file_descriptor_response(descriptor)
return _file_descriptor_response(descriptor, request)
def _file_containing_extension(self, containing_type, extension_number):
def _file_containing_extension(
self, request, containing_type, extension_number
):
try:
message_descriptor = self._pool.FindMessageTypeByName(
containing_type
@ -101,11 +105,11 @@ class BaseReflectionServicer(_reflection_pb2_grpc.ServerReflectionServicer):
extension_descriptor.full_name
)
except KeyError:
return _not_found_error()
return _not_found_error(request)
else:
return _file_descriptor_response(descriptor)
return _file_descriptor_response(descriptor, request)
def _all_extension_numbers_of_type(self, containing_type):
def _all_extension_numbers_of_type(self, request, containing_type):
try:
message_descriptor = self._pool.FindMessageTypeByName(
containing_type
@ -119,23 +123,25 @@ class BaseReflectionServicer(_reflection_pb2_grpc.ServerReflectionServicer):
)
)
except KeyError:
return _not_found_error()
return _not_found_error(request)
else:
return _reflection_pb2.ServerReflectionResponse(
all_extension_numbers_response=_reflection_pb2.ExtensionNumberResponse(
base_type_name=message_descriptor.full_name,
extension_number=extension_numbers,
)
),
original_request=request,
)
def _list_services(self):
def _list_services(self, request):
return _reflection_pb2.ServerReflectionResponse(
list_services_response=_reflection_pb2.ListServiceResponse(
service=[
_reflection_pb2.ServiceResponse(name=service_name)
for service_name in self._service_names
]
)
),
original_request=request,
)

@ -32,22 +32,23 @@ class ReflectionServicer(BaseReflectionServicer):
# pylint: disable=unused-argument
for request in request_iterator:
if request.HasField("file_by_filename"):
yield self._file_by_filename(request.file_by_filename)
yield self._file_by_filename(request, request.file_by_filename)
elif request.HasField("file_containing_symbol"):
yield self._file_containing_symbol(
request.file_containing_symbol
request, request.file_containing_symbol
)
elif request.HasField("file_containing_extension"):
yield self._file_containing_extension(
request,
request.file_containing_extension.containing_type,
request.file_containing_extension.extension_number,
)
elif request.HasField("all_extension_numbers_of_type"):
yield self._all_extension_numbers_of_type(
request.all_extension_numbers_of_type
request, request.all_extension_numbers_of_type
)
elif request.HasField("list_services"):
yield self._list_services()
yield self._list_services(request)
else:
yield _reflection_pb2.ServerReflectionResponse(
error_response=_reflection_pb2.ErrorResponse(
@ -55,7 +56,8 @@ class ReflectionServicer(BaseReflectionServicer):
error_message=grpc.StatusCode.INVALID_ARGUMENT.value[
1
].encode(),
)
),
original_request=request,
)

@ -90,6 +90,7 @@ class ReflectionServicerTest(unittest.TestCase):
_file_descriptor_to_proto(empty_pb2.DESCRIPTOR),
)
),
original_request=requests[0],
),
reflection_pb2.ServerReflectionResponse(
valid_host="",
@ -97,6 +98,7 @@ class ReflectionServicerTest(unittest.TestCase):
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
),
original_request=requests[1],
),
)
self.assertEqual(expected_responses, responses)
@ -119,6 +121,7 @@ class ReflectionServicerTest(unittest.TestCase):
_file_descriptor_to_proto(empty_pb2.DESCRIPTOR),
)
),
original_request=requests[0],
),
reflection_pb2.ServerReflectionResponse(
valid_host="",
@ -126,6 +129,7 @@ class ReflectionServicerTest(unittest.TestCase):
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
),
original_request=requests[1],
),
)
self.assertEqual(expected_responses, responses)
@ -157,6 +161,7 @@ class ReflectionServicerTest(unittest.TestCase):
_file_descriptor_to_proto(empty2_pb2.DESCRIPTOR),
)
),
original_request=requests[0],
),
reflection_pb2.ServerReflectionResponse(
valid_host="",
@ -164,6 +169,7 @@ class ReflectionServicerTest(unittest.TestCase):
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
),
original_request=requests[1],
),
)
self.assertEqual(expected_responses, responses)
@ -185,6 +191,7 @@ class ReflectionServicerTest(unittest.TestCase):
base_type_name=_EMPTY_EXTENSIONS_SYMBOL_NAME,
extension_number=_EMPTY_EXTENSIONS_NUMBERS,
),
original_request=requests[0],
),
reflection_pb2.ServerReflectionResponse(
valid_host="",
@ -192,6 +199,7 @@ class ReflectionServicerTest(unittest.TestCase):
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
),
original_request=requests[1],
),
)
self.assertEqual(expected_responses, responses)
@ -212,6 +220,7 @@ class ReflectionServicerTest(unittest.TestCase):
for name in _SERVICE_NAMES
)
),
original_request=requests[0],
),
)
self.assertEqual(expected_responses, responses)

@ -89,6 +89,7 @@ class ReflectionServicerTest(AioTestBase):
_file_descriptor_to_proto(empty_pb2.DESCRIPTOR),
)
),
original_request=requests[0],
),
reflection_pb2.ServerReflectionResponse(
valid_host="",
@ -96,6 +97,7 @@ class ReflectionServicerTest(AioTestBase):
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
),
original_request=requests[1],
),
)
self.assertSequenceEqual(expected_responses, responses)
@ -120,6 +122,7 @@ class ReflectionServicerTest(AioTestBase):
_file_descriptor_to_proto(empty_pb2.DESCRIPTOR),
)
),
original_request=requests[0],
),
reflection_pb2.ServerReflectionResponse(
valid_host="",
@ -127,6 +130,7 @@ class ReflectionServicerTest(AioTestBase):
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
),
original_request=requests[1],
),
)
self.assertSequenceEqual(expected_responses, responses)
@ -160,6 +164,7 @@ class ReflectionServicerTest(AioTestBase):
_file_descriptor_to_proto(empty2_pb2.DESCRIPTOR),
)
),
original_request=requests[0],
),
reflection_pb2.ServerReflectionResponse(
valid_host="",
@ -167,6 +172,7 @@ class ReflectionServicerTest(AioTestBase):
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
),
original_request=requests[1],
),
)
self.assertSequenceEqual(expected_responses, responses)
@ -190,6 +196,7 @@ class ReflectionServicerTest(AioTestBase):
base_type_name=_EMPTY_EXTENSIONS_SYMBOL_NAME,
extension_number=_EMPTY_EXTENSIONS_NUMBERS,
),
original_request=requests[0],
),
reflection_pb2.ServerReflectionResponse(
valid_host="",
@ -197,6 +204,7 @@ class ReflectionServicerTest(AioTestBase):
error_code=grpc.StatusCode.NOT_FOUND.value[0],
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
),
original_request=requests[1],
),
)
self.assertSequenceEqual(expected_responses, responses)
@ -219,6 +227,7 @@ class ReflectionServicerTest(AioTestBase):
for name in _SERVICE_NAMES
)
),
original_request=requests[0],
),
)
self.assertSequenceEqual(expected_responses, responses)

@ -809,7 +809,7 @@ struct call_run_batch_args {
};
static void cancel_call_unblock_func(void* arg) {
gpr_log(GPR_INFO, "GRPC_RUBY: cancel_call_unblock_func");
gpr_log(GPR_DEBUG, "GRPC_RUBY: cancel_call_unblock_func");
grpc_call* call = (grpc_call*)arg;
grpc_call_cancel(call, NULL);
}

@ -60,7 +60,7 @@ static VALUE grpc_rb_call_credentials_callback(VALUE args) {
VALUE callback_func = rb_ary_entry(args, 0);
VALUE callback_args = rb_ary_entry(args, 1);
VALUE md_ary_obj = rb_ary_entry(args, 2);
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
if (absl_vlog2_enabled()) {
VALUE callback_func_str = rb_funcall(callback_func, rb_intern("to_s"), 0);
VALUE callback_args_str = rb_funcall(callback_args, rb_intern("to_s"), 0);
VALUE callback_source_info =
@ -112,7 +112,7 @@ static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args,
VALUE rb_exception_info =
rb_funcall(exception_object, rb_intern("inspect"), 0);
(void)args;
gpr_log(GPR_INFO,
gpr_log(GPR_DEBUG,
"GRPC_RUBY call credentials callback failed, exception inspect:|%s| "
"backtrace:|%s|",
StringValueCStr(rb_exception_info), StringValueCStr(backtrace_str));
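The guard swap above matches the C shim added in src/core/util/log.cc, where absl_vlog2_enabled() returns ABSL_VLOG_IS_ON(2). A sketch of the same gating pattern written directly against absl, assuming its VLOG macros from absl/log/log.h (the function and its arguments are illustrative):

  #include <string>

  #include "absl/log/absl_vlog_is_on.h"
  #include "absl/log/log.h"

  // Only pay for the expensive string building when verbosity >= 2,
  // mirroring the absl_vlog2_enabled() gate in the Ruby extension.
  void LogCallbackDetails(const std::string& func, const std::string& args) {
    if (ABSL_VLOG_IS_ON(2)) {
      std::string details = "func=" + func + " args=" + args;
      VLOG(2) << "callback details: " << details;
    }
  }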

@ -328,7 +328,7 @@ static void grpc_ruby_init_threads() {
// in gpr_once_init. In general, it appears to be unsafe to call
// into the ruby library while holding a non-ruby mutex, because a gil yield
// could end up trying to lock onto that same mutex and deadlocking.
gpr_log(GPR_INFO,
gpr_log(GPR_DEBUG,
"GRPC_RUBY: grpc_ruby_init_threads g_bg_thread_init_done=%d",
g_bg_thread_init_done);
rb_mutex_lock(g_bg_thread_init_rb_mu);

@ -254,9 +254,8 @@ gpr_free_aligned_type gpr_free_aligned_import;
gpr_cpu_num_cores_type gpr_cpu_num_cores_import;
gpr_cpu_current_cpu_type gpr_cpu_current_cpu_import;
gpr_log_type gpr_log_import;
gpr_should_log_type gpr_should_log_import;
absl_vlog2_enabled_type absl_vlog2_enabled_import;
gpr_log_verbosity_init_type gpr_log_verbosity_init_import;
gpr_set_log_function_type gpr_set_log_function_import;
gpr_format_message_type gpr_format_message_import;
gpr_strdup_type gpr_strdup_import;
gpr_asprintf_type gpr_asprintf_import;
@ -540,9 +539,8 @@ void grpc_rb_load_imports(HMODULE library) {
gpr_cpu_num_cores_import = (gpr_cpu_num_cores_type) GetProcAddress(library, "gpr_cpu_num_cores");
gpr_cpu_current_cpu_import = (gpr_cpu_current_cpu_type) GetProcAddress(library, "gpr_cpu_current_cpu");
gpr_log_import = (gpr_log_type) GetProcAddress(library, "gpr_log");
gpr_should_log_import = (gpr_should_log_type) GetProcAddress(library, "gpr_should_log");
absl_vlog2_enabled_import = (absl_vlog2_enabled_type) GetProcAddress(library, "absl_vlog2_enabled");
gpr_log_verbosity_init_import = (gpr_log_verbosity_init_type) GetProcAddress(library, "gpr_log_verbosity_init");
gpr_set_log_function_import = (gpr_set_log_function_type) GetProcAddress(library, "gpr_set_log_function");
gpr_format_message_import = (gpr_format_message_type) GetProcAddress(library, "gpr_format_message");
gpr_strdup_import = (gpr_strdup_type) GetProcAddress(library, "gpr_strdup");
gpr_asprintf_import = (gpr_asprintf_type) GetProcAddress(library, "gpr_asprintf");

@ -738,15 +738,12 @@ extern gpr_cpu_current_cpu_type gpr_cpu_current_cpu_import;
typedef void(*gpr_log_type)(const char* file, int line, gpr_log_severity severity, const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);
extern gpr_log_type gpr_log_import;
#define gpr_log gpr_log_import
typedef int(*gpr_should_log_type)(gpr_log_severity severity);
extern gpr_should_log_type gpr_should_log_import;
#define gpr_should_log gpr_should_log_import
typedef int(*absl_vlog2_enabled_type)();
extern absl_vlog2_enabled_type absl_vlog2_enabled_import;
#define absl_vlog2_enabled absl_vlog2_enabled_import
typedef void(*gpr_log_verbosity_init_type)(void);
extern gpr_log_verbosity_init_type gpr_log_verbosity_init_import;
#define gpr_log_verbosity_init gpr_log_verbosity_init_import
typedef void(*gpr_set_log_function_type)(gpr_log_func deprecated_setting);
extern gpr_set_log_function_type gpr_set_log_function_import;
#define gpr_set_log_function gpr_set_log_function_import
typedef char*(*gpr_format_message_type)(int messageid);
extern gpr_format_message_type gpr_format_message_import;
#define gpr_format_message gpr_format_message_import
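Every symbol in these generated files is wired through the same three-part stub shown above. For a hypothetical new export gpr_example (an illustrative name, not part of this commit) the pattern would be:

  /* rb_grpc_imports.generated.h: pointer type, extern pointer, and a macro
     redirecting call sites through the pointer. */
  typedef int (*gpr_example_type)(void);
  extern gpr_example_type gpr_example_import;
  #define gpr_example gpr_example_import

  /* rb_grpc_imports.generated.c: define the pointer, then resolve it inside
     grpc_rb_load_imports(HMODULE library): */
  gpr_example_type gpr_example_import;
  gpr_example_import = (gpr_example_type) GetProcAddress(library, "gpr_example");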

@ -68,7 +68,7 @@ static void grpc_rb_server_shutdown_and_notify_internal(grpc_rb_server* server,
server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL, NULL);
}
if (ev.type != GRPC_OP_COMPLETE) {
gpr_log(GPR_INFO,
gpr_log(GPR_DEBUG,
"GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d",
ev.type);
}
@ -192,7 +192,7 @@ struct server_request_call_args {
static void shutdown_server_unblock_func(void* arg) {
grpc_rb_server* server = (grpc_rb_server*)arg;
gpr_log(GPR_INFO, "GRPC_RUBY: shutdown_server_unblock_func");
gpr_log(GPR_DEBUG, "GRPC_RUBY: shutdown_server_unblock_func");
GRPC_RUBY_ASSERT(server->wrapped != NULL);
grpc_event event;
void* tag = &event;
@ -202,7 +202,7 @@ static void shutdown_server_unblock_func(void* arg) {
// cancelled all calls.
event = grpc_completion_queue_pluck(server->queue, tag,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
gpr_log(GPR_INFO,
gpr_log(GPR_DEBUG,
"GRPC_RUBY: shutdown_server_unblock_func pluck event.type: %d "
"event.success: %d",
event.type, event.success);

@ -37,6 +37,11 @@ os.chdir(os.path.join(os.path.dirname(sys.argv[0]), "../../.."))
# Map of deprecated functions to allowlist files
DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"absl_vlog2_enabled(": [
"./include/grpc/support/log.h",
"./src/core/util/log.cc",
"./src/ruby/ext/grpc/rb_call_credentials.c",
],
"gpr_log_severity": [
"./include/grpc/support/log.h",
"./src/core/util/android/log.cc",
@ -63,13 +68,11 @@ DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"./src/ruby/ext/grpc/rb_server.c",
],
"gpr_should_log(": [
"./include/grpc/support/log.h",
"./src/core/util/android/log.cc",
"./src/core/util/linux/log.cc",
"./src/core/util/log.cc",
"./src/core/util/posix/log.cc",
"./src/core/util/windows/log.cc",
"./src/ruby/ext/grpc/rb_call_credentials.c",
],
"gpr_log_message(": [
"./src/core/util/android/log.cc",
@ -78,14 +81,8 @@ DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"./src/core/util/posix/log.cc",
"./src/core/util/windows/log.cc",
],
"gpr_log_func_args": [
"./include/grpc/support/log.h",
"./src/core/util/log.cc",
],
"gpr_set_log_function(": [
"./include/grpc/support/log.h",
"./src/core/util/log.cc",
],
"gpr_log_func_args": [],
"gpr_set_log_function(": [],
"GPR_ASSERT": [],
"gpr_assertion_failed": [],
"GPR_DEBUG_ASSERT": [],
