Revert "Stop using std::thread in C++ library since it can trigger exceptions"

pull/14003/head
Jan Tattermusch 7 years ago committed by GitHub
parent b0b4555f4c
commit c9ec2c0888
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      include/grpc++/impl/codegen/byte_buffer.h
  2. 6
      include/grpc++/impl/codegen/completion_queue.h
  3. 17
      include/grpc++/impl/codegen/method_handler_impl.h
  4. 6
      include/grpc++/impl/codegen/server_context.h
  5. 21
      include/grpc++/server.h
  6. 19
      include/grpc++/server_builder.h
  7. 14
      src/cpp/client/secure_credentials.cc
  8. 2
      src/cpp/server/create_default_thread_pool.cc
  9. 54
      src/cpp/server/dynamic_thread_pool.cc
  10. 20
      src/cpp/server/dynamic_thread_pool.h
  11. 7
      src/cpp/server/secure_server_credentials.cc
  12. 7
      src/cpp/server/server_builder.cc
  13. 49
      src/cpp/server/server_cc.cc
  14. 4
      src/cpp/server/thread_pool_interface.h
  15. 54
      src/cpp/thread_manager/thread_manager.cc
  16. 29
      src/cpp/thread_manager/thread_manager.h
  17. 157
      test/cpp/end2end/thread_stress_test.cc
  18. 31
      test/cpp/thread_manager/BUILD
  19. 8
      test/cpp/thread_manager/thread_manager_test.cc

@ -41,8 +41,6 @@ template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler; class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType> template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler; class ServerStreamingHandler;
template <StatusCode code>
class ErrorMethodHandler;
template <class R> template <class R>
class DeserializeFuncType; class DeserializeFuncType;
} // namespace internal } // namespace internal
@ -109,8 +107,6 @@ class ByteBuffer final {
friend class internal::RpcMethodHandler; friend class internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType> template <class ServiceType, class RequestType, class ResponseType>
friend class internal::ServerStreamingHandler; friend class internal::ServerStreamingHandler;
template <StatusCode code>
friend class internal::ErrorMethodHandler;
template <class R> template <class R>
friend class internal::DeserializeFuncType; friend class internal::DeserializeFuncType;

@ -78,8 +78,7 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler; class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType> template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler; class BidiStreamingHandler;
template <StatusCode code> class UnknownMethodHandler;
class ErrorMethodHandler;
template <class Streamer, bool WriteNeeded> template <class Streamer, bool WriteNeeded>
class TemplatedBidiStreamingHandler; class TemplatedBidiStreamingHandler;
template <class InputMessage, class OutputMessage> template <class InputMessage, class OutputMessage>
@ -222,8 +221,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
friend class ::grpc::internal::ServerStreamingHandler; friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded> template <class Streamer, bool WriteNeeded>
friend class ::grpc::internal::TemplatedBidiStreamingHandler; friend class ::grpc::internal::TemplatedBidiStreamingHandler;
template <StatusCode code> friend class ::grpc::internal::UnknownMethodHandler;
friend class ::grpc::internal::ErrorMethodHandler;
friend class ::grpc::Server; friend class ::grpc::Server;
friend class ::grpc::ServerContext; friend class ::grpc::ServerContext;
friend class ::grpc::ServerInterface; friend class ::grpc::ServerInterface;

@ -242,14 +242,12 @@ class SplitServerStreamingHandler
ServerSplitStreamer<RequestType, ResponseType>, false>(func) {} ServerSplitStreamer<RequestType, ResponseType>, false>(func) {}
}; };
/// General method handler class for errors that prevent real method use /// Handle unknown method by returning UNIMPLEMENTED error.
/// e.g., handle unknown method by returning UNIMPLEMENTED error. class UnknownMethodHandler : public MethodHandler {
template <StatusCode code>
class ErrorMethodHandler : public MethodHandler {
public: public:
template <class T> template <class T>
static void FillOps(ServerContext* context, T* ops) { static void FillOps(ServerContext* context, T* ops) {
Status status(code, ""); Status status(StatusCode::UNIMPLEMENTED, "");
if (!context->sent_initial_metadata_) { if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_, ops->SendInitialMetadata(context->initial_metadata_,
context->initial_metadata_flags()); context->initial_metadata_flags());
@ -266,18 +264,9 @@ class ErrorMethodHandler : public MethodHandler {
FillOps(param.server_context, &ops); FillOps(param.server_context, &ops);
param.call->PerformOps(&ops); param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops); param.call->cq()->Pluck(&ops);
// We also have to destroy any request payload in the handler parameter
ByteBuffer* payload = param.request.bbuf_ptr();
if (payload != nullptr) {
payload->Clear();
}
} }
}; };
typedef ErrorMethodHandler<StatusCode::UNIMPLEMENTED> UnknownMethodHandler;
typedef ErrorMethodHandler<StatusCode::RESOURCE_EXHAUSTED>
ResourceExhaustedHandler;
} // namespace internal } // namespace internal
} // namespace grpc } // namespace grpc

