Merge Master

pull/37269/head
tanvi-jagtap 4 months ago
commit cb4b0d9e17
  1. 1
      BUILD
  2. 2
      CMakeLists.txt
  3. 4
      build_autogenerated.yaml
  4. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  5. 2
      src/core/ext/transport/chttp2/transport/internal.h
  6. 2
      src/core/ext/transport/chttp2/transport/parsing.cc
  7. 7
      src/core/ext/transport/chttp2/transport/writing.cc
  8. 1
      src/core/handshaker/security/secure_endpoint.cc
  9. 5
      src/core/lib/channel/promise_based_filter.cc
  10. 2
      src/core/lib/channel/promise_based_filter.h
  11. 3
      src/core/lib/event_engine/posix_engine/posix_endpoint.cc
  12. 20
      src/core/lib/experiments/config.cc
  13. 8
      src/core/lib/iomgr/endpoint_cfstream.cc
  14. 13
      src/core/lib/iomgr/exec_ctx.h
  15. 3
      src/core/lib/iomgr/tcp_posix.cc
  16. 5
      src/core/lib/resource_quota/periodic_update.cc
  17. 2
      src/core/lib/security/credentials/alts/check_gcp_environment.cc
  18. 2
      src/core/lib/security/credentials/alts/check_gcp_environment_no_op.cc
  19. 2
      src/core/lib/security/credentials/jwt/json_token.cc
  20. 17
      src/core/lib/security/credentials/jwt/jwt_credentials.cc
  21. 6
      src/core/lib/security/credentials/tls/grpc_tls_certificate_provider.cc
  22. 5
      src/core/lib/security/credentials/tls/grpc_tls_credentials_options.cc
  23. 4
      src/core/lib/security/credentials/tls/tls_credentials.cc
  24. 15
      src/core/lib/security/security_connector/ssl_utils.cc
  25. 6
      src/core/lib/surface/completion_queue.cc
  26. 2
      src/core/lib/surface/filter_stack_call.cc
  27. 18
      src/core/lib/transport/call_arena_allocator.cc
  28. 25
      src/core/lib/transport/call_arena_allocator.h
  29. 12
      src/core/tsi/alts/handshaker/alts_handshaker_client.cc
  30. 6
      src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
  31. 36
      src/core/tsi/ssl_transport_security.cc
  32. 13
      src/core/util/latent_see.cc
  33. 96
      src/core/util/latent_see.h
  34. 1
      test/core/end2end/tests/no_logging.cc
  35. 30
      test/core/resource_quota/periodic_update_test.cc
  36. 8
      tools/run_tests/sanity/banned_functions.py

@ -3279,6 +3279,7 @@ grpc_cc_library(
"//src/core:experiments",
"//src/core:gpr_atm",
"//src/core:gpr_spinlock",
"//src/core:latent_see",
"//src/core:time",
"//src/core:useful",
],

2
CMakeLists.txt generated

