Create a CircularBuffer to store events generated by latent_see. This bounds the total number of generated events.

PiperOrigin-RevId: 658611718
pull/37317/head
Vignesh Babu 4 months ago committed by Copybara-Service
parent 3e704c7552
commit 449d1b248f
  1. 33
      CMakeLists.txt
  2. 1
      Package.swift
  3. 32
      build_autogenerated.yaml
  4. 2
      gRPC-C++.podspec
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 1
      package.xml
  8. 15
      src/core/BUILD
  9. 14
      src/core/util/latent_see.cc
  10. 9
      src/core/util/latent_see.h
  11. 109
      src/core/util/ring_buffer.h
  12. 13
      test/core/util/BUILD
  13. 88
      test/core/util/ring_buffer_test.cc
  14. 1
      tools/doxygen/Doxyfile.c++.internal
  15. 1
      tools/doxygen/Doxyfile.core.internal
  16. 24
      tools/run_tests/generated/tests.json

33
CMakeLists.txt generated

@ -1412,6 +1412,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx retry_transparent_not_sent_on_wire_test)
add_dependencies(buildtests_cxx retry_unref_before_finish_test)
add_dependencies(buildtests_cxx retry_unref_before_recv_test)
add_dependencies(buildtests_cxx ring_buffer_test)
add_dependencies(buildtests_cxx ring_hash_test)
add_dependencies(buildtests_cxx rls_end2end_test)
add_dependencies(buildtests_cxx rls_lb_config_parser_test)
@ -27274,6 +27275,38 @@ target_link_libraries(retry_unref_before_recv_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(ring_buffer_test
test/core/util/ring_buffer_test.cc
)
target_compile_features(ring_buffer_test PUBLIC cxx_std_14)
target_include_directories(ring_buffer_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(ring_buffer_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
)
endif()
if(gRPC_BUILD_TESTS)

1
Package.swift generated

@ -1942,6 +1942,7 @@ let package = Package(
"src/core/util/posix/sync.cc",
"src/core/util/posix/time.cc",
"src/core/util/posix/tmpfile.cc",
"src/core/util/ring_buffer.h",
"src/core/util/spinlock.h",
"src/core/util/string.cc",
"src/core/util/string.h",

@ -1218,6 +1218,7 @@ libs:
- src/core/util/json/json_util.h
- src/core/util/json/json_writer.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- src/core/util/upb_utils.h
- src/core/xds/grpc/certificate_provider_store.h
@ -2705,6 +2706,7 @@ libs:
- src/core/util/json/json_reader.h
- src/core/util/json/json_writer.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- src/core/util/upb_utils.h
- third_party/upb/upb/generated_code_support.h
@ -4755,6 +4757,7 @@ libs:
- src/core/util/json/json_reader.h
- src/core/util/json/json_writer.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- third_party/upb/upb/generated_code_support.h
src:
@ -5484,6 +5487,7 @@ targets:
- src/core/lib/promise/seq.h
- src/core/lib/promise/wait_set.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/debug/trace.cc
@ -6714,6 +6718,7 @@ targets:
- src/core/lib/transport/status_conversion.h
- src/core/lib/transport/timeout_encoding.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- test/core/promise/poll_matcher.h
- third_party/upb/upb/generated_code_support.h
@ -6901,6 +6906,7 @@ targets:
- src/core/lib/promise/status_flag.h
- src/core/lib/transport/call_state.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- test/core/promise/poll_matcher.h
src:
- src/core/lib/debug/trace.cc
@ -7245,6 +7251,7 @@ targets:
- src/core/util/json/json_args.h
- src/core/util/json/json_writer.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- third_party/upb/upb/generated_code_support.h
src:
@ -7898,6 +7905,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- third_party/upb/upb/generated_code_support.h
src:
@ -8693,6 +8701,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- third_party/upb/upb/generated_code_support.h
src:
@ -10241,6 +10250,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- third_party/upb/upb/generated_code_support.h
src:
@ -10791,6 +10801,7 @@ targets:
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/http2_errors.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- third_party/upb/upb/generated_code_support.h
src:
@ -10907,6 +10918,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- test/core/promise/test_wakeup_schedulers.h
- third_party/upb/upb/generated_code_support.h
@ -12491,6 +12503,7 @@ targets:
- src/core/lib/promise/poll.h
- src/core/lib/promise/seq.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/debug/trace.cc
@ -12581,6 +12594,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- test/core/promise/test_context.h
- third_party/upb/upb/generated_code_support.h
@ -13053,6 +13067,7 @@ targets:
- src/core/lib/promise/poll.h
- src/core/lib/promise/seq.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/debug/trace.cc
@ -13286,6 +13301,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- test/core/promise/test_wakeup_schedulers.h
- third_party/upb/upb/generated_code_support.h
@ -13840,6 +13856,7 @@ targets:
- src/core/lib/promise/promise.h
- src/core/lib/promise/wait_set.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- test/core/promise/poll_matcher.h
src:
- src/core/lib/debug/trace.cc
@ -14153,6 +14170,7 @@ targets:
- src/core/lib/promise/observable.h
- src/core/lib/promise/poll.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- test/core/promise/poll_matcher.h
src:
- src/core/lib/debug/trace.cc
@ -14507,6 +14525,7 @@ targets:
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- src/core/util/spinlock.h
- third_party/upb/upb/generated_code_support.h
src:
@ -15072,6 +15091,7 @@ targets:
- src/core/lib/promise/promise_mutex.h
- src/core/lib/promise/seq.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/debug/trace.cc
@ -18044,6 +18064,17 @@ targets:
- grpc_authorization_provider
- grpc_unsecure
- grpc_test_util
- name: ring_buffer_test
gtest: true
build: test
language: c++
headers:
- src/core/util/ring_buffer.h
src:
- test/core/util/ring_buffer_test.cc
deps:
- gtest
uses_polling: false
- name: ring_hash_test
gtest: true
build: test
@ -20861,6 +20892,7 @@ targets:
- src/core/lib/promise/poll.h
- src/core/lib/promise/wait_for_callback.h
- src/core/util/latent_see.h
- src/core/util/ring_buffer.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/debug/trace.cc

2
gRPC-C++.podspec generated

@ -1322,6 +1322,7 @@ Pod::Spec.new do |s|
'src/core/util/json/json_util.h',
'src/core/util/json/json_writer.h',
'src/core/util/latent_see.h',
'src/core/util/ring_buffer.h',
'src/core/util/spinlock.h',
'src/core/util/string.h',
'src/core/util/time_precise.h',
@ -2605,6 +2606,7 @@ Pod::Spec.new do |s|
'src/core/util/json/json_util.h',
'src/core/util/json/json_writer.h',
'src/core/util/latent_see.h',
'src/core/util/ring_buffer.h',
'src/core/util/spinlock.h',
'src/core/util/string.h',
'src/core/util/time_precise.h',

2
gRPC-Core.podspec generated

@ -2058,6 +2058,7 @@ Pod::Spec.new do |s|
'src/core/util/posix/sync.cc',
'src/core/util/posix/time.cc',
'src/core/util/posix/tmpfile.cc',
'src/core/util/ring_buffer.h',
'src/core/util/spinlock.h',
'src/core/util/string.cc',
'src/core/util/string.h',
@ -3386,6 +3387,7 @@ Pod::Spec.new do |s|
'src/core/util/json/json_util.h',
'src/core/util/json/json_writer.h',
'src/core/util/latent_see.h',
'src/core/util/ring_buffer.h',
'src/core/util/spinlock.h',
'src/core/util/string.h',
'src/core/util/time_precise.h',

1
grpc.gemspec generated

@ -1944,6 +1944,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/util/posix/sync.cc )
s.files += %w( src/core/util/posix/time.cc )
s.files += %w( src/core/util/posix/tmpfile.cc )
s.files += %w( src/core/util/ring_buffer.h )
s.files += %w( src/core/util/spinlock.h )
s.files += %w( src/core/util/string.cc )
s.files += %w( src/core/util/string.h )

1
package.xml generated

@ -1926,6 +1926,7 @@
<file baseinstalldir="/" name="src/core/util/posix/sync.cc" role="src" />
<file baseinstalldir="/" name="src/core/util/posix/time.cc" role="src" />
<file baseinstalldir="/" name="src/core/util/posix/tmpfile.cc" role="src" />
<file baseinstalldir="/" name="src/core/util/ring_buffer.h" role="src" />
<file baseinstalldir="/" name="src/core/util/spinlock.h" role="src" />
<file baseinstalldir="/" name="src/core/util/string.cc" role="src" />
<file baseinstalldir="/" name="src/core/util/string.h" role="src" />

@ -148,10 +148,25 @@ grpc_cc_library(
],
deps = [
"per_cpu",
"ring_buffer",
"//:gpr",
],
)
grpc_cc_library(
name = "ring_buffer",
srcs = [],
hdrs = [
"util/ring_buffer.h",
],
external_deps = [
"absl/types:optional",
],
deps = [
"//:gpr_platform",
],
)
grpc_cc_library(
name = "transport_fwd",
hdrs = [

@ -15,10 +15,17 @@
#include "src/core/util/latent_see.h"
#ifdef GRPC_ENABLE_LATENT_SEE
#include <atomic>
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/util/ring_buffer.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
namespace grpc_core {
@ -34,7 +41,10 @@ std::string Log::GenerateJson() {
std::vector<RecordedEvent> events;
for (auto& fragment : fragments_) {
MutexLock lock(&fragment.mu);
events.insert(events.end(), fragment.events.begin(), fragment.events.end());
for (auto it = fragment.events.begin(); it != fragment.events.end(); ++it) {
events.push_back(*it);
}
fragment.events.Clear();
}
absl::optional<std::chrono::steady_clock::time_point> start_time;
for (auto& event : events) {
@ -103,7 +113,7 @@ void Log::FlushBin(Bin* bin) {
{
MutexLock lock(&fragment.mu);
for (auto event : bin->events) {
fragment.events.push_back(RecordedEvent{thread_id, batch_id, event});
fragment.events.Append(RecordedEvent{thread_id, batch_id, event});
}
}
bin->events.clear();

@ -18,8 +18,12 @@
#include <grpc/support/port_platform.h>
#ifdef GRPC_ENABLE_LATENT_SEE
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <utility>
#include <vector>
@ -27,6 +31,7 @@
#include "src/core/lib/gprpp/per_cpu.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/util/ring_buffer.h"
namespace grpc_core {
namespace latent_see {
@ -59,6 +64,7 @@ struct Bin {
class Log {
public:
static constexpr int kMaxEventsPerCpu = 50000;
static Bin* MaybeStartBin(void* owner) {
if (bin_ != nullptr) return bin_;
Bin* bin = free_bins_.load(std::memory_order_acquire);
@ -120,7 +126,8 @@ class Log {
static std::atomic<Bin*> free_bins_;
struct Fragment {
Mutex mu;
std::vector<RecordedEvent> events ABSL_GUARDED_BY(mu);
grpc_core::RingBuffer<RecordedEvent, Log::kMaxEventsPerCpu> events
ABSL_GUARDED_BY(mu);
};
PerCpu<Fragment> fragments_{PerCpuOptions()};
};

@ -0,0 +1,109 @@
#ifndef GRPC_SRC_CORE_UTIL_RING_BUFFER_H_
#define GRPC_SRC_CORE_UTIL_RING_BUFFER_H_
#include <grpc/support/port_platform.h>
#include <array>
#include <cstddef>
#include <iterator>
#include "absl/types/optional.h"
namespace grpc_core {
template <typename T, int kCapacity>
class RingBuffer {
public:
class RingBufferIterator {
public:
using iterator_category = std::forward_iterator_tag;
using value_type = const T;
using pointer = void;
using reference = void;
using difference_type = std::ptrdiff_t;
RingBufferIterator& operator++() {
if (--size_ <= 0) {
head_ = 0;
size_ = 0;
buffer_ = nullptr;
} else {
head_ = (head_ + 1) % kCapacity;
}
return *this;
}
RingBufferIterator operator++(int) {
RingBufferIterator tmp(*this);
operator++();
return tmp;
}
bool operator==(const RingBufferIterator& rhs) const {
return (buffer_ == rhs.buffer_ && head_ == rhs.head_ &&
size_ == rhs.size_);
}
bool operator!=(const RingBufferIterator& rhs) const {
return !operator==(rhs);
}
T operator*() { return buffer_->data_[head_]; }
RingBufferIterator() : buffer_(nullptr), head_(0), size_(0) {};
RingBufferIterator(const RingBufferIterator& other) = default;
RingBufferIterator(const RingBuffer<T, kCapacity>* buffer)
: buffer_(buffer), head_(buffer->head_), size_(buffer->size_) {
if (!size_) {
buffer_ = nullptr;
}
}
private:
friend class RingBuffer<T, kCapacity>;
const RingBuffer<T, kCapacity>* buffer_;
int head_ = 0;
int size_ = 0;
};
RingBuffer() = default;
void Append(T data) {
if (size_ < kCapacity) {
data_[size_] = std::move(data);
size_++;
} else {
data_[head_] = std::move(data);
head_ = (head_ + 1) % kCapacity;
}
}
// Returns the data of the first element in the buffer and removes it from
// the buffer. If the buffer is empty, returns absl::nullopt.
absl::optional<T> PopIfNotEmpty() {
if (!size_) return absl::nullopt;
T data = std::move(data_[head_]);
--size_;
head_ = (head_ + 1) % kCapacity;
return data;
}
void Clear() {
head_ = 0;
size_ = 0;
}
RingBufferIterator begin() const { return RingBufferIterator(this); }
RingBufferIterator end() const { return RingBufferIterator(); }
private:
friend class RingBufferIterator;
std::array<T, kCapacity> data_;
int head_ = 0;
int size_ = 0;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_UTIL_RING_BUFFER_H_

@ -124,3 +124,16 @@ grpc_cc_test(
"//src/core:useful",
],
)
grpc_cc_test(
name = "ring_buffer_test",
srcs = ["ring_buffer_test.cc"],
external_deps = ["gtest"],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:gpr_platform",
"//src/core:ring_buffer",
],
)

@ -0,0 +1,88 @@
//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "gtest/gtest.h"
#include "src/core/util/ring_buffer.h"
namespace grpc_core {
constexpr int kBufferCapacity = 1000;
TEST(RingBufferTest, BufferAppendPopTest) {
RingBuffer<int, kBufferCapacity> buffer;
EXPECT_FALSE(buffer.PopIfNotEmpty().has_value());
for (int i = 0; i < (3 * kBufferCapacity)/2; ++i) {
buffer.Append(i);
}
// Pop half of the elements. Elements in [kBufferCapacity / 2,
// kBufferCapacity) are popped.
int j = kBufferCapacity / 2;
for (int i = 0; i < kBufferCapacity / 2; ++i) {
EXPECT_EQ(buffer.PopIfNotEmpty(), j++);
}
EXPECT_EQ(j, kBufferCapacity);
// Iterate over the remaining elements.
for (auto it = buffer.begin(); it != buffer.end(); ++it) {
EXPECT_EQ(*it, j++);
}
// Elements in [kBufferCapacity, (3 * kBufferCapacity) / 2) should be present.
EXPECT_EQ(j, (3 * kBufferCapacity) / 2);
// Append some more elements. The buffer should now have elements in
// [kBufferCapacity, 2 * kBufferCapacity).
for (int i = 0; i < kBufferCapacity / 2; ++i) {
buffer.Append(j++);
}
// Pop all the elements.
j = kBufferCapacity;
while (true) {
auto ret = buffer.PopIfNotEmpty();
if (!ret.has_value()) break;
EXPECT_EQ(*ret, j++);
}
EXPECT_EQ(j, 2 * kBufferCapacity);
}
TEST(RingBufferTest, BufferAppendIterateTest) {
RingBuffer<int, kBufferCapacity> buffer;
for (int i = 0; i < 5 * kBufferCapacity; ++i) {
buffer.Append(i);
int j = std::max(0, i + 1 - kBufferCapacity);
// If i >= kBufferCapacity, check that the buffer contains only the last
// kBufferCapacity elements [i + 1 - kBufferCapacity, i]. Otherwise check
// that the buffer contains all elements from 0 to i.
for (auto it = buffer.begin(); it != buffer.end(); ++it) {
EXPECT_EQ(*it, j++);
}
// Check that j was incremented at each step which implies that all the
// required elements were present in the buffer.
EXPECT_EQ(j, i + 1);
}
buffer.Clear();
EXPECT_EQ(buffer.begin(), buffer.end());
}
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -2948,6 +2948,7 @@ src/core/util/posix/string.cc \
src/core/util/posix/sync.cc \
src/core/util/posix/time.cc \
src/core/util/posix/tmpfile.cc \
src/core/util/ring_buffer.h \
src/core/util/spinlock.h \
src/core/util/string.cc \
src/core/util/string.h \

@ -2728,6 +2728,7 @@ src/core/util/posix/string.cc \
src/core/util/posix/sync.cc \
src/core/util/posix/time.cc \
src/core/util/posix/tmpfile.cc \
src/core/util/ring_buffer.h \
src/core/util/spinlock.h \
src/core/util/string.cc \
src/core/util/string.h \

@ -9083,6 +9083,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "ring_buffer_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save