mirror of https://github.com/grpc/grpc.git
parent
a30f2f9501
commit
3b1d176e9d
19 changed files with 332 additions and 0 deletions
@ -0,0 +1,106 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2019 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/lib/iomgr/logical_thread.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
DebugOnlyTraceFlag grpc_logical_thread_trace(false, "logical_thread"); |
||||||
|
|
||||||
|
void LogicalThread::Run(const DebugLocation& location, grpc_closure* closure, |
||||||
|
grpc_error* error) { |
||||||
|
(void)location; |
||||||
|
#ifndef NDEBUG |
||||||
|
closure->file_initiated = location.file(); |
||||||
|
closure->line_initiated = location.line(); |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"LogicalThread::Run() %p Scheduling closure %p: created: [%s:%d], " |
||||||
|
"scheduled [%s:%d]", |
||||||
|
this, closure, closure->file_created, closure->line_created, |
||||||
|
location.file(), location.line()); |
||||||
|
} |
||||||
|
#endif |
||||||
|
const size_t prev_size = size_.FetchAdd(1); |
||||||
|
if (prev_size == 0) { |
||||||
|
// There is no other closure executing right now on this logical thread.
|
||||||
|
// Execute this closure immediately.
|
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||||
|
gpr_log(GPR_INFO, " Executing immediately"); |
||||||
|
} |
||||||
|
closure->cb(closure->cb_arg, error); |
||||||
|
GRPC_ERROR_UNREF(error); |
||||||
|
// Loan this thread to the logical thread and drain the queue
|
||||||
|
DrainQueue(); |
||||||
|
} else { |
||||||
|
// There already are closures executing on this logical thread. Simply add
|
||||||
|
// this closure to the list.
|
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||||
|
gpr_log(GPR_INFO, " Schedule on list"); |
||||||
|
} |
||||||
|
closure->error_data.error = error; |
||||||
|
queue_.Push(closure->next_data.mpscq_node.get()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// The thread that calls this loans itself to the logical thread so as to
|
||||||
|
// execute all the scheduled closures. This is called from within
|
||||||
|
// LogicalThread::Run() after executing a closure immediately, and hence size_
|
||||||
|
// is atleast 1.
|
||||||
|
void LogicalThread::DrainQueue() { |
||||||
|
while (true) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||||
|
gpr_log(GPR_INFO, "LogicalThread::DrainQueue() %p", this); |
||||||
|
} |
||||||
|
size_t prev_size = size_.FetchSub(1); |
||||||
|
// prev_size should be atleast 1 since
|
||||||
|
GPR_DEBUG_ASSERT(prev_size >= 1); |
||||||
|
if (prev_size == 1) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||||
|
gpr_log(GPR_INFO, " Queue Drained"); |
||||||
|
} |
||||||
|
break; |
||||||
|
} |
||||||
|
// There is atleast one closure on the queue. Pop the closure from the queue
|
||||||
|
// and execute it.
|
||||||
|
grpc_closure* closure = nullptr; |
||||||
|
bool empty_unused; |
||||||
|
while ((closure = reinterpret_cast<grpc_closure*>( |
||||||
|
queue_.PopAndCheckEnd(&empty_unused))) == nullptr) { |
||||||
|
// This can happen either due to a race condition within the mpscq
|
||||||
|
// implementation or because of a race with Run()
|
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||||
|
gpr_log(GPR_INFO, " Queue returned nullptr, trying again"); |
||||||
|
} |
||||||
|
} |
||||||
|
#ifndef NDEBUG |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
" Running closure %p: created: [%s:%d], scheduled [%s:%d]", |
||||||
|
closure, closure->file_created, closure->line_created, |
||||||
|
closure->file_initiated, closure->line_initiated); |
||||||
|
} |
||||||
|
#endif |
||||||
|
grpc_error* closure_error = closure->error_data.error; |
||||||
|
closure->cb(closure->cb_arg, closure_error); |
||||||
|
GRPC_ERROR_UNREF(closure_error); |
||||||
|
} |
||||||
|
} |
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,51 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2019 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/lib/debug/trace.h" |
||||||
|
#include "src/core/lib/gprpp/atomic.h" |
||||||
|
#include "src/core/lib/gprpp/debug_location.h" |
||||||
|
#include "src/core/lib/gprpp/mpscq.h" |
||||||
|
#include "src/core/lib/gprpp/ref_counted.h" |
||||||
|
#include "src/core/lib/iomgr/closure.h" |
||||||
|
|
||||||
|
#ifndef GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H |
||||||
|
#define GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
extern DebugOnlyTraceFlag grpc_logical_thread_trace; |
||||||
|
|
||||||
|
// LogicalThread is a mechanism to schedule closures in a synchronized manner.
|
||||||
|
// All closures scheduled on a LogicalThread instance will be executed serially
|
||||||
|
// in a borrowed thread. The basic algorithm on scheduling closures is as
|
||||||
|
// follows - 1) If there are no (zero) closures scheduled on the logical thread
|
||||||
|
class LogicalThread : public RefCounted<LogicalThread> { |
||||||
|
public: |
||||||
|
void Run(const DebugLocation& location, grpc_closure* closure, |
||||||
|
grpc_error* error); |
||||||
|
|
||||||
|
private: |
||||||
|
void DrainQueue(); |
||||||
|
|
||||||
|
Atomic<size_t> size_{0}; |
||||||
|
MultiProducerSingleConsumerQueue queue_; |
||||||
|
}; |
||||||
|
} /* namespace grpc_core */ |
||||||
|
|
||||||
|
#endif /* GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H */ |
@ -0,0 +1,114 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2019 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <gtest/gtest.h> |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
#include <grpc/support/alloc.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
|
||||||
|
#include "src/core/lib/gpr/useful.h" |
||||||
|
#include "src/core/lib/gprpp/thd.h" |
||||||
|
#include "src/core/lib/iomgr/logical_thread.h" |
||||||
|
#include "test/core/util/test_config.h" |
||||||
|
|
||||||
|
namespace { |
||||||
|
TEST(LogicalThreadTest, NoOp) { |
||||||
|
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>(); |
||||||
|
} |
||||||
|
|
||||||
|
void set_event_to_true(void* value, grpc_error* /*error*/) { |
||||||
|
gpr_event_set(static_cast<gpr_event*>(value), (void*)1); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(LogicalThreadTest, ExecuteOne) { |
||||||
|
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>(); |
||||||
|
gpr_event done; |
||||||
|
gpr_event_init(&done); |
||||||
|
lock->Run(DEBUG_LOCATION, |
||||||
|
GRPC_CLOSURE_CREATE(set_event_to_true, &done, nullptr), |
||||||
|
GRPC_ERROR_NONE); |
||||||
|
GPR_ASSERT(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != |
||||||
|
nullptr); |
||||||
|
} |
||||||
|
|
||||||
|
typedef struct { |
||||||
|
size_t ctr; |
||||||
|
grpc_core::RefCountedPtr<grpc_core::LogicalThread> lock; |
||||||
|
gpr_event done; |
||||||
|
} thd_args; |
||||||
|
|
||||||
|
typedef struct { |
||||||
|
size_t* ctr; |
||||||
|
size_t value; |
||||||
|
} ex_args; |
||||||
|
|
||||||
|
void check_one(void* a, grpc_error* /*error*/) { |
||||||
|
ex_args* args = static_cast<ex_args*>(a); |
||||||
|
GPR_ASSERT(*args->ctr == args->value - 1); |
||||||
|
*args->ctr = args->value; |
||||||
|
gpr_free(a); |
||||||
|
} |
||||||
|
|
||||||
|
void execute_many_loop(void* a) { |
||||||
|
thd_args* args = static_cast<thd_args*>(a); |
||||||
|
size_t n = 1; |
||||||
|
for (size_t i = 0; i < 10; i++) { |
||||||
|
for (size_t j = 0; j < 10000; j++) { |
||||||
|
ex_args* c = static_cast<ex_args*>(gpr_malloc(sizeof(*c))); |
||||||
|
c->ctr = &args->ctr; |
||||||
|
c->value = n++; |
||||||
|
args->lock->Run(DEBUG_LOCATION, |
||||||
|
GRPC_CLOSURE_CREATE(check_one, c, nullptr), |
||||||
|
GRPC_ERROR_NONE); |
||||||
|
} |
||||||
|
// sleep for a little bit, to test other threads picking up the load
|
||||||
|
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100)); |
||||||
|
} |
||||||
|
args->lock->Run(DEBUG_LOCATION, |
||||||
|
GRPC_CLOSURE_CREATE(set_event_to_true, &args->done, nullptr), |
||||||
|
GRPC_ERROR_NONE); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(LogicalThreadTest, ExecuteMany) { |
||||||
|
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>(); |
||||||
|
grpc_core::Thread thds[100]; |
||||||
|
thd_args ta[GPR_ARRAY_SIZE(thds)]; |
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { |
||||||
|
ta[i].ctr = 0; |
||||||
|
ta[i].lock = lock; |
||||||
|
gpr_event_init(&ta[i].done); |
||||||
|
thds[i] = grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]); |
||||||
|
thds[i].Start(); |
||||||
|
} |
||||||
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { |
||||||
|
GPR_ASSERT(gpr_event_wait(&ta[i].done, |
||||||
|
gpr_inf_future(GPR_CLOCK_REALTIME)) != nullptr); |
||||||
|
thds[i].Join(); |
||||||
|
} |
||||||
|
} |
||||||
|
} // namespace
|
||||||
|
|
||||||
|
int main(int argc, char** argv) { |
||||||
|
grpc::testing::TestEnvironment env(argc, argv); |
||||||
|
grpc_init(); |
||||||
|
::testing::InitGoogleTest(&argc, argv); |
||||||
|
int retval = RUN_ALL_TESTS(); |
||||||
|
grpc_shutdown(); |
||||||
|
return retval; |
||||||
|
} |
Loading…
Reference in new issue