@ -22414,6 +22414,7 @@ add_executable(periodic_update_test
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/gprpp/glob.cc
src/core/lib/gprpp/per_cpu.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/iomgr/closure.cc
@ -22426,6 +22427,7 @@ add_executable(periodic_update_test
src/core/lib/slice/percent_encoding.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/util/latent_see.cc
test/core/resource_quota/periodic_update_test.cc
)
if(WIN32 AND MSVC)

@ -14454,6 +14454,7 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/glob.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/per_cpu.h
- src/core/lib/gprpp/status_helper.h
- src/core/lib/gprpp/time.h
- src/core/lib/iomgr/closure.h
@ -14468,6 +14469,7 @@ targets:
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/util/latent_see.h
- src/core/util/spinlock.h
- third_party/upb/upb/generated_code_support.h
src:
@ -14478,6 +14480,7 @@ targets:
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/gprpp/glob.cc
- src/core/lib/gprpp/per_cpu.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/iomgr/closure.cc
@ -14490,6 +14493,7 @@ targets:
- src/core/lib/slice/percent_encoding.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/util/latent_see.cc
- test/core/resource_quota/periodic_update_test.cc
deps:
- gtest

@ -1026,6 +1026,7 @@ static const char* begin_writing_desc(bool partial) {
static void write_action_begin_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
grpc_error_handle /*error_ignored*/) {
GRPC_LATENT_SEE_INNER_SCOPE("write_action_begin_locked");
CHECK(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_chttp2_begin_write_result r;
if (!t->closed_with_error.ok()) {
@ -2676,6 +2677,7 @@ static void read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
static void read_action_parse_loop_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
grpc_error_handle error) {
GRPC_LATENT_SEE_INNER_SCOPE("read_action_parse_loop_locked");
if (t->closed_with_error.ok()) {
grpc_error_handle errors[3] = {error, absl::OkStatus(), absl::OkStatus()};
size_t requests_started = 0;

@ -545,6 +545,8 @@ struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport,
// What percentage of rst_stream frames on the server should cause a ping
// frame to be generated.
uint8_t ping_on_rst_stream_percent;
GPR_NO_UNIQUE_ADDRESS grpc_core::latent_see::Flow write_flow;
};
typedef enum {

@ -205,6 +205,8 @@ std::string FrameTypeString(uint8_t frame_type, uint8_t flags) {
absl::variant<size_t, absl::Status> grpc_chttp2_perform_read(
grpc_chttp2_transport* t, const grpc_slice& slice,
size_t& requests_started) {
GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_perform_read");
const uint8_t* beg = GRPC_SLICE_START_PTR(slice);
const uint8_t* end = GRPC_SLICE_END_PTR(slice);
const uint8_t* cur = beg;

@ -677,6 +677,8 @@ class StreamWriteContext {
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_chttp2_transport* t) {
GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_begin_write");
int64_t outbuf_relative_start_pos = 0;
WriteContext ctx(t);
ctx.FlushSettings();
@ -732,12 +734,17 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
maybe_initiate_ping(t);
t->write_flow.Begin(GRPC_LATENT_SEE_METADATA("write"));
return ctx.Result();
}
void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_end_write");
grpc_chttp2_stream* s;
t->write_flow.End();
if (t->channelz_socket != nullptr) {
t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
}

@ -387,6 +387,7 @@ static void on_write(void* user_data, grpc_error_handle error) {
static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* arg, int max_frame_size) {
GRPC_LATENT_SEE_INNER_SCOPE("secure_endpoint write");
unsigned i;
tsi_result result = TSI_OK;
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);

@ -244,7 +244,10 @@ void BaseCallData::CapturedBatch::CancelWith(grpc_error_handle error,
///////////////////////////////////////////////////////////////////////////////
// BaseCallData::Flusher
BaseCallData::Flusher::Flusher(BaseCallData* call) : call_(call) {
BaseCallData::Flusher::Flusher(BaseCallData* call)
: latent_see::InnerScope(
GRPC_LATENT_SEE_METADATA("PromiseBasedFilter Flusher")),
call_(call) {
GRPC_CALL_STACK_REF(call_->call_stack(), "flusher");
}

@ -947,7 +947,7 @@ class BaseCallData : public Activity, private Wakeable {
}
};
class Flusher {
class Flusher : public latent_see::InnerScope {
public:
explicit Flusher(BaseCallData* call);
// Calls closures, schedules batches, relinquishes call combiner.

@ -102,6 +102,7 @@ namespace {
// of bytes sent.
ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno,
int additional_flags = 0) {
GRPC_LATENT_SEE_INNER_SCOPE("TcpSend");
ssize_t sent_length;
do {
sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);
@ -286,6 +287,8 @@ absl::Status PosixEndpointImpl::TcpAnnotateError(absl::Status src_error) const {
// Returns true if data available to read or error other than EAGAIN.
bool PosixEndpointImpl::TcpDoRead(absl::Status& status) {
GRPC_LATENT_SEE_INNER_SCOPE("TcpDoRead");
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;

@ -254,20 +254,20 @@ void PrintExperimentsList() {
}
if (experiment_status.empty()) {
if (!defaulted_on_experiments.empty()) {
LOG(INFO) << "gRPC experiments enabled: "
<< absl::StrJoin(defaulted_on_experiments, ", ");
VLOG(2) << "gRPC experiments enabled: "
<< absl::StrJoin(defaulted_on_experiments, ", ");
}
} else {
if (defaulted_on_experiments.empty()) {
LOG(INFO) << "gRPC experiments: "
<< absl::StrJoin(experiment_status, ", ",
absl::PairFormatter(":"));
VLOG(2) << "gRPC experiments: "
<< absl::StrJoin(experiment_status, ", ",
absl::PairFormatter(":"));
} else {
LOG(INFO) << "gRPC experiments: "
<< absl::StrJoin(experiment_status, ", ",
absl::PairFormatter(":"))
<< "; default-enabled: "
<< absl::StrJoin(defaulted_on_experiments, ", ");
VLOG(2) << "gRPC experiments: "
<< absl::StrJoin(experiment_status, ", ",
absl::PairFormatter(":"))
<< "; default-enabled: "
<< absl::StrJoin(defaulted_on_experiments, ", ");
}
}
}

@ -76,8 +76,8 @@ static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
const char* file, int line) {
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
VLOG(2) << "CFStream endpoint unref " << ep << " : " << reason << " " << val
<< " -> " << val - 1;
VLOG(2).AtLocation(file, line) << "CFStream endpoint unref " << ep << " : "
<< reason << " " << val << " -> " << val - 1;
}
if (gpr_unref(&ep->refcount)) {
CFStreamFree(ep);
@ -87,8 +87,8 @@ static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
const char* file, int line) {
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
VLOG(2) << "CFStream endpoint ref " << ep << " : " << reason << " " << val
<< " -> " << val + 1;
VLOG(2).AtLocation(file, line) << "CFStream endpoint ref " << ep << " : "
<< reason << " " << val << " -> " << val + 1;
}
gpr_ref(&ep->refcount);
}

@ -41,6 +41,7 @@
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/time_precise.h"
#if !defined(_WIN32) || !defined(_DLL)
@ -109,17 +110,23 @@ class Combiner;
/// since that implies a core re-entry outside of application
/// callbacks.
///
class GRPC_DLL ExecCtx {
class GRPC_DLL ExecCtx : public latent_see::ParentScope {
public:
/// Default Constructor
ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
ExecCtx()
: latent_see::ParentScope(GRPC_LATENT_SEE_METADATA("ExecCtx")),
flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
Fork::IncExecCtxCount();
Set(this);
}
/// Parameterised Constructor
explicit ExecCtx(uintptr_t fl) : flags_(fl) {
explicit ExecCtx(uintptr_t fl)
: ExecCtx(fl, GRPC_LATENT_SEE_METADATA("ExecCtx")) {}
explicit ExecCtx(uintptr_t fl, latent_see::Metadata* latent_see_metadata)
: latent_see::ParentScope(latent_see_metadata), flags_(fl) {
if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
Fork::IncExecCtxCount();
}

@ -915,6 +915,7 @@ static void update_rcvlowat(grpc_tcp* tcp)
#define MAX_READ_IOVEC 64
static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(tcp->read_mu) {
GRPC_LATENT_SEE_INNER_SCOPE("tcp_do_read");
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
LOG(INFO) << "TCP:" << tcp << " do_read";
}
@ -1211,6 +1212,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
// of bytes sent.
ssize_t tcp_send(int fd, const struct msghdr* msg, int* saved_errno,
int additional_flags = 0) {
GRPC_LATENT_SEE_INNER_SCOPE("tcp_send");
ssize_t sent_length;
do {
// TODO(klempner): Cork if this is a partial write
@ -1408,6 +1410,7 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
/// messages from the queue.
///
static bool process_errors(grpc_tcp* tcp) {
GRPC_LATENT_SEE_INNER_SCOPE("process_errors");
bool processed_err = false;
struct iovec iov;
iov.iov_base = nullptr;

@ -56,9 +56,8 @@ bool PeriodicUpdate::MaybeEndPeriod(absl::FunctionRef<void(Duration)> f) {
// Store the remainder left. Note that updates_remaining_ may have been
// decremented by another thread whilst we performed the above calculations:
// we simply discard those decrements.
auto remaining = better_guess - expected_updates_per_period_;
expected_updates_per_period_ = better_guess;
updates_remaining_.store(remaining, std::memory_order_release);
updates_remaining_.store(better_guess - expected_updates_per_period_,
std::memory_order_release);
// Not quite done, return, try for longer.
return false;
}

@ -58,7 +58,7 @@ namespace internal {
char* read_bios_file(const char* bios_file) {
FILE* fp = fopen(bios_file, "r");
if (!fp) {
LOG(INFO) << "BIOS data file does not exist or cannot be opened.";
VLOG(2) << "BIOS data file does not exist or cannot be opened.";
return nullptr;
}
char buf[kBiosDataBufferSize + 1];

@ -26,7 +26,7 @@
#include "src/core/lib/security/credentials/alts/check_gcp_environment.h"
bool grpc_alts_is_running_on_gcp() {
LOG(INFO) << "ALTS: Platforms other than Linux and Windows are not supported";
VLOG(2) << "ALTS: Platforms other than Linux and Windows are not supported";
return false;
}

@ -190,7 +190,7 @@ static char* encoded_jwt_claim(const grpc_auth_json_key* json_key,
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
gpr_timespec expiration = gpr_time_add(now, token_lifetime);
if (gpr_time_cmp(token_lifetime, grpc_max_auth_token_lifetime()) > 0) {
LOG(INFO) << "Cropping token lifetime to maximum allowed value.";
VLOG(2) << "Cropping token lifetime to maximum allowed value.";
expiration = gpr_time_add(now, grpc_max_auth_token_lifetime());
}

@ -115,8 +115,8 @@ grpc_service_account_jwt_access_credentials::
: key_(key) {
gpr_timespec max_token_lifetime = grpc_max_auth_token_lifetime();
if (gpr_time_cmp(token_lifetime, max_token_lifetime) > 0) {
LOG(INFO) << "Cropping token lifetime to maximum allowed value ("
<< max_token_lifetime.tv_sec << " secs).";
VLOG(2) << "Cropping token lifetime to maximum allowed value ("
<< max_token_lifetime.tv_sec << " secs).";
token_lifetime = grpc_max_auth_token_lifetime();
}
jwt_lifetime_ = token_lifetime;
@ -155,13 +155,12 @@ grpc_call_credentials* grpc_service_account_jwt_access_credentials_create(
const char* json_key, gpr_timespec token_lifetime, void* reserved) {
if (GRPC_TRACE_FLAG_ENABLED(api)) {
char* clean_json = redact_private_key(json_key);
LOG(INFO) << "grpc_service_account_jwt_access_credentials_create("
<< "json_key=" << clean_json
<< ", token_lifetime=gpr_timespec { tv_sec: "
<< token_lifetime.tv_sec
<< ", tv_nsec: " << token_lifetime.tv_nsec
<< ", clock_type: " << token_lifetime.clock_type
<< " }, reserved=" << reserved << ")";
VLOG(2) << "grpc_service_account_jwt_access_credentials_create("
<< "json_key=" << clean_json
<< ", token_lifetime=gpr_timespec { tv_sec: "
<< token_lifetime.tv_sec << ", tv_nsec: " << token_lifetime.tv_nsec
<< ", clock_type: " << token_lifetime.clock_type
<< " }, reserved=" << reserved << ")";
gpr_free(clean_json);
}
CHECK_EQ(reserved, nullptr);

@ -122,9 +122,9 @@ FileWatcherCertificateProvider::FileWatcherCertificateProvider(
refresh_interval_sec_(refresh_interval_sec),
distributor_(MakeRefCounted<grpc_tls_certificate_distributor>()) {
if (refresh_interval_sec_ < kMinimumFileWatcherRefreshIntervalSeconds) {
LOG(INFO) << "FileWatcherCertificateProvider refresh_interval_sec_ set to "
"value less than minimum. Overriding configured value to "
"minimum.";
VLOG(2) << "FileWatcherCertificateProvider refresh_interval_sec_ set to "
"value less than minimum. Overriding configured value to "
"minimum.";
refresh_interval_sec_ = kMinimumFileWatcherRefreshIntervalSeconds;
}
// Private key and identity cert files must be both set or both unset.

@ -127,10 +127,9 @@ void grpc_tls_credentials_options_set_tls_session_key_log_file_path(
// Tls session key logging is assumed to be enabled if the specified log
// file is non-empty.
if (path != nullptr) {
LOG(INFO) << "Enabling TLS session key logging with keys stored at: "
<< path;
VLOG(2) << "Enabling TLS session key logging with keys stored at: " << path;
} else {
LOG(INFO) << "Disabling TLS session key logging";
VLOG(2) << "Disabling TLS session key logging";
}
options->set_tls_session_key_log_file_path(path != nullptr ? path : "");
}

@ -84,8 +84,8 @@ bool CredentialOptionSanityCheck(grpc_tls_credentials_options* options,
// If no verifier is specified on the client side, use the hostname verifier
// as default. Users who want to bypass all the verifier check should
// implement an external verifier instead.
LOG(INFO) << "No verifier specified on the client side. Using default "
"hostname verifier";
VLOG(2) << "No verifier specified on the client side. Using default "
"hostname verifier";
options->set_certificate_verifier(
grpc_core::MakeRefCounted<grpc_core::HostNameCertificateVerifier>());
}

@ -232,16 +232,18 @@ static bool IsSpiffeId(absl::string_view uri) {
return false;
};
if (uri.size() > 2048) {
LOG(INFO) << "Invalid SPIFFE ID: ID longer than 2048 bytes.";
GRPC_TRACE_LOG(tsi, INFO)
<< "Invalid SPIFFE ID: ID longer than 2048 bytes.";
return false;
}
std::vector<absl::string_view> splits = absl::StrSplit(uri, '/');
if (splits.size() < 4 || splits[3].empty()) {
LOG(INFO) << "Invalid SPIFFE ID: workload id is empty.";
GRPC_TRACE_LOG(tsi, INFO) << "Invalid SPIFFE ID: workload id is empty.";
return false;
}
if (splits[2].size() > 255) {
LOG(INFO) << "Invalid SPIFFE ID: domain longer than 255 characters.";
GRPC_TRACE_LOG(tsi, INFO)
<< "Invalid SPIFFE ID: domain longer than 255 characters.";
return false;
}
return true;
@ -332,7 +334,7 @@ grpc_core::RefCountedPtr<grpc_auth_context> grpc_ssl_peer_to_auth_context(
GRPC_PEER_SPIFFE_ID_PROPERTY_NAME,
spiffe_data, spiffe_length);
} else {
LOG(INFO) << "Invalid SPIFFE ID: multiple URI SANs.";
GRPC_TRACE_LOG(tsi, INFO) << "Invalid SPIFFE ID: multiple URI SANs.";
}
}
return ctx;
@ -419,8 +421,9 @@ grpc_security_status grpc_ssl_tsi_client_handshaker_factory_init(
const char* root_certs;
const tsi_ssl_root_certs_store* root_store;
if (pem_root_certs == nullptr && !skip_server_certificate_verification) {
LOG(INFO) << "No root certificates specified; use ones stored in system "
"default locations instead";
GRPC_TRACE_LOG(tsi, INFO)
<< "No root certificates specified; use ones stored in system "
"default locations instead";
// Use default root certificates.
root_certs = grpc_core::DefaultSslRootStore::GetPemRootCerts();
if (root_certs == nullptr) {

@ -886,7 +886,8 @@ struct cq_is_finished_arg {
class ExecCtxNext : public grpc_core::ExecCtx {
public:
explicit ExecCtxNext(void* arg)
: ExecCtx(0), check_ready_to_finish_arg_(arg) {}
: ExecCtx(0, GRPC_LATENT_SEE_METADATA("ExecCtx for CqNext")),
check_ready_to_finish_arg_(arg) {}
bool CheckReadyToFinish() override {
cq_is_finished_arg* a =
@ -1133,7 +1134,8 @@ static void del_plucker(grpc_completion_queue* cq, void* tag,
class ExecCtxPluck : public grpc_core::ExecCtx {
public:
explicit ExecCtxPluck(void* arg)
: ExecCtx(0), check_ready_to_finish_arg_(arg) {}
: ExecCtx(0, GRPC_LATENT_SEE_METADATA("ExecCtx for CqPluck")),
check_ready_to_finish_arg_(arg) {}
bool CheckReadyToFinish() override {
cq_is_finished_arg* a =

@ -730,6 +730,8 @@ void FilterStackCall::BatchControl::FinishBatch(grpc_error_handle error) {
grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
void* notify_tag,
bool is_notify_tag_closure) {
GRPC_LATENT_SEE_INNER_SCOPE("FilterStackCall::StartBatch");
size_t i;
const grpc_op* op;
BatchControl* bctl;

@ -20,22 +20,8 @@
namespace grpc_core {
void CallSizeEstimator::UpdateCallSizeEstimate(size_t size) {
size_t cur = call_size_estimate_.load(std::memory_order_relaxed);
if (cur < size) {
// size grew: update estimate
call_size_estimate_.compare_exchange_weak(
cur, size, std::memory_order_relaxed, std::memory_order_relaxed);
// if we lose: never mind, something else will likely update soon enough
} else if (cur == size) {
// no change: holding pattern
} else if (cur > 0) {
// size shrank: decrease estimate
call_size_estimate_.compare_exchange_weak(
cur, std::min(cur - 1, (255 * cur + size) / 256),
std::memory_order_relaxed, std::memory_order_relaxed);
// if we lose: never mind, something else will likely update soon enough
}
void CallArenaAllocator::FinalizeArena(Arena* arena) {
call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
}
} // namespace grpc_core

@ -33,7 +33,7 @@ class CallSizeEstimator final {
explicit CallSizeEstimator(size_t initial_estimate)
: call_size_estimate_(initial_estimate) {}
size_t CallSizeEstimate() {
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION size_t CallSizeEstimate() {
// We round up our current estimate to the NEXT value of kRoundUpSize.
// This ensures:
// 1. a consistent size allocation when our estimate is drifting slowly
@ -46,7 +46,24 @@ class CallSizeEstimator final {
~(kRoundUpSize - 1);
}
void UpdateCallSizeEstimate(size_t size);
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void UpdateCallSizeEstimate(
size_t size) {
size_t cur = call_size_estimate_.load(std::memory_order_relaxed);
if (cur < size) {
// size grew: update estimate
call_size_estimate_.compare_exchange_weak(
cur, size, std::memory_order_relaxed, std::memory_order_relaxed);
// if we lose: never mind, something else will likely update soon enough
} else if (cur == size) {
// no change: holding pattern
} else if (cur > 0) {
// size shrank: decrease estimate
call_size_estimate_.compare_exchange_weak(
cur, std::min(cur - 1, (255 * cur + size) / 256),
std::memory_order_relaxed, std::memory_order_relaxed);
// if we lose: never mind, something else will likely update soon enough
}
}
private:
std::atomic<size_t> call_size_estimate_;
@ -62,9 +79,7 @@ class CallArenaAllocator final : public ArenaFactory {
return Arena::Create(call_size_estimator_.CallSizeEstimate(), Ref());
}
void FinalizeArena(Arena* arena) override {
call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
}
void FinalizeArena(Arena* arena) override;
size_t CallSizeEstimate() { return call_size_estimator_.CallSizeEstimate(); }

@ -220,14 +220,14 @@ void alts_handshaker_client_handle_response(alts_handshaker_client* c,
}
// TSI handshake has been shutdown.
if (alts_tsi_handshaker_has_shutdown(handshaker)) {
LOG(INFO) << "TSI handshake shutdown";
VLOG(2) << "TSI handshake shutdown";
handle_response_done(client, TSI_HANDSHAKE_SHUTDOWN,
"TSI handshake shutdown", nullptr, 0, nullptr);
return;
}
// Check for failed grpc read.
if (!is_ok || client->inject_read_failure) {
LOG(INFO) << "read failed on grpc call to handshaker service";
VLOG(2) << "read failed on grpc call to handshaker service";
handle_response_done(client, TSI_INTERNAL_ERROR,
"read failed on grpc call to handshaker service",
nullptr, 0, nullptr);
@ -470,10 +470,10 @@ static void on_status_received(void* arg, grpc_error_handle error) {
// status from the final ALTS message with the status here.
char* status_details =
grpc_slice_to_c_string(client->handshake_status_details);
LOG(INFO) << "alts_grpc_handshaker_client:" << client
<< " on_status_received status:" << client->handshake_status_code
<< " details:|" << status_details << "| error:|"
<< grpc_core::StatusToString(error) << "|";
VLOG(2) << "alts_grpc_handshaker_client:" << client
<< " on_status_received status:" << client->handshake_status_code
<< " details:|" << status_details << "| error:|"
<< grpc_core::StatusToString(error) << "|";
gpr_free(status_details);
}
maybe_complete_tsi_next(client, true /* receive_status_finished */,

@ -390,8 +390,8 @@ static void on_handshaker_service_resp_recv(void* arg,
}
bool success = true;
if (!error.ok()) {
LOG(INFO) << "ALTS handshaker on_handshaker_service_resp_recv error: "
<< grpc_core::StatusToString(error);
VLOG(2) << "ALTS handshaker on_handshaker_service_resp_recv error: "
<< grpc_core::StatusToString(error);
success = false;
}
alts_handshaker_client_handle_response(client, success);
@ -445,7 +445,7 @@ static tsi_result alts_tsi_handshaker_continue_handshaker_next(
CHECK_EQ(handshaker->client, nullptr);
handshaker->client = client;
if (handshaker->shutdown) {
LOG(INFO) << "TSI handshake shutdown";
VLOG(2) << "TSI handshake shutdown";
if (error != nullptr) *error = "TSI handshaker shutdown";
return TSI_HANDSHAKE_SHUTDOWN;
}

@ -206,7 +206,7 @@ static void init_openssl(void) {
CRYPTO_set_locking_callback(openssl_locking_cb);
CRYPTO_set_id_callback(openssl_thread_id_cb);
} else {
LOG(INFO) << "OpenSSL callback has already been set.";
GRPC_TRACE_LOG(tsi, INFO) << "OpenSSL callback has already been set.";
}
#endif
g_ssl_ctx_ex_factory_index =
@ -337,7 +337,7 @@ static tsi_result peer_property_from_x509_subject(X509* cert,
bool is_verified_root_cert) {
X509_NAME* subject_name = X509_get_subject_name(cert);
if (subject_name == nullptr) {
LOG(INFO) << "Could not get subject name from certificate.";
GRPC_TRACE_LOG(tsi, INFO) << "Could not get subject name from certificate.";
return TSI_NOT_FOUND;
}
BIO* bio = BIO_new(BIO_s_mem());
@ -893,8 +893,9 @@ static tsi_result build_alpn_protocol_name_list(
static int verify_cb(int ok, X509_STORE_CTX* ctx) {
int cert_error = X509_STORE_CTX_get_error(ctx);
if (cert_error == X509_V_ERR_UNABLE_TO_GET_CRL) {
LOG(INFO) << "Certificate verification failed to find relevant CRL file. "
"Ignoring error.";
GRPC_TRACE_LOG(tsi, INFO)
<< "Certificate verification failed to find relevant CRL file. "
"Ignoring error.";
return 1;
}
if (cert_error != 0) {
@ -961,7 +962,8 @@ static int RootCertExtractCallback(X509_STORE_CTX* ctx, void* /*arg*/) {
int success =
SSL_set_ex_data(ssl, g_ssl_ex_verified_root_cert_index, root_cert);
if (success == 0) {
LOG(INFO) << "Could not set verified root cert in SSL's ex_data";
GRPC_TRACE_LOG(tsi, INFO)
<< "Could not set verified root cert in SSL's ex_data";
} else {
#if OPENSSL_VERSION_NUMBER >= 0x10100000L
X509_up_ref(root_cert);
@ -979,7 +981,7 @@ static grpc_core::experimental::CrlProvider* GetCrlProvider(
if (ssl_index < 0) {
char err_str[256];
ERR_error_string_n(ERR_get_error(), err_str, sizeof(err_str));
LOG(INFO)
GRPC_TRACE_LOG(tsi, INFO)
<< "error getting the SSL index from the X509_STORE_CTX while looking "
"up Crl: "
<< err_str;
@ -987,7 +989,8 @@ static grpc_core::experimental::CrlProvider* GetCrlProvider(
}
SSL* ssl = static_cast<SSL*>(X509_STORE_CTX_get_ex_data(ctx, ssl_index));
if (ssl == nullptr) {
LOG(INFO) << "error while fetching from CrlProvider. SSL object is null";
GRPC_TRACE_LOG(tsi, INFO)
<< "error while fetching from CrlProvider. SSL object is null";
return nullptr;
}
SSL_CTX* ssl_ctx = SSL_get_SSL_CTX(ssl);
@ -1005,13 +1008,14 @@ static absl::StatusOr<X509_CRL*> GetCrlFromProvider(
}
absl::StatusOr<std::string> issuer_name = grpc_core::IssuerFromCert(cert);
if (!issuer_name.ok()) {
LOG(INFO) << "Could not get certificate issuer name";
GRPC_TRACE_LOG(tsi, INFO) << "Could not get certificate issuer name";
return absl::InvalidArgumentError(issuer_name.status().message());
}
absl::StatusOr<std::string> akid = grpc_core::AkidFromCertificate(cert);
std::string akid_to_use;
if (!akid.ok()) {
LOG(INFO) << "Could not get certificate authority key identifier.";
GRPC_TRACE_LOG(tsi, INFO)
<< "Could not get certificate authority key identifier.";
} else {
akid_to_use = *akid;
}
@ -1174,8 +1178,8 @@ static tsi_result tsi_set_min_and_max_tls_versions(
SSL_CTX* ssl_context, tsi_tls_version min_tls_version,
tsi_tls_version max_tls_version) {
if (ssl_context == nullptr) {
LOG(INFO) << "Invalid nullptr argument to "
"|tsi_set_min_and_max_tls_versions|.";
GRPC_TRACE_LOG(tsi, INFO) << "Invalid nullptr argument to "
"|tsi_set_min_and_max_tls_versions|.";
return TSI_INVALID_ARGUMENT;
}
#if OPENSSL_VERSION_NUMBER >= 0x10100000
@ -1196,7 +1200,7 @@ static tsi_result tsi_set_min_and_max_tls_versions(
break;
#endif
default:
LOG(INFO) << "TLS version is not supported.";
GRPC_TRACE_LOG(tsi, INFO) << "TLS version is not supported.";
return TSI_FAILED_PRECONDITION;
}
@ -1215,7 +1219,7 @@ static tsi_result tsi_set_min_and_max_tls_versions(
#endif
break;
default:
LOG(INFO) << "TLS version is not supported.";
GRPC_TRACE_LOG(tsi, INFO) << "TLS version is not supported.";
return TSI_FAILED_PRECONDITION;
}
#endif
@ -1836,9 +1840,9 @@ static tsi_result ssl_handshaker_next(tsi_handshaker* self,
reinterpret_cast<tsi_ssl_handshaker_result*>(*handshaker_result);
auto cipher = SSL_get_current_cipher(result->ssl);
if (cipher != nullptr) {
LOG(INFO) << absl::StrFormat("SSL Cipher Version: %s Name: %s",
SSL_CIPHER_get_version(cipher),
SSL_CIPHER_get_name(cipher));
GRPC_TRACE_LOG(tsi, INFO) << absl::StrFormat(
"SSL Cipher Version: %s Name: %s", SSL_CIPHER_get_version(cipher),
SSL_CIPHER_get_name(cipher));
}
}
}

@ -24,9 +24,11 @@
namespace grpc_core {
namespace latent_see {
thread_local std::vector<Log::Event> Log::thread_events_;
thread_local uint64_t Log::thread_id_ = Log::Get().next_thread_id_.fetch_add(1);
thread_local Bin* Log::bin_ = nullptr;
thread_local void* Log::bin_owner_ = nullptr;
std::atomic<uint64_t> Flow::next_flow_id_{1};
std::atomic<Bin*> Log::free_bins_;
std::string Log::GenerateJson() {
std::vector<RecordedEvent> events;
@ -91,9 +93,8 @@ std::string Log::GenerateJson() {
return json;
}
void Log::FlushThreadLog() {
auto& thread_events = thread_events_;
if (thread_events.empty()) return;
void Log::FlushBin(Bin* bin) {
if (bin->events.empty()) return;
auto& log = Get();
const auto batch_id =
log.next_batch_id_.fetch_add(1, std::memory_order_relaxed);
@ -101,11 +102,11 @@ void Log::FlushThreadLog() {
const auto thread_id = thread_id_;
{
MutexLock lock(&fragment.mu);
for (auto event : thread_events) {
for (auto event : bin->events) {
fragment.events.push_back(RecordedEvent{thread_id, batch_id, event});
}
}
thread_events.clear();
bin->events.clear();
}
} // namespace latent_see

@ -39,19 +39,58 @@ struct Metadata {
enum class EventType : uint8_t { kBegin, kEnd, kFlowStart, kFlowEnd, kMark };
// A bin collects all events that occur within a parent scope.
struct Bin {
struct Event {
const Metadata* metadata;
std::chrono::steady_clock::time_point timestamp;
uint64_t id;
EventType type;
};
void Append(const Metadata* metadata, EventType type, uint64_t id) {
events.push_back(
Event{metadata, std::chrono::steady_clock::now(), id, type});
}
std::vector<Event> events;
Bin* next_free;
};
class Log {
public:
static void FlushThreadLog();
static Bin* MaybeStartBin(void* owner) {
if (bin_ != nullptr) return bin_;
Bin* bin = free_bins_.load(std::memory_order_acquire);
do {
if (bin == nullptr) {
bin = new Bin();
break;
}
} while (!free_bins_.compare_exchange_weak(bin, bin->next_free,
std::memory_order_acq_rel));
bin_ = bin;
bin_owner_ = owner;
return bin;
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static void Append(
const Metadata* metadata, EventType type, uint64_t id) {
thread_events_.push_back(
Event{metadata, std::chrono::steady_clock::now(), id, type});
static void EndBin(void* owner) {
if (bin_owner_ != owner) return;
FlushBin(bin_);
bin_->next_free = free_bins_.load(std::memory_order_acquire);
while (!free_bins_.compare_exchange_weak(bin_->next_free, bin_,
std::memory_order_acq_rel)) {
}
bin_ = nullptr;
}
static Bin* CurrentThreadBin() { return bin_; }
private:
Log() = default;
static void FlushBin(Bin* bin);
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Log& Get() {
static Log* log = []() {
atexit([] {
@ -68,21 +107,17 @@ class Log {
std::string GenerateJson();
struct Event {
const Metadata* metadata;
std::chrono::steady_clock::time_point timestamp;
uint64_t id;
EventType type;
};
struct RecordedEvent {
uint64_t thread_id;
uint64_t batch_id;
Event event;
Bin::Event event;
};
std::atomic<uint64_t> next_thread_id_{1};
std::atomic<uint64_t> next_batch_id_{1};
static thread_local std::vector<Event> thread_events_;
static thread_local uint64_t thread_id_;
static thread_local Bin* bin_;
static thread_local void* bin_owner_;
static std::atomic<Bin*> free_bins_;
struct Fragment {
Mutex mu;
std::vector<RecordedEvent> events ABSL_GUARDED_BY(mu);
@ -90,16 +125,16 @@ class Log {
PerCpu<Fragment> fragments_{PerCpuOptions()};
};
template <bool kFlush>
template <bool kParent>
class Scope {
public:
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit Scope(const Metadata* metadata)
: metadata_(metadata) {
Log::Append(metadata_, EventType::kBegin, 0);
bin_->Append(metadata_, EventType::kBegin, 0);
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ~Scope() {
Log::Append(metadata_, EventType::kEnd, 0);
if (kFlush) Log::FlushThreadLog();
bin_->Append(metadata_, EventType::kEnd, 0);
if (kParent) Log::EndBin(this);
}
Scope(const Scope&) = delete;
@ -107,6 +142,8 @@ class Scope {
private:
const Metadata* const metadata_;
Bin* const bin_ =
kParent ? Log::MaybeStartBin(this) : Log::CurrentThreadBin();
};
using ParentScope = Scope<true>;
@ -118,11 +155,11 @@ class Flow {
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit Flow(const Metadata* metadata)
: metadata_(metadata),
id_(next_flow_id_.fetch_add(1, std::memory_order_relaxed)) {
Log::Append(metadata_, EventType::kFlowStart, id_);
Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowStart, id_);
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ~Flow() {
if (metadata_ != nullptr) {
Log::Append(metadata_, EventType::kFlowEnd, id_);
Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowEnd, id_);
}
}
@ -131,7 +168,9 @@ class Flow {
Flow(Flow&& other) noexcept
: metadata_(std::exchange(other.metadata_, nullptr)), id_(other.id_) {}
Flow& operator=(Flow&& other) noexcept {
if (metadata_ != nullptr) Log::Append(metadata_, EventType::kFlowEnd, id_);
if (metadata_ != nullptr) {
Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowEnd, id_);
}
metadata_ = std::exchange(other.metadata_, nullptr);
id_ = other.id_;
return *this;
@ -142,15 +181,18 @@ class Flow {
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void End() {
if (metadata_ == nullptr) return;
Log::Append(metadata_, EventType::kFlowEnd, id_);
Log::CurrentThreadBin()->Append(metadata_, EventType::kFlowEnd, id_);
metadata_ = nullptr;
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Begin(const Metadata* metadata) {
if (metadata_ != nullptr) Log::Append(metadata_, EventType::kFlowEnd, id_);
auto* bin = Log::CurrentThreadBin();
if (metadata_ != nullptr) {
bin->Append(metadata_, EventType::kFlowEnd, id_);
}
metadata_ = metadata;
if (metadata_ == nullptr) return;
id_ = next_flow_id_.fetch_add(1, std::memory_order_relaxed);
Log::Append(metadata_, EventType::kFlowStart, id_);
bin->Append(metadata_, EventType::kFlowStart, id_);
}
private:
@ -160,7 +202,7 @@ class Flow {
};
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void Mark(const Metadata* md) {
Log::Append(md, EventType::kMark, 0);
Log::CurrentThreadBin()->Append(md, EventType::kMark, 0);
}
} // namespace latent_see
@ -197,6 +239,12 @@ struct Flow {
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void End() {}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Begin(Metadata*) {}
};
struct ParentScope {
explicit ParentScope(Metadata*) {}
};
struct InnerScope {
explicit InnerScope(Metadata*) {}
};
} // namespace latent_see
} // namespace grpc_core
#define GRPC_LATENT_SEE_METADATA(name) nullptr

@ -95,7 +95,6 @@ class VerifyLogNoiseLogSink : public absl::LogSink {
{{"cq_verifier.cc", std::regex("^Verify .* for [0-9]+ms")},
{"chttp2_transport.cc",
std::regex("Sending goaway.*Channel Destroyed")},
{"config.cc", std::regex("gRPC experiments.*")},
{"chaotic_good_server.cc",
std::regex("Failed to bind some addresses for.*")},
{"log.cc",

@ -78,35 +78,7 @@ TEST(PeriodicUpdateTest, SimpleTest) {
}
}
TEST(PeriodicUpdateTest, NoSpin) {
// Ensure that we do not poll the time every update... even initially
class NowCounter final : public Timestamp::ScopedSource {
public:
Timestamp Now() override {
++n_;
return previous()->Now();
}
int now_calls() const { return n_; }
private:
int n_ = 0;
};
NowCounter counter;
PeriodicUpdate upd(Duration::Seconds(5));
while (!upd.Tick([](Duration d) { EXPECT_GE(d, Duration::Seconds(5)); })) {
}
const int initial_now_calls = counter.now_calls();
EXPECT_GT(initial_now_calls, 2);
EXPECT_LT(initial_now_calls, 100);
while (!upd.Tick([](Duration d) { EXPECT_GE(d, Duration::Seconds(5)); })) {
}
const int second_round_calls = counter.now_calls() - initial_now_calls;
EXPECT_GE(second_round_calls, 1);
EXPECT_LE(second_round_calls, initial_now_calls);
}
TEST(PeriodicUpdateTest, ThreadTest) {
TEST(PeriodicUpdate, ThreadTest) {
std::unique_ptr<PeriodicUpdate> upd;
std::atomic<int> count(0);
Timestamp start;

@ -46,9 +46,7 @@ DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"./src/core/util/windows/log.cc",
"./src/ruby/ext/grpc/rb_grpc_imports.generated.c",
"./src/ruby/ext/grpc/rb_grpc_imports.generated.h",
"./test/core/end2end/tests/no_logging.cc",
],
"gpr_log_severity_string": [],
"gpr_log(": [
"./include/grpc/support/log.h",
"./src/core/util/android/log.cc",
@ -72,10 +70,8 @@ DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"./src/core/util/posix/log.cc",
"./src/core/util/windows/log.cc",
"./src/ruby/ext/grpc/rb_call_credentials.c",
"./test/core/end2end/tests/no_logging.cc",
],
"gpr_log_message(": [
"./include/grpc/support/log.h",
"./src/core/util/android/log.cc",
"./src/core/util/linux/log.cc",
"./src/core/util/log.cc",
@ -85,21 +81,19 @@ DEPRECATED_FUNCTION_TEMP_ALLOW_LIST = {
"gpr_set_log_verbosity(": [
"./include/grpc/support/log.h",
"./src/core/util/log.cc",
"./test/core/end2end/tests/no_logging.cc",
],
"gpr_log_func_args": [
"./include/grpc/support/log.h",
"./src/core/util/log.cc",
"./test/core/end2end/tests/no_logging.cc",
],
"gpr_set_log_function(": [
"./include/grpc/support/log.h",
"./src/core/util/log.cc",
"./test/core/end2end/tests/no_logging.cc",
],
"gpr_assertion_failed": [],
"GPR_ASSERT": [],
"GPR_DEBUG_ASSERT": [],
"gpr_log_severity_string": [],
}
errors = 0

Loading…
Cancel
Save