@ -63,8 +63,7 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler; class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType> template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler; class BidiStreamingHandler;
template <StatusCode code> class UnknownMethodHandler;
class ErrorMethodHandler;
template <class Streamer, bool WriteNeeded> template <class Streamer, bool WriteNeeded>
class TemplatedBidiStreamingHandler; class TemplatedBidiStreamingHandler;
class Call; class Call;
@ -256,8 +255,7 @@ class ServerContext {
friend class ::grpc::internal::ServerStreamingHandler; friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded> template <class Streamer, bool WriteNeeded>
friend class ::grpc::internal::TemplatedBidiStreamingHandler; friend class ::grpc::internal::TemplatedBidiStreamingHandler;
template <StatusCode code> friend class ::grpc::internal::UnknownMethodHandler;
friend class ::grpc::internal::ErrorMethodHandler;
friend class ::grpc::ClientContext; friend class ::grpc::ClientContext;
/// Prevent copying. /// Prevent copying.

@ -35,7 +35,6 @@
#include <grpc++/support/config.h> #include <grpc++/support/config.h>
#include <grpc++/support/status.h> #include <grpc++/support/status.h>
#include <grpc/compression.h> #include <grpc/compression.h>
#include <grpc/support/thd.h>
struct grpc_server; struct grpc_server;
@ -139,20 +138,10 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen {
/// ///
/// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
/// server completion queues passed via sync_server_cqs param. /// server completion queues passed via sync_server_cqs param.
///
/// \param thread_creator The thread creation function for the sync
/// server. Typically gpr_thd_new
///
/// \param thread_joiner The thread joining function for the sync
/// server. Typically gpr_thd_join
Server(int max_message_size, ChannelArguments* args, Server(int max_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs, sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec, int min_pollers, int max_pollers, int sync_cq_timeout_msec);
std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
const gpr_thd_options*)>
thread_creator,
std::function<void(gpr_thd_id)> thread_joiner);
/// Register a service. This call does not take ownership of the service. /// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance. /// The service must exist for the lifetime of the Server instance.
@ -231,14 +220,6 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen {
std::unique_ptr<HealthCheckServiceInterface> health_check_service_; std::unique_ptr<HealthCheckServiceInterface> health_check_service_;
bool health_check_service_disabled_; bool health_check_service_disabled_;
std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
const gpr_thd_options*)>
thread_creator_;
std::function<void(gpr_thd_id)> thread_joiner_;
// A special handler for resource exhausted in sync case
std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_;
}; };
} // namespace grpc } // namespace grpc

@ -20,7 +20,6 @@
#define GRPCXX_SERVER_BUILDER_H #define GRPCXX_SERVER_BUILDER_H
#include <climits> #include <climits>
#include <functional>
#include <map> #include <map>
#include <memory> #include <memory>
#include <vector> #include <vector>
@ -31,7 +30,6 @@
#include <grpc++/support/config.h> #include <grpc++/support/config.h>
#include <grpc/compression.h> #include <grpc/compression.h>
#include <grpc/support/cpu.h> #include <grpc/support/cpu.h>
#include <grpc/support/thd.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include <grpc/support/workaround_list.h> #include <grpc/support/workaround_list.h>
@ -49,7 +47,6 @@ class Service;
namespace testing { namespace testing {
class ServerBuilderPluginTest; class ServerBuilderPluginTest;
class ServerBuilderThreadCreatorOverrideTest;
} // namespace testing } // namespace testing
/// A builder class for the creation and startup of \a grpc::Server instances. /// A builder class for the creation and startup of \a grpc::Server instances.
@ -216,17 +213,6 @@ class ServerBuilder {
private: private:
friend class ::grpc::testing::ServerBuilderPluginTest; friend class ::grpc::testing::ServerBuilderPluginTest;
friend class ::grpc::testing::ServerBuilderThreadCreatorOverrideTest;
ServerBuilder& SetThreadFunctions(
std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
const gpr_thd_options*)>
thread_creator,
std::function<void(gpr_thd_id)> thread_joiner) {
thread_creator_ = thread_creator;
thread_joiner_ = thread_joiner;
return *this;
}
struct Port { struct Port {
grpc::string addr; grpc::string addr;
@ -286,11 +272,6 @@ class ServerBuilder {
grpc_compression_algorithm algorithm; grpc_compression_algorithm algorithm;
} maybe_default_compression_algorithm_; } maybe_default_compression_algorithm_;
uint32_t enabled_compression_algorithms_bitset_; uint32_t enabled_compression_algorithms_bitset_;
std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
const gpr_thd_options*)>
thread_creator_;
std::function<void(gpr_thd_id)> thread_joiner_;
}; };
} // namespace grpc } // namespace grpc

