diff --git a/include/grpc++/impl/codegen/byte_buffer.h b/include/grpc++/impl/codegen/byte_buffer.h index 9c0246e6174..fe73ce7a83d 100644 --- a/include/grpc++/impl/codegen/byte_buffer.h +++ b/include/grpc++/impl/codegen/byte_buffer.h @@ -41,8 +41,6 @@ template class RpcMethodHandler; template class ServerStreamingHandler; -template -class ErrorMethodHandler; template class DeserializeFuncType; } // namespace internal @@ -109,8 +107,6 @@ class ByteBuffer final { friend class internal::RpcMethodHandler; template friend class internal::ServerStreamingHandler; - template - friend class internal::ErrorMethodHandler; template friend class internal::DeserializeFuncType; diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 452eac6646b..b8a78625782 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -78,8 +78,7 @@ template class ServerStreamingHandler; template class BidiStreamingHandler; -template -class ErrorMethodHandler; +class UnknownMethodHandler; template class TemplatedBidiStreamingHandler; template @@ -222,8 +221,7 @@ class CompletionQueue : private GrpcLibraryCodegen { friend class ::grpc::internal::ServerStreamingHandler; template friend class ::grpc::internal::TemplatedBidiStreamingHandler; - template - friend class ::grpc::internal::ErrorMethodHandler; + friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; friend class ::grpc::ServerInterface; diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 93b7826e8fc..c0af4ca1307 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -242,14 +242,12 @@ class SplitServerStreamingHandler ServerSplitStreamer, false>(func) {} }; -/// General method handler class for errors that prevent real method use -/// e.g., handle unknown method by returning UNIMPLEMENTED error. -template -class ErrorMethodHandler : public MethodHandler { +/// Handle unknown method by returning UNIMPLEMENTED error. +class UnknownMethodHandler : public MethodHandler { public: template static void FillOps(ServerContext* context, T* ops) { - Status status(code, ""); + Status status(StatusCode::UNIMPLEMENTED, ""); if (!context->sent_initial_metadata_) { ops->SendInitialMetadata(context->initial_metadata_, context->initial_metadata_flags()); @@ -266,18 +264,9 @@ class ErrorMethodHandler : public MethodHandler { FillOps(param.server_context, &ops); param.call->PerformOps(&ops); param.call->cq()->Pluck(&ops); - // We also have to destroy any request payload in the handler parameter - ByteBuffer* payload = param.request.bbuf_ptr(); - if (payload != nullptr) { - payload->Clear(); - } } }; -typedef ErrorMethodHandler UnknownMethodHandler; -typedef ErrorMethodHandler - ResourceExhaustedHandler; - } // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index 9f20335a2a2..a2d6967bf84 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -63,8 +63,7 @@ template class ServerStreamingHandler; template class BidiStreamingHandler; -template -class ErrorMethodHandler; +class UnknownMethodHandler; template class TemplatedBidiStreamingHandler; class Call; @@ -256,8 +255,7 @@ class ServerContext { friend class ::grpc::internal::ServerStreamingHandler; template friend class ::grpc::internal::TemplatedBidiStreamingHandler; - template - friend class ::grpc::internal::ErrorMethodHandler; + friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::ClientContext; /// Prevent copying. diff --git a/include/grpc++/server.h b/include/grpc++/server.h index cf590185d1a..01c4a60d216 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -35,7 +35,6 @@ #include #include #include -#include struct grpc_server; @@ -139,20 +138,10 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on /// server completion queues passed via sync_server_cqs param. - /// - /// \param thread_creator The thread creation function for the sync - /// server. Typically gpr_thd_new - /// - /// \param thread_joiner The thread joining function for the sync - /// server. Typically gpr_thd_join Server(int max_message_size, ChannelArguments* args, std::shared_ptr>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec, - std::function - thread_creator, - std::function thread_joiner); + int min_pollers, int max_pollers, int sync_cq_timeout_msec); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -231,14 +220,6 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { std::unique_ptr health_check_service_; bool health_check_service_disabled_; - - std::function - thread_creator_; - std::function thread_joiner_; - - // A special handler for resource exhausted in sync case - std::unique_ptr resource_exhausted_handler_; }; } // namespace grpc diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 25bbacbbc7a..e2bae4b41fa 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -20,7 +20,6 @@ #define GRPCXX_SERVER_BUILDER_H #include -#include #include #include #include @@ -31,7 +30,6 @@ #include #include #include -#include #include #include @@ -49,7 +47,6 @@ class Service; namespace testing { class ServerBuilderPluginTest; -class ServerBuilderThreadCreatorOverrideTest; } // namespace testing /// A builder class for the creation and startup of \a grpc::Server instances. @@ -216,17 +213,6 @@ class ServerBuilder { private: friend class ::grpc::testing::ServerBuilderPluginTest; - friend class ::grpc::testing::ServerBuilderThreadCreatorOverrideTest; - - ServerBuilder& SetThreadFunctions( - std::function - thread_creator, - std::function thread_joiner) { - thread_creator_ = thread_creator; - thread_joiner_ = thread_joiner; - return *this; - } struct Port { grpc::string addr; @@ -286,11 +272,6 @@ class ServerBuilder { grpc_compression_algorithm algorithm; } maybe_default_compression_algorithm_; uint32_t enabled_compression_algorithms_bitset_; - - std::function - thread_creator_; - std::function thread_joiner_; }; } // namespace grpc diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 94519d817bf..4fb128d98b1 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -189,16 +189,10 @@ int MetadataCredentialsPluginWrapper::GetMetadata( } if (w->plugin_->IsBlocking()) { // Asynchronous return. - if (w->thread_pool_->Add(std::bind( - &MetadataCredentialsPluginWrapper::InvokePlugin, w, context, cb, - user_data, nullptr, nullptr, nullptr, nullptr))) { - return 0; - } else { - *num_creds_md = 0; - *status = GRPC_STATUS_RESOURCE_EXHAUSTED; - *error_details = nullptr; - return true; - } + w->thread_pool_->Add( + std::bind(&MetadataCredentialsPluginWrapper::InvokePlugin, w, context, + cb, user_data, nullptr, nullptr, nullptr, nullptr)); + return 0; } else { // Synchronous return. w->InvokePlugin(context, cb, user_data, creds_md, num_creds_md, status, diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc index 2d2abbe9d1c..8ca3e32c2fb 100644 --- a/src/cpp/server/create_default_thread_pool.cc +++ b/src/cpp/server/create_default_thread_pool.cc @@ -28,7 +28,7 @@ namespace { ThreadPoolInterface* CreateDefaultThreadPoolImpl() { int cores = gpr_cpu_num_cores(); if (!cores) cores = 4; - return new DynamicThreadPool(cores, gpr_thd_new, gpr_thd_join); + return new DynamicThreadPool(cores); } CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPoolImpl; diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc index d0e62313f68..81c78fe739b 100644 --- a/src/cpp/server/dynamic_thread_pool.cc +++ b/src/cpp/server/dynamic_thread_pool.cc @@ -19,32 +19,19 @@ #include "src/cpp/server/dynamic_thread_pool.h" #include +#include #include -#include namespace grpc { -DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool, - bool* valid) - : pool_(pool) { - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - - std::lock_guard l(dt_mu_); - valid_ = *valid = pool->thread_creator_( - &thd_, "dynamic thread", - [](void* th) { - reinterpret_cast(th)->ThreadFunc(); - }, - this, &opt); -} - +DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool) + : pool_(pool), + thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, + this)) {} DynamicThreadPool::DynamicThread::~DynamicThread() { - std::lock_guard l(dt_mu_); - if (valid_) { - pool_->thread_joiner_(thd_); - } + thd_->join(); + thd_.reset(); } void DynamicThreadPool::DynamicThread::ThreadFunc() { @@ -86,26 +73,15 @@ void DynamicThreadPool::ThreadFunc() { } } -DynamicThreadPool::DynamicThreadPool( - int reserve_threads, - std::function - thread_creator, - std::function thread_joiner) +DynamicThreadPool::DynamicThreadPool(int reserve_threads) : shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0), - threads_waiting_(0), - thread_creator_(thread_creator), - thread_joiner_(thread_joiner) { + threads_waiting_(0) { for (int i = 0; i < reserve_threads_; i++) { std::lock_guard lock(mu_); nthreads_++; - bool valid; - auto* th = new DynamicThread(this, &valid); - if (!valid) { - delete th; - } + new DynamicThread(this); } } @@ -125,7 +101,7 @@ DynamicThreadPool::~DynamicThreadPool() { ReapThreads(&dead_threads_); } -bool DynamicThreadPool::Add(const std::function& callback) { +void DynamicThreadPool::Add(const std::function& callback) { std::lock_guard lock(mu_); // Add works to the callbacks list callbacks_.push(callback); @@ -133,12 +109,7 @@ bool DynamicThreadPool::Add(const std::function& callback) { if (threads_waiting_ == 0) { // Kick off a new thread nthreads_++; - bool valid; - auto* th = new DynamicThread(this, &valid); - if (!valid) { - delete th; - return false; - } + new DynamicThread(this); } else { cv_.notify_one(); } @@ -146,7 +117,6 @@ bool DynamicThreadPool::Add(const std::function& callback) { if (!dead_threads_.empty()) { ReapThreads(&dead_threads_); } - return true; } } // namespace grpc diff --git a/src/cpp/server/dynamic_thread_pool.h b/src/cpp/server/dynamic_thread_pool.h index 75d31cd9083..9237c6e5cac 100644 --- a/src/cpp/server/dynamic_thread_pool.h +++ b/src/cpp/server/dynamic_thread_pool.h @@ -24,9 +24,9 @@ #include #include #include +#include #include -#include #include "src/cpp/server/thread_pool_interface.h" @@ -34,26 +34,20 @@ namespace grpc { class DynamicThreadPool final : public ThreadPoolInterface { public: - DynamicThreadPool(int reserve_threads, - std::function - thread_creator, - std::function thread_joiner); + explicit DynamicThreadPool(int reserve_threads); ~DynamicThreadPool(); - bool Add(const std::function& callback) override; + void Add(const std::function& callback) override; private: class DynamicThread { public: - DynamicThread(DynamicThreadPool* pool, bool* valid); + DynamicThread(DynamicThreadPool* pool); ~DynamicThread(); private: DynamicThreadPool* pool_; - std::mutex dt_mu_; - gpr_thd_id thd_; - bool valid_; + std::unique_ptr thd_; void ThreadFunc(); }; std::mutex mu_; @@ -65,10 +59,6 @@ class DynamicThreadPool final : public ThreadPoolInterface { int nthreads_; int threads_waiting_; std::list dead_threads_; - std::function - thread_creator_; - std::function thread_joiner_; void ThreadFunc(); static void ReapThreads(std::list* tlist); diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc index fa08a6200f1..0fbe4ccd18b 100644 --- a/src/cpp/server/secure_server_credentials.cc +++ b/src/cpp/server/secure_server_credentials.cc @@ -43,14 +43,9 @@ void AuthMetadataProcessorAyncWrapper::Process( return; } if (w->processor_->IsBlocking()) { - bool added = w->thread_pool_->Add( + w->thread_pool_->Add( std::bind(&AuthMetadataProcessorAyncWrapper::InvokeProcessor, w, context, md, num_md, cb, user_data)); - if (!added) { - // no thread available, so fail with temporary resource unavailability - cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_UNAVAILABLE, nullptr); - return; - } } else { // invoke directly. w->InvokeProcessor(context, md, num_md, cb, user_data); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index d91ee7f4e3a..200e477822c 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -23,7 +23,6 @@ #include #include #include -#include #include #include "src/cpp/server/thread_pool_interface.h" @@ -44,9 +43,7 @@ ServerBuilder::ServerBuilder() max_send_message_size_(-1), sync_server_settings_(SyncServerSettings()), resource_quota_(nullptr), - generic_service_(nullptr), - thread_creator_(gpr_thd_new), - thread_joiner_(gpr_thd_join) { + generic_service_(nullptr) { gpr_once_init(&once_init_plugin_list, do_plugin_list_init); for (auto it = g_plugin_factory_list->begin(); it != g_plugin_factory_list->end(); it++) { @@ -265,7 +262,7 @@ std::unique_ptr ServerBuilder::BuildAndStart() { std::unique_ptr server(new Server( max_receive_message_size_, &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, - sync_server_settings_.cq_timeout_msec, thread_creator_, thread_joiner_)); + sync_server_settings_.cq_timeout_msec)); if (has_sync_methods) { // This is a Sync server diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 02a663d660e..4f8f4e06fcc 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -36,7 +36,6 @@ #include #include #include -#include #include "src/core/ext/transport/inproc/inproc_transport.h" #include "src/core/lib/profiling/timers.h" @@ -196,10 +195,8 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { call_(mrd->call_, server, &cq_, server->max_receive_message_size()), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), - request_payload_(has_request_payload_ ? mrd->request_payload_ - : nullptr), - method_(mrd->method_), - server_(server) { + request_payload_(mrd->request_payload_), + method_(mrd->method_) { ctx_.set_call(mrd->call_); ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); @@ -213,13 +210,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { } } - void Run(std::shared_ptr global_callbacks, - bool resources) { + void Run(std::shared_ptr global_callbacks) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - auto* handler = resources ? method_->handler() - : server_->resource_exhausted_handler_.get(); - handler->RunHandler(internal::MethodHandler::HandlerParameter( + method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; @@ -241,7 +235,6 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { const bool has_request_payload_; grpc_byte_buffer* request_payload_; internal::RpcServiceMethod* const method_; - Server* server_; }; private: @@ -262,15 +255,11 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { // appropriate RPC handlers class Server::SyncRequestThreadManager : public ThreadManager { public: - SyncRequestThreadManager( - Server* server, CompletionQueue* server_cq, - std::shared_ptr global_callbacks, int min_pollers, - int max_pollers, int cq_timeout_msec, - std::function - thread_creator, - std::function thread_joiner) - : ThreadManager(min_pollers, max_pollers, thread_creator, thread_joiner), + SyncRequestThreadManager(Server* server, CompletionQueue* server_cq, + std::shared_ptr global_callbacks, + int min_pollers, int max_pollers, + int cq_timeout_msec) + : ThreadManager(min_pollers, max_pollers), server_(server), server_cq_(server_cq), cq_timeout_msec_(cq_timeout_msec), @@ -296,7 +285,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { GPR_UNREACHABLE_CODE(return TIMEOUT); } - void DoWork(void* tag, bool ok, bool resources) override { + void DoWork(void* tag, bool ok) override { SyncRequest* sync_req = static_cast(tag); if (!sync_req) { @@ -316,7 +305,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { } GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_, resources); + cd.Run(global_callbacks_); } // TODO (sreek) If ok is false here (which it isn't in case of // grpc_request_registered_call), we should still re-queue the request @@ -378,11 +367,7 @@ Server::Server( int max_receive_message_size, ChannelArguments* args, std::shared_ptr>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec, - std::function - thread_creator, - std::function thread_joiner) + int min_pollers, int max_pollers, int sync_cq_timeout_msec) : max_receive_message_size_(max_receive_message_size), sync_server_cqs_(sync_server_cqs), started_(false), @@ -391,9 +376,7 @@ Server::Server( has_generic_service_(false), server_(nullptr), server_initializer_(new ServerInitializer(this)), - health_check_service_disabled_(false), - thread_creator_(thread_creator), - thread_joiner_(thread_joiner) { + health_check_service_disabled_(false) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; @@ -403,7 +386,7 @@ Server::Server( it++) { sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( this, (*it).get(), global_callbacks_, min_pollers, max_pollers, - sync_cq_timeout_msec, thread_creator_, thread_joiner_)); + sync_cq_timeout_msec)); } grpc_channel_args channel_args; @@ -566,10 +549,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { } } - if (!sync_server_cqs_->empty()) { - resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler); - } - for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } diff --git a/src/cpp/server/thread_pool_interface.h b/src/cpp/server/thread_pool_interface.h index 656e6673f12..028842a776f 100644 --- a/src/cpp/server/thread_pool_interface.h +++ b/src/cpp/server/thread_pool_interface.h @@ -29,9 +29,7 @@ class ThreadPoolInterface { virtual ~ThreadPoolInterface() {} // Schedule the given callback for execution. - // Return true on success, false on failure - virtual bool Add(const std::function& callback) - GRPC_MUST_USE_RESULT = 0; + virtual void Add(const std::function& callback) = 0; }; // Allows different codebases to use their own thread pool impls diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 107c60f4ebe..23264f1b5bc 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -20,26 +20,18 @@ #include #include +#include #include -#include namespace grpc { -ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr, bool* valid) +ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr) : thd_mgr_(thd_mgr) { - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - // Make thread creation exclusive with respect to its join happening in // ~WorkerThread(). std::lock_guard lock(wt_mu_); - *valid = valid_ = thd_mgr->thread_creator_( - &thd_, "worker thread", - [](void* th) { - reinterpret_cast(th)->Run(); - }, - this, &opt); + thd_ = std::thread(&ThreadManager::WorkerThread::Run, this); } void ThreadManager::WorkerThread::Run() { @@ -50,24 +42,15 @@ void ThreadManager::WorkerThread::Run() { ThreadManager::WorkerThread::~WorkerThread() { // Don't join until the thread is fully constructed. std::lock_guard lock(wt_mu_); - if (valid_) { - thd_mgr_->thread_joiner_(thd_); - } + thd_.join(); } -ThreadManager::ThreadManager( - int min_pollers, int max_pollers, - std::function - thread_creator, - std::function thread_joiner) +ThreadManager::ThreadManager(int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), - num_threads_(0), - thread_creator_(thread_creator), - thread_joiner_(thread_joiner) {} + num_threads_(0) {} ThreadManager::~ThreadManager() { { @@ -128,9 +111,7 @@ void ThreadManager::Initialize() { for (int i = 0; i < min_pollers_; i++) { // Create a new thread (which ends up calling the MainWorkLoop() function - bool valid; - new WorkerThread(this, &valid); - GPR_ASSERT(valid); // we need to have at least this minimum + new WorkerThread(this); } } @@ -157,27 +138,18 @@ void ThreadManager::MainWorkLoop() { case WORK_FOUND: // If we got work and there are now insufficient pollers, start a new // one - bool resources; if (!shutdown_ && num_pollers_ < min_pollers_) { - bool valid; + num_pollers_++; + num_threads_++; // Drop lock before spawning thread to avoid contention lock.unlock(); - auto* th = new WorkerThread(this, &valid); - lock.lock(); - if (valid) { - num_pollers_++; - num_threads_++; - } else { - delete th; - } - resources = (num_pollers_ > 0); + new WorkerThread(this); } else { - resources = true; + // Drop lock for consistency with above branch + lock.unlock(); } - // Drop lock before any application work - lock.unlock(); // Lock is always released at this point - do the application work - DoWork(tag, ok, resources); + DoWork(tag, ok); // Take the lock again to check post conditions lock.lock(); // If we're shutdown, we should finish at this point. diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index c1783baa607..a206e0bd8a5 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -20,23 +20,18 @@ #define GRPC_INTERNAL_CPP_THREAD_MANAGER_H #include -#include #include #include #include +#include #include -#include namespace grpc { class ThreadManager { public: - ThreadManager(int min_pollers, int max_pollers, - std::function - thread_creator, - std::function thread_joiner); + explicit ThreadManager(int min_pollers, int max_pollers); virtual ~ThreadManager(); // Initializes and Starts the Rpc Manager threads @@ -55,8 +50,6 @@ class ThreadManager { // - ThreadManager does not interpret the values of 'tag' and 'ok' // - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to // DoWork() - // - ThreadManager will also pass DoWork a bool saying if there are actually - // resources to do the work // // If the return value is SHUTDOWN:, // - ThreadManager WILL NOT call DoWork() and terminates the thead @@ -76,7 +69,7 @@ class ThreadManager { // The implementation of DoWork() should also do any setup needed to ensure // that the next call to PollForWork() (not necessarily by the current thread) // actually finds some work - virtual void DoWork(void* tag, bool ok, bool resources) = 0; + virtual void DoWork(void* tag, bool ok) = 0; // Mark the ThreadManager as shutdown and begin draining the work. This is a // non-blocking call and the caller should call Wait(), a blocking call which @@ -91,15 +84,15 @@ class ThreadManager { virtual void Wait(); private: - // Helper wrapper class around thread. This takes a ThreadManager object - // and starts a new thread to calls the Run() function. + // Helper wrapper class around std::thread. This takes a ThreadManager object + // and starts a new std::thread to calls the Run() function. // // The Run() function calls ThreadManager::MainWorkLoop() function and once // that completes, it marks the WorkerThread completed by calling // ThreadManager::MarkAsCompleted() class WorkerThread { public: - WorkerThread(ThreadManager* thd_mgr, bool* valid); + WorkerThread(ThreadManager* thd_mgr); ~WorkerThread(); private: @@ -109,8 +102,7 @@ class ThreadManager { ThreadManager* const thd_mgr_; std::mutex wt_mu_; - gpr_thd_id thd_; - bool valid_; + std::thread thd_; }; // The main funtion in ThreadManager @@ -137,13 +129,6 @@ class ThreadManager { // currently polling i.e num_pollers_) int num_threads_; - // Functions for creating/joining threads. Normally, these should - // be gpr_thd_new/gpr_thd_join but they are overridable - std::function - thread_creator_; - std::function thread_joiner_; - std::mutex list_mu_; std::list completed_threads_; }; diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index fd43c8f584c..90b2eddbbb4 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -26,7 +26,6 @@ #include #include #include -#include #include #include @@ -53,13 +52,63 @@ namespace testing { class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { public: - TestServiceImpl() {} + TestServiceImpl() : signal_client_(false) {} Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { response->set_message(request->message()); return Status::OK; } + + // Unimplemented is left unimplemented to test the returned error. + + Status RequestStream(ServerContext* context, + ServerReader* reader, + EchoResponse* response) override { + EchoRequest request; + response->set_message(""); + while (reader->Read(&request)) { + response->mutable_message()->append(request.message()); + } + return Status::OK; + } + + // Return 3 messages. + // TODO(yangg) make it generic by adding a parameter into EchoRequest + Status ResponseStream(ServerContext* context, const EchoRequest* request, + ServerWriter* writer) override { + EchoResponse response; + response.set_message(request->message() + "0"); + writer->Write(response); + response.set_message(request->message() + "1"); + writer->Write(response); + response.set_message(request->message() + "2"); + writer->Write(response); + + return Status::OK; + } + + Status BidiStream( + ServerContext* context, + ServerReaderWriter* stream) override { + EchoRequest request; + EchoResponse response; + while (stream->Read(&request)) { + gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); + response.set_message(request.message()); + stream->Write(response); + } + return Status::OK; + } + + bool signal_client() { + std::unique_lock lock(mu_); + return signal_client_; + } + + private: + bool signal_client_; + std::mutex mu_; }; template @@ -70,15 +119,10 @@ class CommonStressTest { virtual void SetUp() = 0; virtual void TearDown() = 0; virtual void ResetStub() = 0; - virtual bool AllowExhaustion() = 0; grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); } protected: std::unique_ptr stub_; - // Some tests use a custom thread creator. This should be declared before the - // server so that it's destructor happens after the server - std::unique_ptr creator_; - std::unique_ptr server_; virtual void SetUpStart(ServerBuilder* builder, Service* service) = 0; @@ -103,7 +147,6 @@ class CommonStressTestInsecure : public CommonStressTest { CreateChannel(server_address_.str(), InsecureChannelCredentials()); this->stub_ = grpc::testing::EchoTestService::NewStub(channel); } - bool AllowExhaustion() override { return false; } protected: void SetUpStart(ServerBuilder* builder, Service* service) override { @@ -119,7 +162,7 @@ class CommonStressTestInsecure : public CommonStressTest { std::ostringstream server_address_; }; -template +template class CommonStressTestInproc : public CommonStressTest { public: void ResetStub() override { @@ -127,7 +170,6 @@ class CommonStressTestInproc : public CommonStressTest { std::shared_ptr channel = this->server_->InProcessChannel(args); this->stub_ = grpc::testing::EchoTestService::NewStub(channel); } - bool AllowExhaustion() override { return allow_resource_exhaustion; } protected: void SetUpStart(ServerBuilder* builder, Service* service) override { @@ -152,67 +194,6 @@ class CommonStressTestSyncServer : public BaseClass { TestServiceImpl service_; }; -class ServerBuilderThreadCreatorOverrideTest { - public: - ServerBuilderThreadCreatorOverrideTest(ServerBuilder* builder, size_t limit) - : limit_(limit), threads_(0) { - builder->SetThreadFunctions( - [this](gpr_thd_id* id, const char* name, void (*f)(void*), void* arg, - const gpr_thd_options* options) -> int { - std::unique_lock l(mu_); - if (threads_ < limit_) { - l.unlock(); - if (gpr_thd_new(id, name, f, arg, options) != 0) { - l.lock(); - threads_++; - return 1; - } - } - return 0; - }, - [this](gpr_thd_id id) { - gpr_thd_join(id); - std::unique_lock l(mu_); - threads_--; - if (threads_ == 0) { - done_.notify_one(); - } - }); - } - ~ServerBuilderThreadCreatorOverrideTest() { - // Don't allow destruction until all threads are really done and uncounted - std::unique_lock l(mu_); - done_.wait(l, [this] { return (threads_ == 0); }); - } - - private: - size_t limit_; - size_t threads_; - std::mutex mu_; - std::condition_variable done_; -}; - -template -class CommonStressTestSyncServerLowThreadCount : public BaseClass { - public: - void SetUp() override { - ServerBuilder builder; - this->SetUpStart(&builder, &service_); - builder.SetSyncServerOption(ServerBuilder::SyncServerOption::MIN_POLLERS, - 1); - this->creator_.reset( - new ServerBuilderThreadCreatorOverrideTest(&builder, 4)); - this->SetUpEnd(&builder); - } - void TearDown() override { - this->TearDownStart(); - this->TearDownEnd(); - } - - private: - TestServiceImpl service_; -}; - template class CommonStressTestAsyncServer : public BaseClass { public: @@ -313,8 +294,7 @@ class End2endTest : public ::testing::Test { Common common_; }; -static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, - bool allow_exhaustion, gpr_atm* errors) { +static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -322,48 +302,33 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, for (int i = 0; i < num_rpcs; ++i) { ClientContext context; Status s = stub->Echo(&context, request, &response); - EXPECT_TRUE(s.ok() || (allow_exhaustion && - s.error_code() == StatusCode::RESOURCE_EXHAUSTED)); + EXPECT_EQ(response.message(), request.message()); if (!s.ok()) { - if (!(allow_exhaustion && - s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) { - gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), - s.error_message().c_str()); - } - gpr_atm_no_barrier_fetch_add(errors, static_cast(1)); - } else { - EXPECT_EQ(response.message(), request.message()); + gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), + s.error_message().c_str()); } + ASSERT_TRUE(s.ok()); } } typedef ::testing::Types< CommonStressTestSyncServer>, - CommonStressTestSyncServer>, - CommonStressTestSyncServerLowThreadCount< - CommonStressTestInproc>, + CommonStressTestSyncServer>, CommonStressTestAsyncServer< CommonStressTestInsecure>, - CommonStressTestAsyncServer>> + CommonStressTestAsyncServer< + CommonStressTestInproc>> CommonTypes; TYPED_TEST_CASE(End2endTest, CommonTypes); TYPED_TEST(End2endTest, ThreadStress) { this->common_.ResetStub(); std::vector threads; - gpr_atm errors; - gpr_atm_rel_store(&errors, static_cast(0)); for (int i = 0; i < kNumThreads; ++i) { - threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs, - this->common_.AllowExhaustion(), &errors); + threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs); } for (int i = 0; i < kNumThreads; ++i) { threads[i].join(); } - uint64_t error_cnt = static_cast(gpr_atm_no_barrier_load(&errors)); - if (error_cnt != 0) { - gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt); - } } template diff --git a/test/cpp/thread_manager/BUILD b/test/cpp/thread_manager/BUILD deleted file mode 100644 index 1f0878770b5..00000000000 --- a/test/cpp/thread_manager/BUILD +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright 2017 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -licenses(["notice"]) # Apache v2 - -load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") - -grpc_package(name = "test/cpp/thread_manager") - -grpc_cc_test( - name = "thread_manager_test", - srcs = ["thread_manager_test.cc"], - deps = [ - "//:gpr", - "//:grpc", - "//:grpc++", - "//test/cpp/util:test_config", - ], -) - diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc index d3d31f9dd97..8282d466948 100644 --- a/test/cpp/thread_manager/thread_manager_test.cc +++ b/test/cpp/thread_manager/thread_manager_test.cc @@ -20,10 +20,10 @@ #include #include +#include #include #include #include -#include #include "src/cpp/thread_manager/thread_manager.h" #include "test/cpp/util/test_config.h" @@ -32,13 +32,13 @@ namespace grpc { class ThreadManagerTest final : public grpc::ThreadManager { public: ThreadManagerTest() - : ThreadManager(kMinPollers, kMaxPollers, gpr_thd_new, gpr_thd_join), + : ThreadManager(kMinPollers, kMaxPollers), num_do_work_(0), num_poll_for_work_(0), num_work_found_(0) {} grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override; - void DoWork(void* tag, bool ok, bool resources) override; + void DoWork(void* tag, bool ok) override; void PerformTest(); private: @@ -89,7 +89,7 @@ grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag, } } -void ThreadManagerTest::DoWork(void* tag, bool ok, bool resources) { +void ThreadManagerTest::DoWork(void* tag, bool ok) { gpr_atm_no_barrier_fetch_add(&num_do_work_, 1); SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping }