Use std::function instead of grpc_closure

reviewable/pr21275/r1
Yash Tibrewal 5 years ago
parent 3b1d176e9d
commit d4d9e45990
  1. 37
      CMakeLists.txt
  2. 48
      Makefile
  3. 10
      build.yaml
  4. 75
      src/core/lib/iomgr/logical_thread.cc
  5. 15
      src/core/lib/iomgr/logical_thread.h
  6. 30
      test/core/iomgr/logical_thread_test.cc
  7. 24
      tools/run_tests/generated/tests.json

@ -779,6 +779,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx json_run_localhost)
endif()
add_dependencies(buildtests_cxx logical_thread_test)
add_dependencies(buildtests_cxx message_allocator_end2end_test)
add_dependencies(buildtests_cxx metrics_client)
add_dependencies(buildtests_cxx mock_test)
@ -13947,6 +13948,42 @@ endif()
endif()
if(gRPC_BUILD_TESTS)
add_executable(logical_thread_test
test/core/iomgr/logical_thread_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(logical_thread_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(logical_thread_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(message_allocator_end2end_test
test/cpp/end2end/message_allocator_end2end_test.cc
third_party/googletest/googletest/src/gtest-all.cc

@ -1256,6 +1256,7 @@ interop_client: $(BINDIR)/$(CONFIG)/interop_client
interop_server: $(BINDIR)/$(CONFIG)/interop_server
interop_test: $(BINDIR)/$(CONFIG)/interop_test
json_run_localhost: $(BINDIR)/$(CONFIG)/json_run_localhost
logical_thread_test: $(BINDIR)/$(CONFIG)/logical_thread_test
message_allocator_end2end_test: $(BINDIR)/$(CONFIG)/message_allocator_end2end_test
metrics_client: $(BINDIR)/$(CONFIG)/metrics_client
mock_test: $(BINDIR)/$(CONFIG)/mock_test
@ -1726,6 +1727,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/interop_server \
$(BINDIR)/$(CONFIG)/interop_test \
$(BINDIR)/$(CONFIG)/json_run_localhost \
$(BINDIR)/$(CONFIG)/logical_thread_test \
$(BINDIR)/$(CONFIG)/message_allocator_end2end_test \
$(BINDIR)/$(CONFIG)/metrics_client \
$(BINDIR)/$(CONFIG)/mock_test \
@ -1898,6 +1900,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/interop_server \
$(BINDIR)/$(CONFIG)/interop_test \
$(BINDIR)/$(CONFIG)/json_run_localhost \
$(BINDIR)/$(CONFIG)/logical_thread_test \
$(BINDIR)/$(CONFIG)/message_allocator_end2end_test \
$(BINDIR)/$(CONFIG)/metrics_client \
$(BINDIR)/$(CONFIG)/mock_test \
@ -2410,6 +2413,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/inproc_sync_unary_ping_pong_test || ( echo test inproc_sync_unary_ping_pong_test failed ; exit 1 )
$(E) "[RUN] Testing interop_test"
$(Q) $(BINDIR)/$(CONFIG)/interop_test || ( echo test interop_test failed ; exit 1 )
$(E) "[RUN] Testing logical_thread_test"
$(Q) $(BINDIR)/$(CONFIG)/logical_thread_test || ( echo test logical_thread_test failed ; exit 1 )
$(E) "[RUN] Testing message_allocator_end2end_test"
$(Q) $(BINDIR)/$(CONFIG)/message_allocator_end2end_test || ( echo test message_allocator_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing mock_test"
@ -18222,6 +18227,49 @@ endif
endif
LOGICAL_THREAD_TEST_SRC = \
test/core/iomgr/logical_thread_test.cc \
LOGICAL_THREAD_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LOGICAL_THREAD_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/logical_thread_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/logical_thread_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/logical_thread_test: $(PROTOBUF_DEP) $(LOGICAL_THREAD_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(LOGICAL_THREAD_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/logical_thread_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/iomgr/logical_thread_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_logical_thread_test: $(LOGICAL_THREAD_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(LOGICAL_THREAD_TEST_OBJS:.o=.dep)
endif
endif
MESSAGE_ALLOCATOR_END2END_TEST_SRC = \
test/cpp/end2end/message_allocator_end2end_test.cc \

@ -5353,6 +5353,16 @@ targets:
- mac
- linux
- posix
- name: logical_thread_test
cpu_cost: 10
build: test
language: c++
src:
- test/core/iomgr/logical_thread_test.cc
deps:
- grpc_test_util
- grpc
- gpr
- name: message_allocator_end2end_test
gtest: true
cpu_cost: 0.5

@ -24,20 +24,29 @@ namespace grpc_core {
DebugOnlyTraceFlag grpc_logical_thread_trace(false, "logical_thread");
void LogicalThread::Run(const DebugLocation& location, grpc_closure* closure,
grpc_error* error) {
(void)location;
struct CallbackWrapper {
#ifndef NDEBUG
closure->file_initiated = location.file();
closure->line_initiated = location.line();
CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc)
: callback(std::move(cb)), location(loc) {}
#else
explicit CallbackWrapper(std::function<void()> cb)
: callback(std::move(cb)) {}
#endif
MultiProducerSingleConsumerQueue::Node mpscq_node;
const std::function<void()> callback;
#ifndef NDEBUG
const DebugLocation location;
#endif
};
void LogicalThread::Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
(void)location;
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO,
"LogicalThread::Run() %p Scheduling closure %p: created: [%s:%d], "
"scheduled [%s:%d]",
this, closure, closure->file_created, closure->line_created,
location.file(), location.line());
gpr_log(GPR_INFO, "LogicalThread::Run() %p Scheduling callback [%s:%d]",
this, location.file(), location.line());
}
#endif
const size_t prev_size = size_.FetchAdd(1);
if (prev_size == 0) {
// There is no other closure executing right now on this logical thread.
@ -45,24 +54,29 @@ void LogicalThread::Run(const DebugLocation& location, grpc_closure* closure,
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, " Executing immediately");
}
closure->cb(closure->cb_arg, error);
GRPC_ERROR_UNREF(error);
// Loan this thread to the logical thread and drain the queue
callback();
// Loan this thread to the logical thread and drain the queue.
DrainQueue();
} else {
#ifndef NDEBUG
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
#else
CallbackWrapper* cb_wrapper = new CallbackWrapper(std::move(callback));
#endif
// There already are closures executing on this logical thread. Simply add
// this closure to the list.
// this closure to the queue.
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, " Schedule on list");
gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper);
}
closure->error_data.error = error;
queue_.Push(closure->next_data.mpscq_node.get());
queue_.Push(
reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(cb_wrapper));
}
}
// The thread that calls this loans itself to the logical thread so as to
// execute all the scheduled closures. This is called from within
// LogicalThread::Run() after executing a closure immediately, and hence size_
// execute all the scheduled callback. This is called from within
// LogicalThread::Run() after executing a callback immediately, and hence size_
// is atleast 1.
void LogicalThread::DrainQueue() {
while (true) {
@ -78,29 +92,30 @@ void LogicalThread::DrainQueue() {
}
break;
}
// There is atleast one closure on the queue. Pop the closure from the queue
// and execute it.
grpc_closure* closure = nullptr;
// There is atleast one callback on the queue. Pop the callback from the
// queue and execute it.
CallbackWrapper* cb_wrapper = nullptr;
bool empty_unused;
while ((closure = reinterpret_cast<grpc_closure*>(
while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
// This can happen either due to a race condition within the mpscq
// implementation or because of a race with Run()
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
// TODO(yashykt) : The combiner mechanism offloads execution to the
// executor at this point. Figure out if we should replicate that
// behavior here.
gpr_log(GPR_INFO, " Queue returned nullptr, trying again");
}
}
#ifndef NDEBUG
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO,
" Running closure %p: created: [%s:%d], scheduled [%s:%d]",
closure, closure->file_created, closure->line_created,
closure->file_initiated, closure->line_initiated);
" Running item %p : callback scheduled at [%s:%d]", cb_wrapper,
cb_wrapper->location.file(), cb_wrapper->location.line());
}
#endif
grpc_error* closure_error = closure->error_data.error;
closure->cb(closure->cb_arg, closure_error);
GRPC_ERROR_UNREF(closure_error);
cb_wrapper->callback();
delete cb_wrapper;
}
}
} // namespace grpc_core