@ -189,16 +189,10 @@ int MetadataCredentialsPluginWrapper::GetMetadata(
} }
if (w->plugin_->IsBlocking()) { if (w->plugin_->IsBlocking()) {
// Asynchronous return. // Asynchronous return.
if (w->thread_pool_->Add(std::bind( w->thread_pool_->Add(
&MetadataCredentialsPluginWrapper::InvokePlugin, w, context, cb, std::bind(&MetadataCredentialsPluginWrapper::InvokePlugin, w, context,
user_data, nullptr, nullptr, nullptr, nullptr))) { cb, user_data, nullptr, nullptr, nullptr, nullptr));
return 0; return 0;
} else {
*num_creds_md = 0;
*status = GRPC_STATUS_RESOURCE_EXHAUSTED;
*error_details = nullptr;
return true;
}
} else { } else {
// Synchronous return. // Synchronous return.
w->InvokePlugin(context, cb, user_data, creds_md, num_creds_md, status, w->InvokePlugin(context, cb, user_data, creds_md, num_creds_md, status,

@ -28,7 +28,7 @@ namespace {
ThreadPoolInterface* CreateDefaultThreadPoolImpl() { ThreadPoolInterface* CreateDefaultThreadPoolImpl() {
int cores = gpr_cpu_num_cores(); int cores = gpr_cpu_num_cores();
if (!cores) cores = 4; if (!cores) cores = 4;
return new DynamicThreadPool(cores, gpr_thd_new, gpr_thd_join); return new DynamicThreadPool(cores);
} }
CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPoolImpl; CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPoolImpl;

@ -19,32 +19,19 @@
#include "src/cpp/server/dynamic_thread_pool.h" #include "src/cpp/server/dynamic_thread_pool.h"
#include <mutex> #include <mutex>
#include <thread>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/thd.h>
namespace grpc { namespace grpc {
DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool, DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
bool* valid) : pool_(pool),
: pool_(pool) { thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
gpr_thd_options opt = gpr_thd_options_default(); this)) {}
gpr_thd_options_set_joinable(&opt);
std::lock_guard<std::mutex> l(dt_mu_);
valid_ = *valid = pool->thread_creator_(
&thd_, "dynamic thread",
[](void* th) {
reinterpret_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc();
},
this, &opt);
}
DynamicThreadPool::DynamicThread::~DynamicThread() { DynamicThreadPool::DynamicThread::~DynamicThread() {
std::lock_guard<std::mutex> l(dt_mu_); thd_->join();
if (valid_) { thd_.reset();
pool_->thread_joiner_(thd_);
}
} }
void DynamicThreadPool::DynamicThread::ThreadFunc() { void DynamicThreadPool::DynamicThread::ThreadFunc() {
@ -86,26 +73,15 @@ void DynamicThreadPool::ThreadFunc() {
} }
} }
DynamicThreadPool::DynamicThreadPool( DynamicThreadPool::DynamicThreadPool(int reserve_threads)
int reserve_threads,
std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
const gpr_thd_options*)>
thread_creator,
std::function<void(gpr_thd_id)> thread_joiner)
: shutdown_(false), : shutdown_(false),
reserve_threads_(reserve_threads), reserve_threads_(reserve_threads),
nthreads_(0), nthreads_(0),
threads_waiting_(0), threads_waiting_(0) {
thread_creator_(thread_creator),
thread_joiner_(thread_joiner) {
for (int i = 0; i < reserve_threads_; i++) { for (int i = 0; i < reserve_threads_; i++) {
std::lock_guard<std::mutex> lock(mu_); std::lock_guard<std::mutex> lock(mu_);
nthreads_++; nthreads_++;
bool valid; new DynamicThread(this);
auto* th = new DynamicThread(this, &valid);
if (!valid) {
delete th;
}
} }
} }
@ -125,7 +101,7 @@ DynamicThreadPool::~DynamicThreadPool() {
ReapThreads(&dead_threads_); ReapThreads(&dead_threads_);
} }
bool DynamicThreadPool::Add(const std::function<void()>& callback) { void DynamicThreadPool::Add(const std::function<void()>& callback) {
std::lock_guard<std::mutex> lock(mu_); std::lock_guard<std::mutex> lock(mu_);
// Add works to the callbacks list // Add works to the callbacks list
callbacks_.push(callback); callbacks_.push(callback);
@ -133,12 +109,7 @@ bool DynamicThreadPool::Add(const std::function<void()>& callback) {
if (threads_waiting_ == 0) { if (threads_waiting_ == 0) {
// Kick off a new thread // Kick off a new thread
nthreads_++; nthreads_++;
bool valid; new DynamicThread(this);
auto* th = new DynamicThread(this, &valid);
if (!valid) {
delete th;
return false;
}
} else { } else {
cv_.notify_one(); cv_.notify_one();
} }
@ -146,7 +117,6 @@ bool DynamicThreadPool::Add(const std::function<void()>& callback) {
if (!dead_threads_.empty()) { if (!dead_threads_.empty()) {
ReapThreads(&dead_threads_); ReapThreads(&dead_threads_);
} }
return true;
} }
} // namespace grpc } // namespace grpc

@ -24,9 +24,9 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <queue> #include <queue>
#include <thread>
#include <grpc++/support/config.h> #include <grpc++/support/config.h>
#include <grpc/support/thd.h>
#include "src/cpp/server/thread_pool_interface.h" #include "src/cpp/server/thread_pool_interface.h"
@ -34,26 +34,20 @@ namespace grpc {
class DynamicThreadPool final : public ThreadPoolInterface { class DynamicThreadPool final : public ThreadPoolInterface {
public: public:
DynamicThreadPool(int reserve_threads, explicit DynamicThreadPool(int reserve_threads);
std::function<int(gpr_thd_id*, const char*, void (*)(void*),
void*, const gpr_thd_options*)>
thread_creator,
std::function<void(gpr_thd_id)> thread_joiner);
~DynamicThreadPool(); ~DynamicThreadPool();
bool Add(const std::function<void()>& callback) override; void Add(const std::function<void()>& callback) override;
private: private:
class DynamicThread { class DynamicThread {
public: public:
DynamicThread(DynamicThreadPool* pool, bool* valid); DynamicThread(DynamicThreadPool* pool);
~DynamicThread(); ~DynamicThread();
private: private:
DynamicThreadPool* pool_; DynamicThreadPool* pool_;
std::mutex dt_mu_; std::unique_ptr<std::thread> thd_;
gpr_thd_id thd_;
bool valid_;
void ThreadFunc(); void ThreadFunc();
}; };
std::mutex mu_; std::mutex mu_;
@ -65,10 +59,6 @@ class DynamicThreadPool final : public ThreadPoolInterface {
int nthreads_; int nthreads_;
int threads_waiting_; int threads_waiting_;
std::list<DynamicThread*> dead_threads_; std::list<DynamicThread*> dead_threads_;
std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
const gpr_thd_options*)>
thread_creator_;
std::function<void(gpr_thd_id)> thread_joiner_;
void ThreadFunc(); void ThreadFunc();
static void ReapThreads(std::list<DynamicThread*>* tlist); static void ReapThreads(std::list<DynamicThread*>* tlist);

@ -43,14 +43,9 @@ void AuthMetadataProcessorAyncWrapper::Process(
return; return;
} }
if (w->processor_->IsBlocking()) { if (w->processor_->IsBlocking()) {
bool added = w->thread_pool_->Add( w->thread_pool_->Add(
std::bind(&AuthMetadataProcessorAyncWrapper::InvokeProcessor, w, std::bind(&AuthMetadataProcessorAyncWrapper::InvokeProcessor, w,
context, md, num_md, cb, user_data)); context, md, num_md, cb, user_data));
if (!added) {
// no thread available, so fail with temporary resource unavailability
cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_UNAVAILABLE, nullptr);
return;
}
} else { } else {
// invoke directly. // invoke directly.
w->InvokeProcessor(context, md, num_md, cb, user_data); w->InvokeProcessor(context, md, num_md, cb, user_data);

@ -23,7 +23,6 @@
#include <grpc++/server.h> #include <grpc++/server.h>
#include <grpc/support/cpu.h> #include <grpc/support/cpu.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include "src/cpp/server/thread_pool_interface.h" #include "src/cpp/server/thread_pool_interface.h"
@ -44,9 +43,7 @@ ServerBuilder::ServerBuilder()
max_send_message_size_(-1), max_send_message_size_(-1),
sync_server_settings_(SyncServerSettings()), sync_server_settings_(SyncServerSettings()),
resource_quota_(nullptr), resource_quota_(nullptr),
generic_service_(nullptr), generic_service_(nullptr) {
thread_creator_(gpr_thd_new),
thread_joiner_(gpr_thd_join) {
gpr_once_init(&once_init_plugin_list, do_plugin_list_init); gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
for (auto it = g_plugin_factory_list->begin(); for (auto it = g_plugin_factory_list->begin();
it != g_plugin_factory_list->end(); it++) { it != g_plugin_factory_list->end(); it++) {
@ -265,7 +262,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
std::unique_ptr<Server> server(new Server( std::unique_ptr<Server> server(new Server(
max_receive_message_size_, &args, sync_server_cqs, max_receive_message_size_, &args, sync_server_cqs,
sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec, thread_creator_, thread_joiner_)); sync_server_settings_.cq_timeout_msec));
if (has_sync_methods) { if (has_sync_methods) {
// This is a Sync server // This is a Sync server

@ -36,7 +36,6 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include "src/core/ext/transport/inproc/inproc_transport.h" #include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
@ -196,10 +195,8 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
call_(mrd->call_, server, &cq_, server->max_receive_message_size()), call_(mrd->call_, server, &cq_, server->max_receive_message_size()),
ctx_(mrd->deadline_, &mrd->request_metadata_), ctx_(mrd->deadline_, &mrd->request_metadata_),
has_request_payload_(mrd->has_request_payload_), has_request_payload_(mrd->has_request_payload_),
request_payload_(has_request_payload_ ? mrd->request_payload_ request_payload_(mrd->request_payload_),
: nullptr), method_(mrd->method_) {
method_(mrd->method_),
server_(server) {
ctx_.set_call(mrd->call_); ctx_.set_call(mrd->call_);
ctx_.cq_ = &cq_; ctx_.cq_ = &cq_;
GPR_ASSERT(mrd->in_flight_); GPR_ASSERT(mrd->in_flight_);
@ -213,13 +210,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
} }
} }
void Run(std::shared_ptr<GlobalCallbacks> global_callbacks, void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) {
bool resources) {
ctx_.BeginCompletionOp(&call_); ctx_.BeginCompletionOp(&call_);
global_callbacks->PreSynchronousRequest(&ctx_); global_callbacks->PreSynchronousRequest(&ctx_);
auto* handler = resources ? method_->handler() method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter(
: server_->resource_exhausted_handler_.get();
handler->RunHandler(internal::MethodHandler::HandlerParameter(
&call_, &ctx_, request_payload_)); &call_, &ctx_, request_payload_));
global_callbacks->PostSynchronousRequest(&ctx_); global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr; request_payload_ = nullptr;
@ -241,7 +235,6 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
const bool has_request_payload_; const bool has_request_payload_;
grpc_byte_buffer* request_payload_; grpc_byte_buffer* request_payload_;
internal::RpcServiceMethod* const method_; internal::RpcServiceMethod* const method_;
Server* server_;
}; };
private: private:
@ -262,15 +255,11 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
// appropriate RPC handlers // appropriate RPC handlers
class Server::SyncRequestThreadManager : public ThreadManager { class Server::SyncRequestThreadManager : public ThreadManager {
public: public:
SyncRequestThreadManager( SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
Server* server, CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks,
std::shared_ptr<GlobalCallbacks> global_callbacks, int min_pollers, int min_pollers, int max_pollers,
int max_pollers, int cq_timeout_msec, int cq_timeout_msec)
std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*, : ThreadManager(min_pollers, max_pollers),
const gpr_thd_options*)>
thread_creator,
std::function<void(gpr_thd_id)> thread_joiner)
: ThreadManager(min_pollers, max_pollers, thread_creator, thread_joiner),
server_(server), server_(server),
server_cq_(server_cq), server_cq_(server_cq),
cq_timeout_msec_(cq_timeout_msec), cq_timeout_msec_(cq_timeout_msec),
@ -296,7 +285,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
GPR_UNREACHABLE_CODE(return TIMEOUT); GPR_UNREACHABLE_CODE(return TIMEOUT);
} }
void DoWork(void* tag, bool ok, bool resources) override { void DoWork(void* tag, bool ok) override {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag); SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (!sync_req) { if (!sync_req) {
@ -316,7 +305,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
} }
GPR_TIMER_SCOPE("cd.Run()", 0); GPR_TIMER_SCOPE("cd.Run()", 0);
cd.Run(global_callbacks_, resources); cd.Run(global_callbacks_);
} }
// TODO (sreek) If ok is false here (which it isn't in case of // TODO (sreek) If ok is false here (which it isn't in case of
// grpc_request_registered_call), we should still re-queue the request // grpc_request_registered_call), we should still re-queue the request
@ -378,11 +367,7 @@ Server::Server(
int max_receive_message_size, ChannelArguments* args, int max_receive_message_size, ChannelArguments* args,
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs, sync_server_cqs,
int min_pollers, int max_pollers, int sync_cq_timeout_msec, int min_pollers, int max_pollers, int sync_cq_timeout_msec)
std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
const gpr_thd_options*)>
thread_creator,
std::function<void(gpr_thd_id)> thread_joiner)
: max_receive_message_size_(max_receive_message_size), : max_receive_message_size_(max_receive_message_size),
sync_server_cqs_(sync_server_cqs), sync_server_cqs_(sync_server_cqs),
started_(false), started_(false),
@ -391,9 +376,7 @@ Server::Server(
has_generic_service_(false), has_generic_service_(false),
server_(nullptr), server_(nullptr),
server_initializer_(new ServerInitializer(this)), server_initializer_(new ServerInitializer(this)),
health_check_service_disabled_(false), health_check_service_disabled_(false) {
thread_creator_(thread_creator),
thread_joiner_(thread_joiner) {
g_gli_initializer.summon(); g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks; global_callbacks_ = g_callbacks;
@ -403,7 +386,7 @@ Server::Server(
it++) { it++) {
sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
this, (*it).get(), global_callbacks_, min_pollers, max_pollers, this, (*it).get(), global_callbacks_, min_pollers, max_pollers,
sync_cq_timeout_msec, thread_creator_, thread_joiner_)); sync_cq_timeout_msec));
} }
grpc_channel_args channel_args; grpc_channel_args channel_args;
@ -566,10 +549,6 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
} }
} }
if (!sync_server_cqs_->empty()) {
resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler);
}
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start(); (*it)->Start();
} }

