Create a new method handler for resource exhaustion and tie into thread mgr

pull/16356/head
Vijay Pai 6 years ago
parent 8165c4c0aa
commit 14ad82a76d
Changed files:
 1. include/grpcpp/impl/codegen/byte_buffer.h (4)
 2. include/grpcpp/impl/codegen/completion_queue.h (6)
 3. include/grpcpp/impl/codegen/method_handler_impl.h (17)
 4. include/grpcpp/impl/codegen/server_context.h (6)
 5. include/grpcpp/server.h (3)
 6. src/cpp/server/server_cc.cc (22)
 7. src/cpp/thread_manager/thread_manager.cc (21)
 8. src/cpp/thread_manager/thread_manager.h (2)
 9. test/cpp/end2end/thread_stress_test.cc (109)
10. test/cpp/thread_manager/thread_manager_test.cc (4)

include/grpcpp/impl/codegen/byte_buffer.h:

@@ -45,6 +45,8 @@ template <class ServiceType, class RequestType, class ResponseType>
 class RpcMethodHandler;
 template <class ServiceType, class RequestType, class ResponseType>
 class ServerStreamingHandler;
+template <StatusCode code>
+class ErrorMethodHandler;
 template <class R>
 class DeserializeFuncType;
 class GrpcByteBufferPeer;
@@ -144,6 +146,8 @@ class ByteBuffer final {
   friend class internal::RpcMethodHandler;
   template <class ServiceType, class RequestType, class ResponseType>
   friend class internal::ServerStreamingHandler;
+  template <StatusCode code>
+  friend class internal::ErrorMethodHandler;
   template <class R>
   friend class internal::DeserializeFuncType;
   friend class ProtoBufferReader;

include/grpcpp/impl/codegen/completion_queue.h:

@@ -78,9 +78,10 @@ template <class ServiceType, class RequestType, class ResponseType>
 class ServerStreamingHandler;
 template <class ServiceType, class RequestType, class ResponseType>
 class BidiStreamingHandler;
-class UnknownMethodHandler;
 template <class Streamer, bool WriteNeeded>
 class TemplatedBidiStreamingHandler;
+template <StatusCode code>
+class ErrorMethodHandler;
 template <class InputMessage, class OutputMessage>
 class BlockingUnaryCallImpl;
 }  // namespace internal
@@ -265,7 +266,8 @@ class CompletionQueue : private GrpcLibraryCodegen {
   friend class ::grpc::internal::ServerStreamingHandler;
   template <class Streamer, bool WriteNeeded>
   friend class ::grpc::internal::TemplatedBidiStreamingHandler;
-  friend class ::grpc::internal::UnknownMethodHandler;
+  template <StatusCode code>
+  friend class ::grpc::internal::ErrorMethodHandler;
   friend class ::grpc::Server;
   friend class ::grpc::ServerContext;
   friend class ::grpc::ServerInterface;

include/grpcpp/impl/codegen/method_handler_impl.h:

@@ -272,12 +272,14 @@ class SplitServerStreamingHandler
             ServerSplitStreamer<RequestType, ResponseType>, false>(func) {}
 };
 
-/// Handle unknown method by returning UNIMPLEMENTED error.
-class UnknownMethodHandler : public MethodHandler {
+/// General method handler class for errors that prevent real method use
+/// e.g., handle unknown method by returning UNIMPLEMENTED error.
+template <StatusCode code>
+class ErrorMethodHandler : public MethodHandler {
  public:
   template <class T>
   static void FillOps(ServerContext* context, T* ops) {
-    Status status(StatusCode::UNIMPLEMENTED, "");
+    Status status(code, "");
     if (!context->sent_initial_metadata_) {
       ops->SendInitialMetadata(context->initial_metadata_,
                                context->initial_metadata_flags());
@@ -294,9 +296,18 @@ class UnknownMethodHandler : public MethodHandler {
     FillOps(param.server_context, &ops);
     param.call->PerformOps(&ops);
     param.call->cq()->Pluck(&ops);
+    // We also have to destroy any request payload in the handler parameter
+    ByteBuffer* payload = param.request.bbuf_ptr();
+    if (payload != nullptr) {
+      payload->Clear();
+    }
   }
 };
 
+typedef ErrorMethodHandler<StatusCode::UNIMPLEMENTED> UnknownMethodHandler;
+
+typedef ErrorMethodHandler<StatusCode::RESOURCE_EXHAUSTED>
+    ResourceExhaustedHandler;
+
 }  // namespace internal
 }  // namespace grpc
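Note: the hunk above is the heart of the change — the single-purpose UnknownMethodHandler becomes a template parameterized on the status code it reports, so one code path can fail a call with either UNIMPLEMENTED or RESOURCE_EXHAUSTED. A minimal standalone sketch of that pattern, using stand-in types rather than the real gRPC internals (MethodHandler, HandlerParameter, and the ops plumbing are elided):

    #include <iostream>

    // Stand-in for grpc::StatusCode; only the two values the diff instantiates.
    enum class StatusCode { RESOURCE_EXHAUSTED = 8, UNIMPLEMENTED = 12 };

    // One template, parameterized on the status it reports, replaces the old
    // single-purpose handler class.
    template <StatusCode code>
    class ErrorMethodHandler {
     public:
      void RunHandler() const {
        // The real handler sends initial metadata if needed, closes the call
        // with `code`, and clears any request payload; here we just report it.
        std::cout << "rejecting call with status " << static_cast<int>(code)
                  << "\n";
      }
    };

    // The same two concrete handlers the diff defines.
    typedef ErrorMethodHandler<StatusCode::UNIMPLEMENTED> UnknownMethodHandler;
    typedef ErrorMethodHandler<StatusCode::RESOURCE_EXHAUSTED>
        ResourceExhaustedHandler;

    int main() {
      UnknownMethodHandler{}.RunHandler();
      ResourceExhaustedHandler{}.RunHandler();
    }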

include/grpcpp/impl/codegen/server_context.h:

@@ -63,9 +63,10 @@ template <class ServiceType, class RequestType, class ResponseType>
 class ServerStreamingHandler;
 template <class ServiceType, class RequestType, class ResponseType>
 class BidiStreamingHandler;
-class UnknownMethodHandler;
 template <class Streamer, bool WriteNeeded>
 class TemplatedBidiStreamingHandler;
+template <StatusCode code>
+class ErrorMethodHandler;
 class Call;
 }  // namespace internal
@@ -262,7 +263,8 @@ class ServerContext {
   friend class ::grpc::internal::ServerStreamingHandler;
   template <class Streamer, bool WriteNeeded>
   friend class ::grpc::internal::TemplatedBidiStreamingHandler;
-  friend class ::grpc::internal::UnknownMethodHandler;
+  template <StatusCode code>
+  friend class internal::ErrorMethodHandler;
   friend class ::grpc::ClientContext;
 
   /// Prevent copying.

include/grpcpp/server.h:

@@ -223,6 +223,9 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
   std::unique_ptr<HealthCheckServiceInterface> health_check_service_;
   bool health_check_service_disabled_;
+
+  // A special handler for resource exhausted in sync case
+  std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_;
 };
 
 }  // namespace grpc

src/cpp/server/server_cc.cc:

@@ -210,8 +210,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
         call_(mrd->call_, server, &cq_, server->max_receive_message_size()),
         ctx_(mrd->deadline_, &mrd->request_metadata_),
         has_request_payload_(mrd->has_request_payload_),
-        request_payload_(mrd->request_payload_),
-        method_(mrd->method_) {
+        request_payload_(has_request_payload_ ? mrd->request_payload_
+                                              : nullptr),
+        method_(mrd->method_),
+        server_(server) {
       ctx_.set_call(mrd->call_);
       ctx_.cq_ = &cq_;
       GPR_ASSERT(mrd->in_flight_);
@@ -225,10 +227,13 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
       }
     }
 
-    void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks) {
+    void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
+             bool resources) {
       ctx_.BeginCompletionOp(&call_);
       global_callbacks->PreSynchronousRequest(&ctx_);
-      method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter(
-          &call_, &ctx_, request_payload_));
+      auto* handler = resources ? method_->handler()
+                                : server_->resource_exhausted_handler_.get();
+      handler->RunHandler(internal::MethodHandler::HandlerParameter(
           &call_, &ctx_, request_payload_));
       global_callbacks->PostSynchronousRequest(&ctx_);
       request_payload_ = nullptr;
@@ -250,6 +255,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
     const bool has_request_payload_;
     grpc_byte_buffer* request_payload_;
     internal::RpcServiceMethod* const method_;
+    Server* server_;
   };
 
  private:
@@ -300,7 +306,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
     GPR_UNREACHABLE_CODE(return TIMEOUT);
   }
 
-  void DoWork(void* tag, bool ok) override {
+  void DoWork(void* tag, bool ok, bool resources) override {
     SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
 
     if (!sync_req) {
@@ -320,7 +326,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
       }
 
      GPR_TIMER_SCOPE("cd.Run()", 0);
-      cd.Run(global_callbacks_);
+      cd.Run(global_callbacks_, resources);
     }
     // TODO (sreek) If ok is false here (which it isn't in case of
     // grpc_request_registered_call), we should still re-queue the request
@@ -578,6 +584,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
     }
   }
 
+  if (!sync_server_cqs_->empty()) {
+    resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler);
+  }
+
   for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
     (*it)->Start();
   }
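Note: SyncRequest now remembers its owning Server, and Run() picks a handler at dispatch time — the registered method handler when the thread manager secured a thread, or the server's pre-built ResourceExhaustedHandler when it did not. A hedged sketch of that dispatch with stand-in types (not the real gRPC classes):

    #include <iostream>

    struct MethodHandler {
      virtual ~MethodHandler() = default;
      virtual void RunHandler() = 0;
    };

    struct EchoHandler : MethodHandler {
      void RunHandler() override { std::cout << "ran Echo\n"; }
    };

    struct ExhaustedHandler : MethodHandler {
      void RunHandler() override { std::cout << "RESOURCE_EXHAUSTED\n"; }
    };

    // Mirrors SyncRequest::CallData::Run(): `resources` comes from the thread
    // manager and says whether a thread could be (re)allocated for this call.
    void Run(MethodHandler* method_handler, MethodHandler* exhausted_handler,
             bool resources) {
      MethodHandler* handler = resources ? method_handler : exhausted_handler;
      handler->RunHandler();
    }

    int main() {
      EchoHandler echo;
      ExhaustedHandler exhausted;
      Run(&echo, &exhausted, /*resources=*/true);   // normal path
      Run(&echo, &exhausted, /*resources=*/false);  // overloaded path
    }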

src/cpp/thread_manager/thread_manager.cc:

@@ -166,8 +166,9 @@ void ThreadManager::MainWorkLoop() {
       case WORK_FOUND:
         // If we got work and there are now insufficient pollers and there is
         // quota available to create a new thread, start a new poller thread
-        if (!shutdown_ && num_pollers_ < min_pollers_ &&
-            grpc_resource_user_allocate_threads(resource_user_, 1)) {
+        bool got_thread;
+        if (!shutdown_ && num_pollers_ < min_pollers_) {
+          if (grpc_resource_user_allocate_threads(resource_user_, 1)) {
             num_pollers_++;
             num_threads_++;
             if (num_threads_ > max_active_threads_sofar_) {
@@ -176,12 +177,26 @@ void ThreadManager::MainWorkLoop() {
             // Drop lock before spawning thread to avoid contention
             lock.unlock();
             new WorkerThread(this);
+            got_thread = true;
+          } else if (num_pollers_ > 0) {
+            // There is still at least some thread polling, so we can go on
+            // even though we couldn't allocate a new thread
+            lock.unlock();
+            got_thread = true;
+          } else {
+            // There are no pollers to spare and we couldn't allocate
+            // a new thread, so resources are exhausted!
+            lock.unlock();
+            got_thread = false;
+          }
         } else {
           // Drop lock for consistency with above branch
           lock.unlock();
+          got_thread = true;
         }
         // Lock is always released at this point - do the application work
-        DoWork(tag, ok);
+        // or return resource exhausted
+        DoWork(tag, ok, got_thread);
         // Take the lock again to check post conditions
         lock.lock();
 
         // If we're shutdown, we should finish at this point.
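Note: the reworked MainWorkLoop() boils down to a three-way decision: a new poller thread was allocated, no thread was allocated but pollers remain, or neither — and only the last case propagates `false` into DoWork() as the new `resources` argument. A condensed sketch, with the lock and grpc_resource_user_allocate_threads() stubbed out as parameters:

    #include <cassert>

    // `allocation_succeeded` stands in for the resource-user call; the real
    // code also unlocks the ThreadManager mutex on every branch.
    bool GotThread(bool shutdown, int num_pollers, int min_pollers,
                   bool allocation_succeeded) {
      if (!shutdown && num_pollers < min_pollers) {
        if (allocation_succeeded) return true;  // started a fresh poller thread
        if (num_pollers > 0) return true;  // existing pollers keep things going
        return false;  // no pollers to spare and no quota: exhausted
      }
      return true;  // enough pollers already; no allocation was attempted
    }

    int main() {
      assert(!GotThread(false, 0, 2, false));  // exhausted path
      assert(GotThread(false, 1, 2, false));   // degrade gracefully
      assert(GotThread(false, 5, 2, false));   // enough pollers already
    }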

src/cpp/thread_manager/thread_manager.h:

@@ -72,7 +72,7 @@ class ThreadManager {
   // The implementation of DoWork() should also do any setup needed to ensure
   // that the next call to PollForWork() (not necessarily by the current thread)
   // actually finds some work
-  virtual void DoWork(void* tag, bool ok) = 0;
+  virtual void DoWork(void* tag, bool ok, bool resources) = 0;
 
   // Mark the ThreadManager as shutdown and begin draining the work. This is a
   // non-blocking call and the caller should call Wait(), a blocking call which

test/cpp/end2end/thread_stress_test.cc:

@@ -16,6 +16,7 @@
  *
  */
 
+#include <cinttypes>
 #include <mutex>
 #include <thread>
@@ -24,6 +25,7 @@
 #include <grpcpp/channel.h>
 #include <grpcpp/client_context.h>
 #include <grpcpp/create_channel.h>
+#include <grpcpp/resource_quota.h>
 #include <grpcpp/server.h>
 #include <grpcpp/server_builder.h>
 #include <grpcpp/server_context.h>
@@ -51,63 +53,13 @@ namespace testing {
 
 class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
  public:
-  TestServiceImpl() : signal_client_(false) {}
+  TestServiceImpl() {}
 
   Status Echo(ServerContext* context, const EchoRequest* request,
               EchoResponse* response) override {
     response->set_message(request->message());
     return Status::OK;
   }
-
-  // Unimplemented is left unimplemented to test the returned error.
-
-  Status RequestStream(ServerContext* context,
-                       ServerReader<EchoRequest>* reader,
-                       EchoResponse* response) override {
-    EchoRequest request;
-    response->set_message("");
-    while (reader->Read(&request)) {
-      response->mutable_message()->append(request.message());
-    }
-    return Status::OK;
-  }
-
-  // Return 3 messages.
-  // TODO(yangg) make it generic by adding a parameter into EchoRequest
-  Status ResponseStream(ServerContext* context, const EchoRequest* request,
-                        ServerWriter<EchoResponse>* writer) override {
-    EchoResponse response;
-    response.set_message(request->message() + "0");
-    writer->Write(response);
-    response.set_message(request->message() + "1");
-    writer->Write(response);
-    response.set_message(request->message() + "2");
-    writer->Write(response);
-    return Status::OK;
-  }
-
-  Status BidiStream(
-      ServerContext* context,
-      ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
-    EchoRequest request;
-    EchoResponse response;
-    while (stream->Read(&request)) {
-      gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
-      response.set_message(request.message());
-      stream->Write(response);
-    }
-    return Status::OK;
-  }
-
-  bool signal_client() {
-    std::unique_lock<std::mutex> lock(mu_);
-    return signal_client_;
-  }
-
- private:
-  bool signal_client_;
-  std::mutex mu_;
 };
 
 template <class Service>
@@ -118,6 +70,7 @@ class CommonStressTest {
   virtual void SetUp() = 0;
   virtual void TearDown() = 0;
   virtual void ResetStub() = 0;
+  virtual bool AllowExhaustion() = 0;
   grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
 
  protected:
@@ -146,6 +99,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
         CreateChannel(server_address_.str(), InsecureChannelCredentials());
     this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
   }
+  bool AllowExhaustion() override { return false; }
 
  protected:
   void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -161,7 +115,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
   std::ostringstream server_address_;
 };
 
-template <class Service>
+template <class Service, bool allow_resource_exhaustion>
 class CommonStressTestInproc : public CommonStressTest<Service> {
  public:
   void ResetStub() override {
@@ -169,6 +123,7 @@ class CommonStressTestInproc : public CommonStressTest<Service> {
     std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args);
     this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
   }
+  bool AllowExhaustion() override { return allow_resource_exhaustion; }
 
  protected:
   void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -193,6 +148,26 @@ class CommonStressTestSyncServer : public BaseClass {
   TestServiceImpl service_;
 };
 
+template <class BaseClass>
+class CommonStressTestSyncServerLowThreadCount : public BaseClass {
+ public:
+  void SetUp() override {
+    ServerBuilder builder;
+    ResourceQuota quota;
+    this->SetUpStart(&builder, &service_);
+    quota.SetMaxThreads(4);
+    builder.SetResourceQuota(quota);
+    this->SetUpEnd(&builder);
+  }
+  void TearDown() override {
+    this->TearDownStart();
+    this->TearDownEnd();
+  }
+
+ private:
+  TestServiceImpl service_;
+};
+
 template <class BaseClass>
 class CommonStressTestAsyncServer : public BaseClass {
  public:
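Note: the new fixture drives the exhaustion path by capping the sync server at 4 threads. ResourceQuota::SetMaxThreads() and ServerBuilder::SetResourceQuota() are the same public APIs the hunk above uses; a usage sketch outside the test harness, where the quota name, address, and service are placeholders:

    #include <memory>

    #include <grpcpp/resource_quota.h>
    #include <grpcpp/security/server_credentials.h>
    #include <grpcpp/server.h>
    #include <grpcpp/server_builder.h>

    // `service` is any registered grpc::Service implementation.
    std::unique_ptr<grpc::Server> BuildCappedServer(grpc::Service* service) {
      grpc::ServerBuilder builder;
      grpc::ResourceQuota quota("stress_test_quota");  // name is arbitrary
      quota.SetMaxThreads(4);  // same cap the fixture above applies
      builder.SetResourceQuota(quota);
      builder.RegisterService(service);
      builder.AddListeningPort("localhost:50051",
                               grpc::InsecureServerCredentials());
      return builder.BuildAndStart();
    }

With this cap, overload produces RESOURCE_EXHAUSTED on some calls instead of unbounded thread growth, which is exactly what the stress test now tolerates.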
@@ -293,7 +268,8 @@ class End2endTest : public ::testing::Test {
   Common common_;
 };
 
-static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
+static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
+                    bool allow_exhaustion, gpr_atm* errors) {
   EchoRequest request;
   EchoResponse response;
   request.set_message("Hello");
@@ -301,34 +277,49 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
   for (int i = 0; i < num_rpcs; ++i) {
     ClientContext context;
     Status s = stub->Echo(&context, request, &response);
-    EXPECT_EQ(response.message(), request.message());
+    EXPECT_TRUE(s.ok() || (allow_exhaustion &&
+                           s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
     if (!s.ok()) {
+      if (!(allow_exhaustion &&
+            s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) {
       gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
               s.error_message().c_str());
+      }
+      gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
+    } else {
+      EXPECT_EQ(response.message(), request.message());
     }
-    ASSERT_TRUE(s.ok());
   }
 }
 
 typedef ::testing::Types<
     CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
-    CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>,
+    CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>,
+    CommonStressTestSyncServerLowThreadCount<
+        CommonStressTestInproc<TestServiceImpl, true>>,
     CommonStressTestAsyncServer<
         CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
-    CommonStressTestAsyncServer<
-        CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>>
+    CommonStressTestAsyncServer<CommonStressTestInproc<
+        grpc::testing::EchoTestService::AsyncService, false>>>
     CommonTypes;
 TYPED_TEST_CASE(End2endTest, CommonTypes);
 
 TYPED_TEST(End2endTest, ThreadStress) {
   this->common_.ResetStub();
   std::vector<std::thread> threads;
+  gpr_atm errors;
+  gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));
   threads.reserve(kNumThreads);
   for (int i = 0; i < kNumThreads; ++i) {
-    threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs);
+    threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs,
+                         this->common_.AllowExhaustion(), &errors);
   }
   for (int i = 0; i < kNumThreads; ++i) {
     threads[i].join();
   }
+  uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
+  if (error_cnt != 0) {
+    gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
+  }
 }
 
 template <class Common>
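Note: the test tallies failures across worker threads with a gpr_atm touched only through atomic ops, so no mutex is needed on the hot path. A self-contained sketch of that counting idiom using the same grpc/support/atm.h API; the lambda body is a placeholder for an RPC that failed:

    #include <cinttypes>
    #include <cstdio>
    #include <thread>
    #include <vector>

    #include <grpc/support/atm.h>

    int main() {
      gpr_atm errors;
      gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));

      std::vector<std::thread> threads;
      for (int i = 0; i < 4; ++i) {
        threads.emplace_back([&errors] {
          // stand-in for an RPC that came back RESOURCE_EXHAUSTED
          gpr_atm_no_barrier_fetch_add(&errors, static_cast<gpr_atm>(1));
        });
      }
      for (auto& t : threads) t.join();

      uint64_t error_cnt =
          static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
      std::printf("RPC error count: %" PRIu64 "\n", error_cnt);
    }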

test/cpp/thread_manager/thread_manager_test.cc:

@@ -55,7 +55,7 @@ class ThreadManagerTest final : public grpc::ThreadManager {
         num_work_found_(0) {}
 
   grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override;
-  void DoWork(void* tag, bool ok) override;
+  void DoWork(void* tag, bool ok, bool resources) override;
 
   // Get number of times PollForWork() returned WORK_FOUND
   int GetNumWorkFound();
@@ -102,7 +102,7 @@ grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag,
   return WORK_FOUND;
 }
 
-void ThreadManagerTest::DoWork(void* tag, bool ok) {
+void ThreadManagerTest::DoWork(void* tag, bool ok, bool resources) {
   gpr_atm_no_barrier_fetch_add(&num_do_work_, 1);
   SleepForMs(settings_.work_duration_ms);  // Simulate work by sleeping
 }
