[fuzzing] Fix timeout in retry_exceeds_buffer_size_in_delay (#34627)

Fix b/304114403

- adds a new experimental tracer useful for diagnosing ping timeout
failures in unit tests
- adds a pair of experimental tracers for fuzzing event engine
- fix the behavior of FuzzingEventEngine so that a RunAfter(0, ...) runs
in the same tick
- up the rate of sends (reduce the send delay) so we guarantee to be
able to send 200kb/sec in fuzzed e2e unit tests

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/34556/head
Craig Tiller 1 year ago committed by GitHub
parent be1cf357ba
commit ec49866463
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      doc/environment_variables.md
  2. 2
      src/core/BUILD
  3. 8
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  4. 12
      src/core/ext/transport/chttp2/transport/frame_ping.cc
  5. 7
      src/core/ext/transport/chttp2/transport/ping_callbacks.cc
  6. 9
      src/core/ext/transport/chttp2/transport/ping_callbacks.h
  7. 34
      src/core/ext/transport/chttp2/transport/writing.cc
  8. 22502
      test/core/end2end/end2end_test_corpus/retry_exceeds_buffer_size_in_delay/4539040427737088
  9. 2
      test/core/end2end/end2end_test_fuzzer.cc
  10. 39
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  11. 2
      test/core/transport/chttp2/ping_callbacks_test.cc

@ -69,6 +69,7 @@ some configuration as environment variables that can be set.
- health_check_client - traces health checking client code
- http - traces state in the http2 transport engine
- http2_stream_state - traces all http2 stream state mutations.
- http2_ping - traces pings/ping acks/antagonist writes in http2 stack.
- http1 - traces HTTP/1.x operations performed by gRPC
- inproc - traces the in-process transport
- http_keepalive - traces gRPC keepalive pings

@ -5704,12 +5704,14 @@ grpc_cc_library(
"absl/meta:type_traits",
"absl/random:bit_gen_ref",
"absl/random:distributions",
"absl/types:optional",
],
deps = [
"time",
"//:event_engine_base_hdrs",
"//:gpr",
"//:gpr_platform",
"//:grpc_trace",
],
)

@ -1071,6 +1071,10 @@ static void write_action(grpc_chttp2_transport* t) {
if (max_frame_size == 0) {
max_frame_size = INT_MAX;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace)) {
gpr_log(GPR_INFO, "%s[%p]: Write %" PRIdPTR " bytes",
t->is_client ? "CLIENT" : "SERVER", t, t->outbuf.Length());
}
grpc_endpoint_write(t->ep, t->outbuf.c_slice_buffer(),
grpc_core::InitTransportClosure<write_action_end>(
t->Ref(), &t->write_action_end_locked),
@ -1080,6 +1084,10 @@ static void write_action(grpc_chttp2_transport* t) {
static void write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
grpc_error_handle error) {
auto* tp = t.get();
if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace)) {
gpr_log(GPR_INFO, "%s[%p]: Finish write",
t->is_client ? "CLIENT" : "SERVER", t.get());
}
tp->combiner->Run(grpc_core::InitTransportClosure<write_action_end_locked>(
std::move(t), &tp->write_action_end_locked),
error);

@ -20,6 +20,7 @@
#include "src/core/ext/transport/chttp2/transport/frame_ping.h"
#include <inttypes.h>
#include <string.h>
#include <algorithm>
@ -35,6 +36,7 @@
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/lib/debug/trace.h"
extern grpc_core::TraceFlag grpc_keepalive_trace;
@ -96,18 +98,26 @@ grpc_error_handle grpc_chttp2_ping_parser_parse(void* parser,
if (p->byte == 8) {
GPR_ASSERT(is_last);
if (p->is_ack) {
if (grpc_ping_trace.enabled()) {
gpr_log(GPR_INFO, "%s[%p]: received ping ack %" PRIx64,
t->is_client ? "CLIENT" : "SERVER", t, p->opaque_8bytes);
}
grpc_chttp2_ack_ping(t, p->opaque_8bytes);
} else {
if (!t->is_client) {
const bool transport_idle =
t->keepalive_permit_without_calls == 0 && t->stream_map.empty();
if (grpc_keepalive_trace.enabled() || grpc_http_trace.enabled()) {
gpr_log(GPR_INFO, "t=%p received ping: %s", t,
gpr_log(GPR_INFO, "SERVER[%p]: received ping %" PRIx64 ": %s", t,
p->opaque_8bytes,
t->ping_abuse_policy.GetDebugString(transport_idle).c_str());
}
if (t->ping_abuse_policy.ReceivedOnePing(transport_idle)) {
grpc_chttp2_exceeded_ping_strikes(t);
}
} else if (grpc_ping_trace.enabled()) {
gpr_log(GPR_INFO, "CLIENT[%p]: received ping %" PRIx64, t,
p->opaque_8bytes);
}
if (t->ack_pings) {
if (t->ping_ack_count == t->ping_ack_capacity) {

@ -23,6 +23,8 @@
#include <grpc/support/log.h>
grpc_core::TraceFlag grpc_ping_trace(false, "http2_ping");
namespace grpc_core {
void Chttp2PingCallbacks::OnPing(Callback on_start, Callback on_ack) {
@ -90,16 +92,17 @@ void Chttp2PingCallbacks::CancelAll(
ping_requested_ = false;
}
void Chttp2PingCallbacks::OnPingTimeout(
absl::optional<uint64_t> Chttp2PingCallbacks::OnPingTimeout(
Duration ping_timeout,
grpc_event_engine::experimental::EventEngine* event_engine,
Callback callback) {
GPR_ASSERT(started_new_ping_without_setting_timeout_);
started_new_ping_without_setting_timeout_ = false;
auto it = inflight_.find(most_recent_inflight_);
if (it == inflight_.end()) return;
if (it == inflight_.end()) return absl::nullopt;
it->second.on_timeout =
event_engine->RunAfter(ping_timeout, std::move(callback));
return most_recent_inflight_;
}
} // namespace grpc_core

@ -27,11 +27,15 @@
#include "absl/functional/any_invocable.h"
#include "absl/hash/hash.h"
#include "absl/random/bit_gen_ref.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/time.h"
extern grpc_core::TraceFlag grpc_ping_trace;
namespace grpc_core {
class Chttp2PingCallbacks {
@ -84,7 +88,10 @@ class Chttp2PingCallbacks {
// Add a ping timeout for the most recently started ping.
// started_new_ping_without_setting_timeout must be set.
// Clears started_new_ping_without_setting_timeout.
void OnPingTimeout(Duration ping_timeout,
// Returns the ping id of the ping the timeout was attached to if a timer was
// started, or nullopt otherwise.
absl::optional<uint64_t> OnPingTimeout(
Duration ping_timeout,
grpc_event_engine::experimental::EventEngine* event_engine,
Callback callback);

@ -132,9 +132,10 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
grpc_core::global_stats().IncrementHttp2PingsSent();
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Ping sent [%s]: %s",
t->is_client ? "CLIENT" : "SERVER",
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace)) {
gpr_log(GPR_INFO, "%s[%p]: Ping %" PRIx64 " sent [%s]: %s",
t->is_client ? "CLIENT" : "SERVER", t, id,
std::string(t->peer_string.as_string_view()).c_str(),
t->ping_rate_policy.GetDebugString().c_str());
}
@ -143,9 +144,11 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
// need to receive something of substance before sending a ping again
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace)) {
gpr_log(GPR_INFO,
"CLIENT: Ping delayed [%s]: too many recent pings: %s",
"%s[%p]: Ping delayed [%s]: too many recent pings: %s",
t->is_client ? "CLIENT" : "SERVER", t,
std::string(t->peer_string.as_string_view()).c_str(),
t->ping_rate_policy.GetDebugString().c_str());
}
@ -154,12 +157,13 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
// not enough elapsed time between successive pings
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO,
"%s: Ping delayed [%s]: not enough time elapsed since last "
"ping. "
" Last ping:%s, minimum wait:%s need to wait:%s",
t->is_client ? "CLIENT" : "SERVER",
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace)) {
gpr_log(
GPR_INFO,
"%s[%p]: Ping delayed [%s]: not enough time elapsed since last "
"ping. Last ping:%s, minimum wait:%s need to wait:%s",
t->is_client ? "CLIENT" : "SERVER", t,
std::string(t->peer_string.as_string_view()).c_str(),
too_soon.last_ping.ToString().c_str(),
too_soon.next_allowed_ping_interval.ToString().c_str(),
@ -706,12 +710,18 @@ void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
t->keepalive_timeout != grpc_core::Duration::Infinity()) {
// Set ping timeout after finishing write so we don't measure our own send
// time.
t->ping_callbacks.OnPingTimeout(
auto id = t->ping_callbacks.OnPingTimeout(
t->keepalive_timeout, t->event_engine.get(), [t = t->Ref()] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_chttp2_ping_timeout(t);
});
if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace) && id.has_value()) {
gpr_log(GPR_INFO,
"%s[%p]: Set ping timeout timer of %s for ping id %" PRIx64,
t->is_client ? "CLIENT" : "SERVER", t,
t->keepalive_timeout.ToString().c_str(), id.value());
}
}
while (grpc_chttp2_list_pop_writing_stream(t, &s)) {

@ -116,7 +116,7 @@ DEFINE_PROTO_FUZZER(const core_end2end_test_fuzzer::Msg& msg) {
[actions = msg.event_engine_actions()]() {
FuzzingEventEngine::Options options;
options.max_delay_run_after = std::chrono::milliseconds(500);
options.max_delay_write = std::chrono::milliseconds(2);
options.max_delay_write = std::chrono::microseconds(5);
return std::make_unique<FuzzingEventEngine>(options, actions);
});
auto engine =

@ -14,6 +14,7 @@
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include <inttypes.h>
#include <stdlib.h>
#include <algorithm>
@ -30,6 +31,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/time.h"
@ -40,6 +42,9 @@
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
static grpc_core::TraceFlag trace_writes(false, "fuzzing_ee_writes");
static grpc_core::TraceFlag trace_timers(false, "fuzzing_ee_timers");
using namespace std::chrono_literals;
namespace grpc_event_engine {
@ -133,10 +138,13 @@ gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) {
}
void FuzzingEventEngine::Tick(Duration max_time) {
bool incremented_time = false;
while (true) {
std::vector<absl::AnyInvocable<void()>> to_run;
{
grpc_core::MutexLock lock(&*mu_);
grpc_core::MutexLock now_lock(&*now_mu_);
if (!incremented_time) {
Duration incr = max_time;
// TODO(ctiller): look at tasks_by_time_ and jump forward (once iomgr
// timers are gone)
@ -155,6 +163,8 @@ void FuzzingEventEngine::Tick(Duration max_time) {
now_ += incr;
GPR_ASSERT(now_.time_since_epoch().count() >= 0);
++current_tick_;
incremented_time = true;
}
// Find newly expired timers.
while (!tasks_by_time_.empty() && tasks_by_time_.begin()->first <= now_) {
auto& task = *tasks_by_time_.begin()->second;
@ -165,9 +175,11 @@ void FuzzingEventEngine::Tick(Duration max_time) {
tasks_by_time_.erase(tasks_by_time_.begin());
}
}
if (to_run.empty()) return;
for (auto& closure : to_run) {
closure();
}
}
}
void FuzzingEventEngine::TickUntilIdle() {
@ -299,6 +311,10 @@ bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
// If the write_len is zero, we still need to write something, so we write one
// byte.
if (write_len == 0) write_len = 1;
if (trace_writes.enabled()) {
gpr_log(GPR_INFO, "WRITE[%p:%d]: %" PRIdPTR " bytes", this, index,
write_len);
}
// Expand the pending buffer.
size_t prev_len = pending[index].size();
pending[index].resize(prev_len + write_len);
@ -526,15 +542,31 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAfterLocked(
RunType run_type, Duration when, absl::AnyInvocable<void()> closure) {
const intptr_t id = next_task_id_;
++next_task_id_;
Duration delay_taken = Duration::zero();
if (run_type != RunType::kExact && !task_delays_.empty()) {
when += grpc_core::Clamp(task_delays_.front(), Duration::zero(),
delay_taken = grpc_core::Clamp(task_delays_.front(), Duration::zero(),
max_delay_[static_cast<int>(run_type)]);
when += delay_taken;
task_delays_.pop();
}
auto task = std::make_shared<Task>(id, std::move(closure));
tasks_by_id_.emplace(id, task);
Time final_time;
Time now;
{
grpc_core::MutexLock lock(&*now_mu_);
tasks_by_time_.emplace(now_ + when, std::move(task));
final_time = now_ + when;
now = now_;
tasks_by_time_.emplace(final_time, std::move(task));
}
if (trace_timers.enabled()) {
gpr_log(GPR_INFO,
"Schedule timer %" PRIx64 " @ %" PRIu64 " (now=%" PRIu64
"; delay=%" PRIu64 "; fuzzing_added=%" PRIu64 "; type=%d)",
id, static_cast<uint64_t>(final_time.time_since_epoch().count()),
now.time_since_epoch().count(), when.count(), delay_taken.count(),
static_cast<int>(run_type));
}
return TaskHandle{id, kTaskHandleSalt};
}
@ -549,6 +581,9 @@ bool FuzzingEventEngine::Cancel(TaskHandle handle) {
if (it->second->closure == nullptr) {
return false;
}
if (trace_timers.enabled()) {
gpr_log(GPR_INFO, "Cancel timer %" PRIx64, id);
}
it->second->closure = nullptr;
return true;
}

@ -587,7 +587,7 @@ TEST(PingCallbacksTest, CancelAllCancelsInflightPings) {
.WillOnce(Return(true));
callbacks.CancelAll(&event_engine);
// Ensure Cancel call comes from CancelAll
testing::Mock::VerifyAndClearExpectations(&event_engine);
::testing::Mock::VerifyAndClearExpectations(&event_engine);
EXPECT_FALSE(acked);
EXPECT_FALSE(callbacks.ping_requested());
// Ping should still be valid, but no callback should be invoked

Loading…
Cancel
Save