@ -29,9 +29,7 @@ class ThreadPoolInterface {
virtual ~ThreadPoolInterface() {} virtual ~ThreadPoolInterface() {}
// Schedule the given callback for execution. // Schedule the given callback for execution.
// Return true on success, false on failure virtual void Add(const std::function<void()>& callback) = 0;
virtual bool Add(const std::function<void()>& callback)
GRPC_MUST_USE_RESULT = 0;
}; };
// Allows different codebases to use their own thread pool impls // Allows different codebases to use their own thread pool impls

@ -20,26 +20,18 @@
#include <climits> #include <climits>
#include <mutex> #include <mutex>
#include <thread>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/thd.h>
namespace grpc { namespace grpc {
ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr, bool* valid) ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr)
: thd_mgr_(thd_mgr) { : thd_mgr_(thd_mgr) {
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
// Make thread creation exclusive with respect to its join happening in // Make thread creation exclusive with respect to its join happening in
// ~WorkerThread(). // ~WorkerThread().
std::lock_guard<std::mutex> lock(wt_mu_); std::lock_guard<std::mutex> lock(wt_mu_);
*valid = valid_ = thd_mgr->thread_creator_( thd_ = std::thread(&ThreadManager::WorkerThread::Run, this);
&thd_, "worker thread",
[](void* th) {
reinterpret_cast<ThreadManager::WorkerThread*>(th)->Run();
},
this, &opt);
} }
void ThreadManager::WorkerThread::Run() { void ThreadManager::WorkerThread::Run() {
@ -50,24 +42,15 @@ void ThreadManager::WorkerThread::Run() {
ThreadManager::WorkerThread::~WorkerThread() { ThreadManager::WorkerThread::~WorkerThread() {
// Don't join until the thread is fully constructed. // Don't join until the thread is fully constructed.
std::lock_guard<std::mutex> lock(wt_mu_); std::lock_guard<std::mutex> lock(wt_mu_);
if (valid_) { thd_.join();
thd_mgr_->thread_joiner_(thd_);
}
} }
ThreadManager::ThreadManager( ThreadManager::ThreadManager(int min_pollers, int max_pollers)
int min_pollers, int max_pollers,
std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
const gpr_thd_options*)>
thread_creator,
std::function<void(gpr_thd_id)> thread_joiner)
: shutdown_(false), : shutdown_(false),
num_pollers_(0), num_pollers_(0),
min_pollers_(min_pollers), min_pollers_(min_pollers),
max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
num_threads_(0), num_threads_(0) {}
thread_creator_(thread_creator),
thread_joiner_(thread_joiner) {}
ThreadManager::~ThreadManager() { ThreadManager::~ThreadManager() {
{ {
@ -128,9 +111,7 @@ void ThreadManager::Initialize() {
for (int i = 0; i < min_pollers_; i++) { for (int i = 0; i < min_pollers_; i++) {
// Create a new thread (which ends up calling the MainWorkLoop() function // Create a new thread (which ends up calling the MainWorkLoop() function
bool valid; new WorkerThread(this);
new WorkerThread(this, &valid);
GPR_ASSERT(valid); // we need to have at least this minimum
} }
} }
@ -157,27 +138,18 @@ void ThreadManager::MainWorkLoop() {
case WORK_FOUND: case WORK_FOUND:
// If we got work and there are now insufficient pollers, start a new // If we got work and there are now insufficient pollers, start a new
// one // one
bool resources;
if (!shutdown_ && num_pollers_ < min_pollers_) { if (!shutdown_ && num_pollers_ < min_pollers_) {
bool valid; num_pollers_++;
num_threads_++;
// Drop lock before spawning thread to avoid contention // Drop lock before spawning thread to avoid contention
lock.unlock(); lock.unlock();
auto* th = new WorkerThread(this, &valid); new WorkerThread(this);
lock.lock();
if (valid) {
num_pollers_++;
num_threads_++;
} else {
delete th;
}
resources = (num_pollers_ > 0);
} else { } else {
resources = true; // Drop lock for consistency with above branch
lock.unlock();
} }
// Drop lock before any application work
lock.unlock();
// Lock is always released at this point - do the application work // Lock is always released at this point - do the application work
DoWork(tag, ok, resources); DoWork(tag, ok);
// Take the lock again to check post conditions // Take the lock again to check post conditions
lock.lock(); lock.lock();
// If we're shutdown, we should finish at this point. // If we're shutdown, we should finish at this point.

@ -20,23 +20,18 @@
#define GRPC_INTERNAL_CPP_THREAD_MANAGER_H #define GRPC_INTERNAL_CPP_THREAD_MANAGER_H
#include <condition_variable> #include <condition_variable>
#include <functional>
#include <list> #include <list>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <thread>
#include <grpc++/support/config.h> #include <grpc++/support/config.h>
#include <grpc/support/thd.h>
namespace grpc { namespace grpc {
class ThreadManager { class ThreadManager {
public: public:
ThreadManager(int min_pollers, int max_pollers, explicit ThreadManager(int min_pollers, int max_pollers);
std::function<int(gpr_thd_id*, const char*, void (*)(void*),
void*, const gpr_thd_options*)>
thread_creator,
std::function<void(gpr_thd_id)> thread_joiner);
virtual ~ThreadManager(); virtual ~ThreadManager();
// Initializes and Starts the Rpc Manager threads // Initializes and Starts the Rpc Manager threads
@ -55,8 +50,6 @@ class ThreadManager {
// - ThreadManager does not interpret the values of 'tag' and 'ok' // - ThreadManager does not interpret the values of 'tag' and 'ok'
// - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to // - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to
// DoWork() // DoWork()
// - ThreadManager will also pass DoWork a bool saying if there are actually
// resources to do the work
// //
// If the return value is SHUTDOWN:, // If the return value is SHUTDOWN:,
// - ThreadManager WILL NOT call DoWork() and terminates the thead // - ThreadManager WILL NOT call DoWork() and terminates the thead
@ -76,7 +69,7 @@ class ThreadManager {
// The implementation of DoWork() should also do any setup needed to ensure // The implementation of DoWork() should also do any setup needed to ensure
// that the next call to PollForWork() (not necessarily by the current thread) // that the next call to PollForWork() (not necessarily by the current thread)
// actually finds some work // actually finds some work
virtual void DoWork(void* tag, bool ok, bool resources) = 0; virtual void DoWork(void* tag, bool ok) = 0;
// Mark the ThreadManager as shutdown and begin draining the work. This is a // Mark the ThreadManager as shutdown and begin draining the work. This is a
// non-blocking call and the caller should call Wait(), a blocking call which // non-blocking call and the caller should call Wait(), a blocking call which
@ -91,15 +84,15 @@ class ThreadManager {
virtual void Wait(); virtual void Wait();
private: private:
// Helper wrapper class around thread. This takes a ThreadManager object // Helper wrapper class around std::thread. This takes a ThreadManager object
// and starts a new thread to calls the Run() function. // and starts a new std::thread to calls the Run() function.
// //
// The Run() function calls ThreadManager::MainWorkLoop() function and once // The Run() function calls ThreadManager::MainWorkLoop() function and once
// that completes, it marks the WorkerThread completed by calling // that completes, it marks the WorkerThread completed by calling
// ThreadManager::MarkAsCompleted() // ThreadManager::MarkAsCompleted()
class WorkerThread { class WorkerThread {
public: public:
WorkerThread(ThreadManager* thd_mgr, bool* valid); WorkerThread(ThreadManager* thd_mgr);
~WorkerThread(); ~WorkerThread();
private: private:
@ -109,8 +102,7 @@ class ThreadManager {
ThreadManager* const thd_mgr_; ThreadManager* const thd_mgr_;
std::mutex wt_mu_; std::mutex wt_mu_;
gpr_thd_id thd_; std::thread thd_;
bool valid_;
}; };
// The main funtion in ThreadManager // The main funtion in ThreadManager
@ -137,13 +129,6 @@ class ThreadManager {
// currently polling i.e num_pollers_) // currently polling i.e num_pollers_)
int num_threads_; int num_threads_;
// Functions for creating/joining threads. Normally, these should
// be gpr_thd_new/gpr_thd_join but they are overridable
std::function<int(gpr_thd_id*, const char*, void (*)(void*), void*,
const gpr_thd_options*)>
thread_creator_;
std::function<void(gpr_thd_id)> thread_joiner_;
std::mutex list_mu_; std::mutex list_mu_;
std::list<WorkerThread*> completed_threads_; std::list<WorkerThread*> completed_threads_;
}; };

@ -26,7 +26,6 @@
#include <grpc++/server_builder.h> #include <grpc++/server_builder.h>
#include <grpc++/server_context.h> #include <grpc++/server_context.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/atm.h>
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
@ -53,13 +52,63 @@ namespace testing {
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public: public:
TestServiceImpl() {} TestServiceImpl() : signal_client_(false) {}
Status Echo(ServerContext* context, const EchoRequest* request, Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override { EchoResponse* response) override {
response->set_message(request->message()); response->set_message(request->message());
return Status::OK; return Status::OK;
} }
// Unimplemented is left unimplemented to test the returned error.
Status RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
EchoResponse* response) override {
EchoRequest request;
response->set_message("");
while (reader->Read(&request)) {
response->mutable_message()->append(request.message());
}
return Status::OK;
}
// Return 3 messages.
// TODO(yangg) make it generic by adding a parameter into EchoRequest
Status ResponseStream(ServerContext* context, const EchoRequest* request,
ServerWriter<EchoResponse>* writer) override {
EchoResponse response;
response.set_message(request->message() + "0");
writer->Write(response);
response.set_message(request->message() + "1");
writer->Write(response);
response.set_message(request->message() + "2");
writer->Write(response);
return Status::OK;
}
Status BidiStream(
ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
EchoRequest request;
EchoResponse response;
while (stream->Read(&request)) {
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
stream->Write(response);
}
return Status::OK;
}
bool signal_client() {
std::unique_lock<std::mutex> lock(mu_);
return signal_client_;
}
private:
bool signal_client_;
std::mutex mu_;
}; };
template <class Service> template <class Service>
@ -70,15 +119,10 @@ class CommonStressTest {
virtual void SetUp() = 0; virtual void SetUp() = 0;
virtual void TearDown() = 0; virtual void TearDown() = 0;
virtual void ResetStub() = 0; virtual void ResetStub() = 0;
virtual bool AllowExhaustion() = 0;
grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); } grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
protected: protected:
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
// Some tests use a custom thread creator. This should be declared before the
// server so that it's destructor happens after the server
std::unique_ptr<ServerBuilderThreadCreatorOverrideTest> creator_;
std::unique_ptr<Server> server_; std::unique_ptr<Server> server_;
virtual void SetUpStart(ServerBuilder* builder, Service* service) = 0; virtual void SetUpStart(ServerBuilder* builder, Service* service) = 0;
@ -103,7 +147,6 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
CreateChannel(server_address_.str(), InsecureChannelCredentials()); CreateChannel(server_address_.str(), InsecureChannelCredentials());
this->stub_ = grpc::testing::EchoTestService::NewStub(channel); this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
} }
bool AllowExhaustion() override { return false; }
protected: protected:
void SetUpStart(ServerBuilder* builder, Service* service) override { void SetUpStart(ServerBuilder* builder, Service* service) override {
@ -119,7 +162,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
std::ostringstream server_address_; std::ostringstream server_address_;
}; };
template <class Service, bool allow_resource_exhaustion> template <class Service>
class CommonStressTestInproc : public CommonStressTest<Service> { class CommonStressTestInproc : public CommonStressTest<Service> {
public: public:
void ResetStub() override { void ResetStub() override {
@ -127,7 +170,6 @@ class CommonStressTestInproc : public CommonStressTest<Service> {
std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args); std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args);
this->stub_ = grpc::testing::EchoTestService::NewStub(channel); this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
} }
bool AllowExhaustion() override { return allow_resource_exhaustion; }
protected: protected:
void SetUpStart(ServerBuilder* builder, Service* service) override { void SetUpStart(ServerBuilder* builder, Service* service) override {
@ -152,67 +194,6 @@ class CommonStressTestSyncServer : public BaseClass {
TestServiceImpl service_; TestServiceImpl service_;
}; };
class ServerBuilderThreadCreatorOverrideTest {
public:
ServerBuilderThreadCreatorOverrideTest(ServerBuilder* builder, size_t limit)
: limit_(limit), threads_(0) {
builder->SetThreadFunctions(
[this](gpr_thd_id* id, const char* name, void (*f)(void*), void* arg,
const gpr_thd_options* options) -> int {
std::unique_lock<std::mutex> l(mu_);
if (threads_ < limit_) {
l.unlock();
if (gpr_thd_new(id, name, f, arg, options) != 0) {
l.lock();
threads_++;
return 1;
}
}
return 0;
},
[this](gpr_thd_id id) {
gpr_thd_join(id);
std::unique_lock<std::mutex> l(mu_);
threads_--;
if (threads_ == 0) {
done_.notify_one();
}
});
}
~ServerBuilderThreadCreatorOverrideTest() {
// Don't allow destruction until all threads are really done and uncounted
std::unique_lock<std::mutex> l(mu_);
done_.wait(l, [this] { return (threads_ == 0); });
}
private:
size_t limit_;
size_t threads_;
std::mutex mu_;
std::condition_variable done_;
};
template <class BaseClass>
class CommonStressTestSyncServerLowThreadCount : public BaseClass {
public:
void SetUp() override {
ServerBuilder builder;
this->SetUpStart(&builder, &service_);
builder.SetSyncServerOption(ServerBuilder::SyncServerOption::MIN_POLLERS,
1);
this->creator_.reset(
new ServerBuilderThreadCreatorOverrideTest(&builder, 4));
this->SetUpEnd(&builder);
}
void TearDown() override {
this->TearDownStart();
this->TearDownEnd();
}
private:
TestServiceImpl service_;
};
template <class BaseClass> template <class BaseClass>
class CommonStressTestAsyncServer : public BaseClass { class CommonStressTestAsyncServer : public BaseClass {
public: public:
@ -313,8 +294,7 @@ class End2endTest : public ::testing::Test {
Common common_; Common common_;
}; };
static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
bool allow_exhaustion, gpr_atm* errors) {
EchoRequest request; EchoRequest request;
EchoResponse response; EchoResponse response;
request.set_message("Hello"); request.set_message("Hello");
@ -322,48 +302,33 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
for (int i = 0; i < num_rpcs; ++i) { for (int i = 0; i < num_rpcs; ++i) {
ClientContext context; ClientContext context;
Status s = stub->Echo(&context, request, &response); Status s = stub->Echo(&context, request, &response);
EXPECT_TRUE(s.ok() || (allow_exhaustion && EXPECT_EQ(response.message(), request.message());
s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
if (!s.ok()) { if (!s.ok()) {
if (!(allow_exhaustion && gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) { s.error_message().c_str());
gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
s.error_message().c_str());
}
gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
} else {
EXPECT_EQ(response.message(), request.message());
} }
ASSERT_TRUE(s.ok());
} }
} }
typedef ::testing::Types< typedef ::testing::Types<
CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>, CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>, CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>,
CommonStressTestSyncServerLowThreadCount<
CommonStressTestInproc<TestServiceImpl, true>>,
CommonStressTestAsyncServer< CommonStressTestAsyncServer<
CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>, CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
CommonStressTestAsyncServer<CommonStressTestInproc< CommonStressTestAsyncServer<
grpc::testing::EchoTestService::AsyncService, false>>> CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>>
CommonTypes; CommonTypes;
TYPED_TEST_CASE(End2endTest, CommonTypes); TYPED_TEST_CASE(End2endTest, CommonTypes);
TYPED_TEST(End2endTest, ThreadStress) { TYPED_TEST(End2endTest, ThreadStress) {
this->common_.ResetStub(); this->common_.ResetStub();
std::vector<std::thread> threads; std::vector<std::thread> threads;
gpr_atm errors;
gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));
for (int i = 0; i < kNumThreads; ++i) { for (int i = 0; i < kNumThreads; ++i) {
threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs, threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs);
this->common_.AllowExhaustion(), &errors);
} }
for (int i = 0; i < kNumThreads; ++i) { for (int i = 0; i < kNumThreads; ++i) {
threads[i].join(); threads[i].join();
} }
uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
if (error_cnt != 0) {
gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
}
} }
template <class Common> template <class Common>

@ -1,31 +0,0 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
grpc_package(name = "test/cpp/thread_manager")
grpc_cc_test(
name = "thread_manager_test",
srcs = ["thread_manager_test.cc"],
deps = [
"//:gpr",
"//:grpc",
"//:grpc++",
"//test/cpp/util:test_config",
],
)

@ -20,10 +20,10 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <gflags/gflags.h>
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/support/thd.h>
#include "src/cpp/thread_manager/thread_manager.h" #include "src/cpp/thread_manager/thread_manager.h"
#include "test/cpp/util/test_config.h" #include "test/cpp/util/test_config.h"
@ -32,13 +32,13 @@ namespace grpc {
class ThreadManagerTest final : public grpc::ThreadManager { class ThreadManagerTest final : public grpc::ThreadManager {
public: public:
ThreadManagerTest() ThreadManagerTest()
: ThreadManager(kMinPollers, kMaxPollers, gpr_thd_new, gpr_thd_join), : ThreadManager(kMinPollers, kMaxPollers),
num_do_work_(0), num_do_work_(0),
num_poll_for_work_(0), num_poll_for_work_(0),
num_work_found_(0) {} num_work_found_(0) {}
grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override; grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override;
void DoWork(void* tag, bool ok, bool resources) override; void DoWork(void* tag, bool ok) override;
void PerformTest(); void PerformTest();
private: private:
@ -89,7 +89,7 @@ grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag,
} }
} }
void ThreadManagerTest::DoWork(void* tag, bool ok, bool resources) { void ThreadManagerTest::DoWork(void* tag, bool ok) {
gpr_atm_no_barrier_fetch_add(&num_do_work_, 1); gpr_atm_no_barrier_fetch_add(&num_do_work_, 1);
SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping
} }

Loading…
Cancel
Save