@ -18,12 +18,13 @@
#include <grpc/support/port_platform.h>
#include <functional>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/iomgr/closure.h"
#ifndef GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H
#define GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H
@ -31,14 +32,14 @@
namespace grpc_core {
extern DebugOnlyTraceFlag grpc_logical_thread_trace;
// LogicalThread is a mechanism to schedule closures in a synchronized manner.
// All closures scheduled on a LogicalThread instance will be executed serially
// in a borrowed thread. The basic algorithm on scheduling closures is as
// follows - 1) If there are no (zero) closures scheduled on the logical thread
// LogicalThread is a mechanism to schedule callbacks in a synchronized manner.
// All callbacks scheduled on a LogicalThread instance will be executed serially
// in a borrowed thread. The API provides a FIFO guarantee to the execution of
// callbacks scheduled on the thread.
class LogicalThread : public RefCounted<LogicalThread> {
public:
void Run(const DebugLocation& location, grpc_closure* closure,
grpc_error* error);
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location);
private:
void DrainQueue();

@ -32,17 +32,11 @@ TEST(LogicalThreadTest, NoOp) {
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
}
void set_event_to_true(void* value, grpc_error* /*error*/) {
gpr_event_set(static_cast<gpr_event*>(value), (void*)1);
}
TEST(LogicalThreadTest, ExecuteOne) {
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
gpr_event done;
gpr_event_init(&done);
lock->Run(DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(set_event_to_true, &done, nullptr),
GRPC_ERROR_NONE);
lock->Run([&done]() { gpr_event_set(&done, (void*)1); }, DEBUG_LOCATION);
GPR_ASSERT(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
}
@ -58,13 +52,6 @@ typedef struct {
size_t value;
} ex_args;
void check_one(void* a, grpc_error* /*error*/) {
ex_args* args = static_cast<ex_args*>(a);
GPR_ASSERT(*args->ctr == args->value - 1);
*args->ctr = args->value;
gpr_free(a);
}
void execute_many_loop(void* a) {
thd_args* args = static_cast<thd_args*>(a);
size_t n = 1;
@ -73,16 +60,19 @@ void execute_many_loop(void* a) {
ex_args* c = static_cast<ex_args*>(gpr_malloc(sizeof(*c)));
c->ctr = &args->ctr;
c->value = n++;
args->lock->Run(DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(check_one, c, nullptr),
GRPC_ERROR_NONE);
args->lock->Run(
[c]() {
GPR_ASSERT(*c->ctr == c->value - 1);
*c->ctr = c->value;
gpr_free(c);
},
DEBUG_LOCATION);
}
// sleep for a little bit, to test other threads picking up the load
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
}
args->lock->Run(DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(set_event_to_true, &args->done, nullptr),
GRPC_ERROR_NONE);
args->lock->Run([args]() { gpr_event_set(&args->done, (void*)1); },
DEBUG_LOCATION);
}
TEST(LogicalThreadTest, ExecuteMany) {

@ -5019,6 +5019,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 10,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c++",
"name": "logical_thread_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save