Add schedule over write experiment (#32455)

Allow cancellation op to be scheduled over write (b/244176317).
pull/32370/head
Alisha Nanda 2 years ago committed by GitHub
parent a944a06755
commit 29d4679750
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 28
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  3. 12
      src/core/ext/transport/chttp2/transport/internal.h
  4. 4
      src/core/lib/experiments/experiments.cc
  5. 7
      src/core/lib/experiments/experiments.h
  6. 9
      src/core/lib/experiments/experiments.yaml

@ -3686,10 +3686,12 @@ grpc_cc_library(
"//src/core:chttp2_flow_control",
"//src/core:closure",
"//src/core:error",
"//src/core:experiments",
"//src/core:http2_errors",
"//src/core:http2_settings",
"//src/core:init_internally",
"//src/core:iomgr_fwd",
"//src/core:iomgr_port",
"//src/core:memory_quota",
"//src/core:poll",
"//src/core:ref_counted",

@ -63,6 +63,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/crash.h"
@ -75,6 +76,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
@ -93,6 +95,10 @@
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_impl.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/iomgr/ev_posix.h"
#endif
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
@ -569,6 +575,14 @@ grpc_chttp2_transport::grpc_chttp2_transport(
if (grpc_core::test_only_init_callback != nullptr) {
grpc_core::test_only_init_callback();
}
#ifdef GRPC_POSIX_SOCKET_TCP
closure_barrier_may_cover_write =
grpc_event_engine_run_in_background() &&
grpc_core::IsScheduleCancellationOverWriteEnabled()
? 0
: CLOSURE_BARRIER_MAY_COVER_WRITE;
#endif
}
static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {
@ -1156,14 +1170,6 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
}
}
// Flag that this closure barrier may be covering a write in a pollset, and so
// we should not complete this closure until we can prove that the write got
// scheduled
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
// First bit of the reference count, stored in the high order bits (with the low
// bits being used for flags defined above)
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
static grpc_closure* add_closure_barrier(grpc_closure* closure) {
closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
return closure;
@ -1295,7 +1301,7 @@ static void perform_stream_op_locked(void* stream_op,
t->channelz_socket->RecordStreamStartedFromLocal();
}
GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata =
@ -1351,7 +1357,7 @@ static void perform_stream_op_locked(void* stream_op,
t->num_messages_in_next_write++;
grpc_core::global_stats().IncrementHttp2SendMessageSize(
op->payload->send_message.send_message->Length());
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
s->send_message_finished = add_closure_barrier(op->on_complete);
const uint32_t flags = op_payload->send_message.flags;
if (s->write_closed) {
@ -1425,7 +1431,7 @@ static void perform_stream_op_locked(void* stream_op,
if (op->send_trailing_metadata) {
GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata =
op_payload->send_trailing_metadata.send_trailing_metadata;

@ -69,6 +69,14 @@
#include "src/core/lib/transport/transport_fwd.h"
#include "src/core/lib/transport/transport_impl.h"
// Flag that this closure barrier may be covering a write in a pollset, and so
// we should not complete this closure until we can prove that the write got
// scheduled
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
// First bit of the reference count, stored in the high order bits (with the low
// bits being used for flags defined above)
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
namespace grpc_core {
class ContextList;
}
@ -470,6 +478,10 @@ struct grpc_chttp2_transport
/// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to
/// the peer
bool enable_preferred_rx_crypto_frame_advertisement = false;
/// Set to non zero if closures associated with the transport may be
/// covering a write in a pollset. Such closures cannot be scheduled until
/// we can prove that the write got scheduled.
uint8_t closure_barrier_may_cover_write = CLOSURE_BARRIER_MAY_COVER_WRITE;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
};

@ -54,6 +54,8 @@ const char* const description_transport_supplies_client_latency =
"opencensus";
const char* const description_event_engine_listener =
"Use EventEngine listeners instead of iomgr's grpc_tcp_server";
const char* const description_schedule_cancellation_over_write =
"Allow cancellation op to be scheduled over a write";
} // namespace
namespace grpc_core {
@ -75,6 +77,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"transport_supplies_client_latency",
description_transport_supplies_client_latency, false},
{"event_engine_listener", description_event_engine_listener, false},
{"schedule_cancellation_over_write",
description_schedule_cancellation_over_write, false},
};
} // namespace grpc_core

@ -72,6 +72,7 @@ inline bool IsFreeLargeAllocatorEnabled() { return false; }
inline bool IsPromiseBasedServerCallEnabled() { return false; }
inline bool IsTransportSuppliesClientLatencyEnabled() { return false; }
inline bool IsEventEngineListenerEnabled() { return false; }
inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
#else
#define GRPC_EXPERIMENT_IS_INCLUDED_TCP_FRAME_SIZE_TUNING
inline bool IsTcpFrameSizeTuningEnabled() { return IsExperimentEnabled(0); }
@ -107,8 +108,12 @@ inline bool IsTransportSuppliesClientLatencyEnabled() {
}
#define GRPC_EXPERIMENT_IS_INCLUDED_EVENT_ENGINE_LISTENER
inline bool IsEventEngineListenerEnabled() { return IsExperimentEnabled(12); }
#define GRPC_EXPERIMENT_IS_INCLUDED_SCHEDULE_CANCELLATION_OVER_WRITE
inline bool IsScheduleCancellationOverWriteEnabled() {
return IsExperimentEnabled(13);
}
constexpr const size_t kNumExperiments = 13;
constexpr const size_t kNumExperiments = 14;
extern const ExperimentMetadata g_experiment_metadata[kNumExperiments];
#endif

@ -131,6 +131,13 @@
description:
Use EventEngine listeners instead of iomgr's grpc_tcp_server
default: false
expiry: 2023/02/13
expiry: 2023/05/13
owner: vigneshbabu@google.com
test_tags: ["event_engine_listener_test"]
- name: schedule_cancellation_over_write
description:
Allow cancellation op to be scheduled over a write
default: false
expiry: 2023/07/01
owner: vigneshbabu@google.com
test_tags: []

Loading…
Cancel
Save