diff --git a/BUILD b/BUILD index 9790feda0d6..358c71a8475 100644 --- a/BUILD +++ b/BUILD @@ -3686,10 +3686,12 @@ grpc_cc_library( "//src/core:chttp2_flow_control", "//src/core:closure", "//src/core:error", + "//src/core:experiments", "//src/core:http2_errors", "//src/core:http2_settings", "//src/core:init_internally", "//src/core:iomgr_fwd", + "//src/core:iomgr_port", "//src/core:memory_quota", "//src/core:poll", "//src/core:ref_counted", diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index d16a0fac891..46dc06f24a9 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -63,6 +63,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/gprpp/crash.h" @@ -75,6 +76,7 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_fwd.h" +#include "src/core/lib/iomgr/port.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/promise/poll.h" #include "src/core/lib/resource_quota/arena.h" @@ -93,6 +95,10 @@ #include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport_impl.h" +#ifdef GRPC_POSIX_SOCKET_TCP +#include "src/core/lib/iomgr/ev_posix.h" +#endif + #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) @@ -569,6 +575,14 @@ grpc_chttp2_transport::grpc_chttp2_transport( if (grpc_core::test_only_init_callback != nullptr) { grpc_core::test_only_init_callback(); } + +#ifdef GRPC_POSIX_SOCKET_TCP + closure_barrier_may_cover_write = + grpc_event_engine_run_in_background() && + grpc_core::IsScheduleCancellationOverWriteEnabled() + ? 0 + : CLOSURE_BARRIER_MAY_COVER_WRITE; +#endif } static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) { @@ -1156,14 +1170,6 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) { } } -// Flag that this closure barrier may be covering a write in a pollset, and so -// we should not complete this closure until we can prove that the write got -// scheduled -#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0) -// First bit of the reference count, stored in the high order bits (with the low -// bits being used for flags defined above) -#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) - static grpc_closure* add_closure_barrier(grpc_closure* closure) { closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT; return closure; @@ -1295,7 +1301,7 @@ static void perform_stream_op_locked(void* stream_op, t->channelz_socket->RecordStreamStartedFromLocal(); } GPR_ASSERT(s->send_initial_metadata_finished == nullptr); - on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; + on_complete->next_data.scratch |= t->closure_barrier_may_cover_write; s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata = @@ -1351,7 +1357,7 @@ static void perform_stream_op_locked(void* stream_op, t->num_messages_in_next_write++; grpc_core::global_stats().IncrementHttp2SendMessageSize( op->payload->send_message.send_message->Length()); - on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; + on_complete->next_data.scratch |= t->closure_barrier_may_cover_write; s->send_message_finished = add_closure_barrier(op->on_complete); const uint32_t flags = op_payload->send_message.flags; if (s->write_closed) { @@ -1425,7 +1431,7 @@ static void perform_stream_op_locked(void* stream_op, if (op->send_trailing_metadata) { GPR_ASSERT(s->send_trailing_metadata_finished == nullptr); - on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; + on_complete->next_data.scratch |= t->closure_barrier_may_cover_write; s->send_trailing_metadata_finished = add_closure_barrier(on_complete); s->send_trailing_metadata = op_payload->send_trailing_metadata.send_trailing_metadata; diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 95cde29ca50..8ad1a17a101 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -69,6 +69,14 @@ #include "src/core/lib/transport/transport_fwd.h" #include "src/core/lib/transport/transport_impl.h" +// Flag that this closure barrier may be covering a write in a pollset, and so +// we should not complete this closure until we can prove that the write got +// scheduled +#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0) +// First bit of the reference count, stored in the high order bits (with the low +// bits being used for flags defined above) +#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) + namespace grpc_core { class ContextList; } @@ -470,6 +478,10 @@ struct grpc_chttp2_transport /// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to /// the peer bool enable_preferred_rx_crypto_frame_advertisement = false; + /// Set to non zero if closures associated with the transport may be + /// covering a write in a pollset. Such closures cannot be scheduled until + /// we can prove that the write got scheduled. + uint8_t closure_barrier_may_cover_write = CLOSURE_BARRIER_MAY_COVER_WRITE; std::shared_ptr event_engine; }; diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 9533bc5eb99..081718c04ee 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -54,6 +54,8 @@ const char* const description_transport_supplies_client_latency = "opencensus"; const char* const description_event_engine_listener = "Use EventEngine listeners instead of iomgr's grpc_tcp_server"; +const char* const description_schedule_cancellation_over_write = + "Allow cancellation op to be scheduled over a write"; } // namespace namespace grpc_core { @@ -75,6 +77,8 @@ const ExperimentMetadata g_experiment_metadata[] = { {"transport_supplies_client_latency", description_transport_supplies_client_latency, false}, {"event_engine_listener", description_event_engine_listener, false}, + {"schedule_cancellation_over_write", + description_schedule_cancellation_over_write, false}, }; } // namespace grpc_core diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 893cea5b6a2..9bdb5a42b54 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -72,6 +72,7 @@ inline bool IsFreeLargeAllocatorEnabled() { return false; } inline bool IsPromiseBasedServerCallEnabled() { return false; } inline bool IsTransportSuppliesClientLatencyEnabled() { return false; } inline bool IsEventEngineListenerEnabled() { return false; } +inline bool IsScheduleCancellationOverWriteEnabled() { return false; } #else #define GRPC_EXPERIMENT_IS_INCLUDED_TCP_FRAME_SIZE_TUNING inline bool IsTcpFrameSizeTuningEnabled() { return IsExperimentEnabled(0); } @@ -107,8 +108,12 @@ inline bool IsTransportSuppliesClientLatencyEnabled() { } #define GRPC_EXPERIMENT_IS_INCLUDED_EVENT_ENGINE_LISTENER inline bool IsEventEngineListenerEnabled() { return IsExperimentEnabled(12); } +#define GRPC_EXPERIMENT_IS_INCLUDED_SCHEDULE_CANCELLATION_OVER_WRITE +inline bool IsScheduleCancellationOverWriteEnabled() { + return IsExperimentEnabled(13); +} -constexpr const size_t kNumExperiments = 13; +constexpr const size_t kNumExperiments = 14; extern const ExperimentMetadata g_experiment_metadata[kNumExperiments]; #endif diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 0934c50c4cf..775f28e0e9a 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -131,6 +131,13 @@ description: Use EventEngine listeners instead of iomgr's grpc_tcp_server default: false - expiry: 2023/02/13 + expiry: 2023/05/13 owner: vigneshbabu@google.com test_tags: ["event_engine_listener_test"] +- name: schedule_cancellation_over_write + description: + Allow cancellation op to be scheduled over a write + default: false + expiry: 2023/07/01 + owner: vigneshbabu@google.com + test_tags: []