diff --git a/Makefile b/Makefile index 6d15a57745d..b4dcd431485 100644 --- a/Makefile +++ b/Makefile @@ -894,6 +894,7 @@ uri_parser_test: $(BINDIR)/$(CONFIG)/uri_parser_test workqueue_test: $(BINDIR)/$(CONFIG)/workqueue_test async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test async_streaming_ping_pong_test: $(BINDIR)/$(CONFIG)/async_streaming_ping_pong_test +async_thread_stress_test: $(BINDIR)/$(CONFIG)/async_thread_stress_test async_unary_ping_pong_test: $(BINDIR)/$(CONFIG)/async_unary_ping_pong_test auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test channel_arguments_test: $(BINDIR)/$(CONFIG)/channel_arguments_test @@ -1248,6 +1249,7 @@ buildtests_c: privatelibs_c \ buildtests_cxx: buildtests_zookeeper privatelibs_cxx \ $(BINDIR)/$(CONFIG)/async_end2end_test \ $(BINDIR)/$(CONFIG)/async_streaming_ping_pong_test \ + $(BINDIR)/$(CONFIG)/async_thread_stress_test \ $(BINDIR)/$(CONFIG)/async_unary_ping_pong_test \ $(BINDIR)/$(CONFIG)/auth_property_iterator_test \ $(BINDIR)/$(CONFIG)/channel_arguments_test \ @@ -1538,6 +1540,8 @@ test_cxx: test_zookeeper buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/async_end2end_test || ( echo test async_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing async_streaming_ping_pong_test" $(Q) $(BINDIR)/$(CONFIG)/async_streaming_ping_pong_test || ( echo test async_streaming_ping_pong_test failed ; exit 1 ) + $(E) "[RUN] Testing async_thread_stress_test" + $(Q) $(BINDIR)/$(CONFIG)/async_thread_stress_test || ( echo test async_thread_stress_test failed ; exit 1 ) $(E) "[RUN] Testing async_unary_ping_pong_test" $(Q) $(BINDIR)/$(CONFIG)/async_unary_ping_pong_test || ( echo test async_unary_ping_pong_test failed ; exit 1 ) $(E) "[RUN] Testing auth_property_iterator_test" @@ -8818,6 +8822,49 @@ endif endif +ASYNC_THREAD_STRESS_TEST_SRC = \ + test/cpp/end2end/async_thread_stress_test.cc \ + +ASYNC_THREAD_STRESS_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(ASYNC_THREAD_STRESS_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/async_thread_stress_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/async_thread_stress_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/async_thread_stress_test: $(PROTOBUF_DEP) $(ASYNC_THREAD_STRESS_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(ASYNC_THREAD_STRESS_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/async_thread_stress_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/async_thread_stress_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_async_thread_stress_test: $(ASYNC_THREAD_STRESS_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(ASYNC_THREAD_STRESS_TEST_OBJS:.o=.dep) +endif +endif + + ASYNC_UNARY_PING_PONG_TEST_SRC = \ test/cpp/qps/async_unary_ping_pong_test.cc \ diff --git a/build.yaml b/build.yaml index 61a28cb2560..ad36e6489c3 100644 --- a/build.yaml +++ b/build.yaml @@ -1825,6 +1825,19 @@ targets: - mac - linux - posix +- name: async_thread_stress_test + cpu_cost: 100 + build: test + language: c++ + src: + - test/cpp/end2end/async_thread_stress_test.cc + deps: + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr_test_util + - gpr - name: async_unary_ping_pong_test build: test language: c++ diff --git a/test/cpp/end2end/async_thread_stress_test.cc b/test/cpp/end2end/async_thread_stress_test.cc new file mode 100644 index 00000000000..846aeef1fec --- /dev/null +++ b/test/cpp/end2end/async_thread_stress_test.cc @@ -0,0 +1,291 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +using grpc::testing::EchoRequest; +using grpc::testing::EchoResponse; +using std::chrono::system_clock; + +namespace grpc { +namespace testing { + +namespace { + +// When echo_deadline is requested, deadline seen in the ServerContext is set in +// the response in seconds. +void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, + EchoResponse* response) { + if (request->has_param() && request->param().echo_deadline()) { + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + if (context->deadline() != system_clock::time_point::max()) { + Timepoint2Timespec(context->deadline(), &deadline); + } + response->mutable_param()->set_request_deadline(deadline.tv_sec); + } +} + +} // namespace + +class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { + public: + TestServiceImpl() : signal_client_(false) {} + + Status Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response) GRPC_OVERRIDE { + response->set_message(request->message()); + MaybeEchoDeadline(context, request, response); + if (request->has_param() && request->param().client_cancel_after_us()) { + { + std::unique_lock lock(mu_); + signal_client_ = true; + } + while (!context->IsCancelled()) { + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().client_cancel_after_us(), + GPR_TIMESPAN))); + } + return Status::CANCELLED; + } else if (request->has_param() && + request->param().server_cancel_after_us()) { + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().server_cancel_after_us(), + GPR_TIMESPAN))); + return Status::CANCELLED; + } else { + EXPECT_FALSE(context->IsCancelled()); + } + return Status::OK; + } + + // Unimplemented is left unimplemented to test the returned error. + + Status RequestStream(ServerContext* context, + ServerReader* reader, + EchoResponse* response) GRPC_OVERRIDE { + EchoRequest request; + response->set_message(""); + while (reader->Read(&request)) { + response->mutable_message()->append(request.message()); + } + return Status::OK; + } + + // Return 3 messages. + // TODO(yangg) make it generic by adding a parameter into EchoRequest + Status ResponseStream(ServerContext* context, const EchoRequest* request, + ServerWriter* writer) GRPC_OVERRIDE { + EchoResponse response; + response.set_message(request->message() + "0"); + writer->Write(response); + response.set_message(request->message() + "1"); + writer->Write(response); + response.set_message(request->message() + "2"); + writer->Write(response); + + return Status::OK; + } + + Status BidiStream(ServerContext* context, + ServerReaderWriter* stream) + GRPC_OVERRIDE { + EchoRequest request; + EchoResponse response; + while (stream->Read(&request)) { + gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); + response.set_message(request.message()); + stream->Write(response); + } + return Status::OK; + } + + bool signal_client() { + std::unique_lock lock(mu_); + return signal_client_; + } + + private: + bool signal_client_; + std::mutex mu_; +}; + +class TestServiceImplDupPkg + : public ::grpc::testing::duplicate::EchoTestService::Service { + public: + Status Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response) GRPC_OVERRIDE { + response->set_message("no package"); + return Status::OK; + } +}; + +class AsyncClientEnd2endTest : public ::testing::Test { + protected: + AsyncClientEnd2endTest() : kMaxMessageSize_(8192), rpcs_outstanding_(0) {} + + void SetUp() GRPC_OVERRIDE { + int port = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port; + // Setup server + ServerBuilder builder; + builder.AddListeningPort(server_address_.str(), + InsecureServerCredentials()); + builder.RegisterService(&service_); + builder.SetMaxMessageSize( + kMaxMessageSize_); // For testing max message size. + server_ = builder.BuildAndStart(); + } + + void TearDown() GRPC_OVERRIDE { server_->Shutdown(); } + + void ResetStub() { + std::shared_ptr channel = + CreateChannel(server_address_.str(), InsecureChannelCredentials()); + stub_ = grpc::testing::EchoTestService::NewStub(channel); + } + + void Wait() { + std::unique_lock l(mu_); + while (rpcs_outstanding_ != 0) { + cv_.wait(l); + } + + cq_.Shutdown(); + } + + struct AsyncClientCall { + EchoResponse response; + ClientContext context; + Status status; + std::unique_ptr> response_reader; + }; + + void AsyncSendRpc(int num_rpcs) { + for (int i = 0; i < num_rpcs; ++i) { + AsyncClientCall* call = new AsyncClientCall; + EchoRequest request; + request.set_message(std::to_string(i)); + call->response_reader = stub_->AsyncEcho(&call->context, request, &cq_); + call->response_reader->Finish(&call->response, &call->status, + (void*)call); + + std::unique_lock l(mu_); + rpcs_outstanding_++; + } + } + + void AsyncCompleteRpc() { + while (true) { + void* got_tag; + bool ok = false; + if (!cq_.Next(&got_tag, &ok)) break; + Call* call = static_cast(got_tag); + GPR_ASSERT(ok); + delete call; + + bool notify; + { + std::unique_lock l(mu_); + rpcs_outstanding_--; + notify = (rpcs_outstanding_ == 0); + } + if (notify) { + cv_.notify_all(); + } + } + } + + std::unique_ptr stub_; + std::unique_ptr server_; + std::ostringstream server_address_; + const int kMaxMessageSize_; + TestServiceImpl service_; + + CompletionQueue cq_; + std::mutex mu_; + std::condition_variable cv_; + int rpcs_outstanding_; +}; + +TEST_F(AsyncClientEnd2endTest, ThreadStress) { + ResetStub(); + std::vector threads; + for (int i = 0; i < 100; ++i) { + threads.push_back(new std::thread( + &AsyncClientEnd2endTest_ThreadStress_Test::AsyncSendRpc, this, 1000)); + } + for (int i = 0; i < 100; ++i) { + threads[i]->join(); + delete threads[i]; + } + + threads.clear(); + + for (int i = 0; i < 100; ++i) { + threads.push_back(new std::thread( + &AsyncClientEnd2endTest_ThreadStress_Test::AsyncCompleteRpc, this)); + } + Wait(); + for (int i = 0; i < 100; ++i) { + threads[i]->join(); + delete threads[i]; + } +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 3e42c59ed3e..506aea53329 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -1318,6 +1318,22 @@ "test/cpp/qps/async_streaming_ping_pong_test.cc" ] }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc++", + "grpc++_test_util", + "grpc_test_util" + ], + "headers": [], + "language": "c++", + "name": "async_thread_stress_test", + "src": [ + "test/cpp/end2end/async_thread_stress_test.cc" + ] + }, { "deps": [ "gpr", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index e3dd29e7e72..be5a02aac86 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -1705,6 +1705,26 @@ "posix" ] }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 100, + "exclude_configs": [], + "flaky": false, + "language": "c++", + "name": "async_thread_stress_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ] + }, { "args": [], "ci_platforms": [