[combiner] Add a force-offload mechanism (#34377)

Add a mechanism to allow the transport to force an offload when it knows
that's appropriate.
pull/34382/head
Craig Tiller 2 years ago committed by GitHub
parent d670ffa92c
commit 4cfa676045
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/lib/iomgr/combiner.cc
  2. 2
      src/core/lib/iomgr/combiner.h
  3. 2
      src/core/lib/iomgr/exec_ctx.h
  4. 45
      test/core/iomgr/combiner_test.cc

@ -328,4 +328,10 @@ void Combiner::Run(grpc_closure* closure, grpc_error_handle error) {
void Combiner::FinallyRun(grpc_closure* closure, grpc_error_handle error) {
combiner_finally_exec(this, closure, error);
}
void Combiner::ForceOffload() {
gpr_atm_no_barrier_store(&initiating_exec_ctx_or_null, 0);
ExecCtx::Get()->SetReadyToFinishFlag();
}
} // namespace grpc_core

@ -36,6 +36,8 @@ class Combiner {
void Run(grpc_closure* closure, grpc_error_handle error);
// TODO(yashkt) : Remove this method
void FinallyRun(grpc_closure* closure, grpc_error_handle error);
// Force the next combiner execution to be offloaded
void ForceOffload();
Combiner* next_combiner_on_this_exec_ctx = nullptr;
MultiProducerSingleConsumerQueue queue;
// either:

@ -184,6 +184,8 @@ class GRPC_DLL ExecCtx {
}
}
void SetReadyToFinishFlag() { flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; }
Timestamp Now() { return Timestamp::Now(); }
void InvalidateNow() { time_cache_.InvalidateCache(); }
void SetNowIomgrShutdown() {

@ -18,6 +18,8 @@
#include "src/core/lib/iomgr/combiner.h"
#include <thread>
#include <gtest/gtest.h>
#include <grpc/grpc.h>
@ -26,11 +28,11 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/thd.h"
#include "test/core/util/test_config.h"
TEST(CombinerTest, TestNoOp) {
gpr_log(GPR_DEBUG, "test_no_op");
grpc_core::ExecCtx exec_ctx;
GRPC_COMBINER_UNREF(grpc_combiner_create(), "test_no_op");
}
@ -40,8 +42,6 @@ static void set_event_to_true(void* value, grpc_error_handle /*error*/) {
}
TEST(CombinerTest, TestExecuteOne) {
gpr_log(GPR_DEBUG, "test_execute_one");
grpc_core::Combiner* lock = grpc_combiner_create();
gpr_event done;
gpr_event_init(&done);
@ -94,8 +94,6 @@ static void execute_many_loop(void* a) {
}
TEST(CombinerTest, TestExecuteMany) {
gpr_log(GPR_DEBUG, "test_execute_many");
grpc_core::Combiner* lock = grpc_combiner_create();
grpc_core::Thread thds[10];
thd_args ta[GPR_ARRAY_SIZE(thds)];
@ -122,13 +120,11 @@ static void in_finally(void* /*arg*/, grpc_error_handle /*error*/) {
}
static void add_finally(void* arg, grpc_error_handle /*error*/) {
static_cast<grpc_core::Combiner*>(arg)->Run(
static_cast<grpc_core::Combiner*>(arg)->FinallyRun(
GRPC_CLOSURE_CREATE(in_finally, arg, nullptr), absl::OkStatus());
}
TEST(CombinerTest, TestExecuteFinally) {
gpr_log(GPR_DEBUG, "test_execute_finally");
grpc_core::Combiner* lock = grpc_combiner_create();
grpc_core::ExecCtx exec_ctx;
gpr_event_init(&got_in_finally);
@ -140,6 +136,39 @@ TEST(CombinerTest, TestExecuteFinally) {
GRPC_COMBINER_UNREF(lock, "test_execute_finally");
}
TEST(CombinerTest, TestForceOffload) {
grpc_core::Combiner* lock = grpc_combiner_create();
grpc_core::ExecCtx exec_ctx;
grpc_core::Notification done;
const auto start_thread = std::this_thread::get_id();
lock->Run(grpc_core::NewClosure([&](grpc_error_handle) {
// Initial execution should get done in the exec ctx flush below,
// so thread stays the same.
EXPECT_EQ(start_thread, std::this_thread::get_id());
lock->Run(grpc_core::NewClosure([&](grpc_error_handle) {
// Next one should stick to the same thread too
// (proves we're not offloading all the time).
EXPECT_EQ(start_thread, std::this_thread::get_id());
// Force the offload.
lock->ForceOffload();
lock->Run(
grpc_core::NewClosure([&](grpc_error_handle) {
// We should see *not* the starting thread being
// the executor now.
EXPECT_NE(start_thread,
std::this_thread::get_id());
done.Notify();
}),
absl::OkStatus());
}),
absl::OkStatus());
}),
absl::OkStatus());
exec_ctx.Flush();
done.WaitForNotification();
GRPC_COMBINER_UNREF(lock, "test_force_offload");
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);

Loading…
Cancel
Save