Merge pull request #2977 from ctiller/shutdown-c++

Add a deadline to Server::Shutdown
pull/3017/head
Yang Gao 9 years ago
commit 48c9f250c2
  1. 45
      Makefile
  2. 20
      build.json
  3. 11
      include/grpc++/server.h
  4. 37
      src/cpp/server/server.cc
  5. 159
      test/cpp/end2end/shutdown_test.cc
  6. 17
      tools/run_tests/sources_and_headers.json
  7. 17
      tools/run_tests/tests.json
  8. 10
      vsprojects/Grpc.mak

@ -889,6 +889,7 @@ reconnect_interop_server: $(BINDIR)/$(CONFIG)/reconnect_interop_server
secure_auth_context_test: $(BINDIR)/$(CONFIG)/secure_auth_context_test secure_auth_context_test: $(BINDIR)/$(CONFIG)/secure_auth_context_test
server_crash_test: $(BINDIR)/$(CONFIG)/server_crash_test server_crash_test: $(BINDIR)/$(CONFIG)/server_crash_test
server_crash_test_client: $(BINDIR)/$(CONFIG)/server_crash_test_client server_crash_test_client: $(BINDIR)/$(CONFIG)/server_crash_test_client
shutdown_test: $(BINDIR)/$(CONFIG)/shutdown_test
status_test: $(BINDIR)/$(CONFIG)/status_test status_test: $(BINDIR)/$(CONFIG)/status_test
sync_streaming_ping_pong_test: $(BINDIR)/$(CONFIG)/sync_streaming_ping_pong_test sync_streaming_ping_pong_test: $(BINDIR)/$(CONFIG)/sync_streaming_ping_pong_test
sync_unary_ping_pong_test: $(BINDIR)/$(CONFIG)/sync_unary_ping_pong_test sync_unary_ping_pong_test: $(BINDIR)/$(CONFIG)/sync_unary_ping_pong_test
@ -1735,7 +1736,7 @@ buildtests_c: privatelibs_c $(BINDIR)/$(CONFIG)/alarm_heap_test $(BINDIR)/$(CONF
buildtests_cxx: buildtests_zookeeper privatelibs_cxx $(BINDIR)/$(CONFIG)/async_end2end_test $(BINDIR)/$(CONFIG)/async_streaming_ping_pong_test $(BINDIR)/$(CONFIG)/async_unary_ping_pong_test $(BINDIR)/$(CONFIG)/auth_property_iterator_test $(BINDIR)/$(CONFIG)/channel_arguments_test $(BINDIR)/$(CONFIG)/cli_call_test $(BINDIR)/$(CONFIG)/client_crash_test $(BINDIR)/$(CONFIG)/client_crash_test_server $(BINDIR)/$(CONFIG)/credentials_test $(BINDIR)/$(CONFIG)/cxx_byte_buffer_test $(BINDIR)/$(CONFIG)/cxx_slice_test $(BINDIR)/$(CONFIG)/cxx_time_test $(BINDIR)/$(CONFIG)/dynamic_thread_pool_test $(BINDIR)/$(CONFIG)/end2end_test $(BINDIR)/$(CONFIG)/fixed_size_thread_pool_test $(BINDIR)/$(CONFIG)/generic_end2end_test $(BINDIR)/$(CONFIG)/grpc_cli $(BINDIR)/$(CONFIG)/interop_client $(BINDIR)/$(CONFIG)/interop_server $(BINDIR)/$(CONFIG)/interop_test $(BINDIR)/$(CONFIG)/mock_test $(BINDIR)/$(CONFIG)/qps_interarrival_test $(BINDIR)/$(CONFIG)/qps_openloop_test $(BINDIR)/$(CONFIG)/qps_test $(BINDIR)/$(CONFIG)/reconnect_interop_client $(BINDIR)/$(CONFIG)/reconnect_interop_server $(BINDIR)/$(CONFIG)/secure_auth_context_test $(BINDIR)/$(CONFIG)/server_crash_test $(BINDIR)/$(CONFIG)/server_crash_test_client $(BINDIR)/$(CONFIG)/status_test $(BINDIR)/$(CONFIG)/sync_streaming_ping_pong_test $(BINDIR)/$(CONFIG)/sync_unary_ping_pong_test $(BINDIR)/$(CONFIG)/thread_stress_test buildtests_cxx: buildtests_zookeeper privatelibs_cxx $(BINDIR)/$(CONFIG)/async_end2end_test $(BINDIR)/$(CONFIG)/async_streaming_ping_pong_test $(BINDIR)/$(CONFIG)/async_unary_ping_pong_test $(BINDIR)/$(CONFIG)/auth_property_iterator_test $(BINDIR)/$(CONFIG)/channel_arguments_test $(BINDIR)/$(CONFIG)/cli_call_test $(BINDIR)/$(CONFIG)/client_crash_test $(BINDIR)/$(CONFIG)/client_crash_test_server $(BINDIR)/$(CONFIG)/credentials_test $(BINDIR)/$(CONFIG)/cxx_byte_buffer_test $(BINDIR)/$(CONFIG)/cxx_slice_test $(BINDIR)/$(CONFIG)/cxx_time_test $(BINDIR)/$(CONFIG)/dynamic_thread_pool_test $(BINDIR)/$(CONFIG)/end2end_test $(BINDIR)/$(CONFIG)/fixed_size_thread_pool_test $(BINDIR)/$(CONFIG)/generic_end2end_test $(BINDIR)/$(CONFIG)/grpc_cli $(BINDIR)/$(CONFIG)/interop_client $(BINDIR)/$(CONFIG)/interop_server $(BINDIR)/$(CONFIG)/interop_test $(BINDIR)/$(CONFIG)/mock_test $(BINDIR)/$(CONFIG)/qps_interarrival_test $(BINDIR)/$(CONFIG)/qps_openloop_test $(BINDIR)/$(CONFIG)/qps_test $(BINDIR)/$(CONFIG)/reconnect_interop_client $(BINDIR)/$(CONFIG)/reconnect_interop_server $(BINDIR)/$(CONFIG)/secure_auth_context_test $(BINDIR)/$(CONFIG)/server_crash_test $(BINDIR)/$(CONFIG)/server_crash_test_client $(BINDIR)/$(CONFIG)/status_test $(BINDIR)/$(CONFIG)/sync_streaming_ping_pong_test $(BINDIR)/$(CONFIG)/sync_unary_ping_pong_test $(BINDIR)/$(CONFIG)/thread_stress_test
ifeq ($(HAS_ZOOKEEPER),true) ifeq ($(HAS_ZOOKEEPER),true)
buildtests_zookeeper: privatelibs_zookeeper $(BINDIR)/$(CONFIG)/zookeeper_test buildtests_zookeeper: privatelibs_zookeeper $(BINDIR)/$(CONFIG)/shutdown_test $(BINDIR)/$(CONFIG)/zookeeper_test
else else
buildtests_zookeeper: buildtests_zookeeper:
endif endif
@ -3363,6 +3364,8 @@ flaky_test_cxx: buildtests_cxx
ifeq ($(HAS_ZOOKEEPER),true) ifeq ($(HAS_ZOOKEEPER),true)
test_zookeeper: buildtests_zookeeper test_zookeeper: buildtests_zookeeper
$(E) "[RUN] Testing shutdown_test"
$(Q) $(BINDIR)/$(CONFIG)/shutdown_test || ( echo test shutdown_test failed ; exit 1 )
$(E) "[RUN] Testing zookeeper_test" $(E) "[RUN] Testing zookeeper_test"
$(Q) $(BINDIR)/$(CONFIG)/zookeeper_test || ( echo test zookeeper_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/zookeeper_test || ( echo test zookeeper_test failed ; exit 1 )
@ -10262,6 +10265,46 @@ endif
endif endif
SHUTDOWN_TEST_SRC = \
test/cpp/end2end/shutdown_test.cc \
SHUTDOWN_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(SHUTDOWN_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/shutdown_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/shutdown_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/shutdown_test: $(PROTOBUF_DEP) $(SHUTDOWN_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc_zookeeper.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(SHUTDOWN_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc_zookeeper.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a -lzookeeper_mt $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/shutdown_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/shutdown_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc_zookeeper.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_shutdown_test: $(SHUTDOWN_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(SHUTDOWN_TEST_OBJS:.o=.dep)
endif
endif
STATUS_TEST_SRC = \ STATUS_TEST_SRC = \
test/cpp/util/status_test.cc \ test/cpp/util/status_test.cc \

@ -2610,6 +2610,26 @@
"gpr" "gpr"
] ]
}, },
{
"name": "shutdown_test",
"build": "test",
"language": "c++",
"src": [
"test/cpp/end2end/shutdown_test.cc"
],
"deps": [
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc_zookeeper",
"grpc",
"gpr_test_util",
"gpr"
],
"external_deps": [
"zookeeper"
]
},
{ {
"name": "status_test", "name": "status_test",
"build": "test", "build": "test",

@ -63,7 +63,14 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
~Server(); ~Server();
// Shutdown the server, block until all rpc processing finishes. // Shutdown the server, block until all rpc processing finishes.
void Shutdown(); // Forcefully terminate pending calls after deadline expires.
template <class T>
void Shutdown(const T& deadline) {
ShutdownInternal(TimePoint<T>(deadline).raw_time());
}
// Shutdown the server, waiting for all rpc processing to finish.
void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
// Block waiting for all work to complete (the server must either // Block waiting for all work to complete (the server must either
// be shutting down or some other thread must call Shutdown for this // be shutting down or some other thread must call Shutdown for this
@ -99,6 +106,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
void ShutdownInternal(gpr_timespec deadline);
class BaseAsyncRequest : public CompletionQueueTag { class BaseAsyncRequest : public CompletionQueueTag {
public: public:
BaseAsyncRequest(Server* server, ServerContext* context, BaseAsyncRequest(Server* server, ServerContext* context,

@ -90,6 +90,26 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd; return mrd;
} }
static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
gpr_timespec deadline) {
void* tag = nullptr;
*ok = false;
switch (cq->AsyncNext(&tag, ok, deadline)) {
case CompletionQueue::TIMEOUT:
*req = nullptr;
return true;
case CompletionQueue::SHUTDOWN:
*req = nullptr;
return false;
case CompletionQueue::GOT_EVENT:
*req = static_cast<SyncRequest*>(tag);
GPR_ASSERT((*req)->in_flight_);
return true;
}
gpr_log(GPR_ERROR, "Should never reach here");
abort();
}
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() { void TeardownRequest() {
@ -303,12 +323,27 @@ bool Server::Start() {
return true; return true;
} }
void Server::Shutdown() { void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_); grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) { if (started_ && !shutdown_) {
shutdown_ = true; shutdown_ = true;
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown(); cq_.Shutdown();
// Spin, eating requests until the completion queue is completely shutdown.
// If the deadline expires then cancel anything that's pending and keep
// spinning forever until the work is actually drained.
// Since nothing else needs to touch state guarded by mu_, holding it
// through this loop is fine.
SyncRequest* request;
bool ok;
while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
if (request == NULL) { // deadline expired
grpc_server_cancel_all_calls(server_);
deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
} else if (ok) {
SyncRequest::CallData call_data(this, request);
}
}
// Wait for running callbacks to finish. // Wait for running callbacks to finish.
while (num_running_cb_ != 0) { while (num_running_cb_ != 0) {

@ -0,0 +1,159 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "test/core/util/test_config.h"
#include <thread>
#include "test/core/util/port.h"
#include "test/cpp/util/echo.grpc.pb.h"
#include "src/core/support/env.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
#include <grpc++/status.h>
#include <gtest/gtest.h>
#include <grpc/grpc.h>
#include <grpc/support/sync.h>
using grpc::cpp::test::util::EchoRequest;
using grpc::cpp::test::util::EchoResponse;
namespace grpc {
namespace testing {
class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
public:
explicit TestServiceImpl(gpr_event* ev) : ev_(ev) {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) GRPC_OVERRIDE {
gpr_event_set(ev_, (void*)1);
while (!context->IsCancelled()) {
}
return Status::OK;
}
private:
gpr_event* ev_;
};
class ShutdownTest : public ::testing::Test {
public:
ShutdownTest() : shutdown_(false), service_(&ev_) { gpr_event_init(&ev_); }
void SetUp() GRPC_OVERRIDE {
port_ = grpc_pick_unused_port_or_die();
server_ = SetUpServer(port_);
}
std::unique_ptr<Server> SetUpServer(const int port) {
grpc::string server_address = "localhost:" + to_string(port);
ServerBuilder builder;
builder.AddListeningPort(server_address, InsecureServerCredentials());
builder.RegisterService(&service_);
std::unique_ptr<Server> server = builder.BuildAndStart();
return server;
}
void TearDown() GRPC_OVERRIDE { GPR_ASSERT(shutdown_); }
void ResetStub() {
string target = "dns:localhost:" + to_string(port_);
channel_ = CreateChannel(target, InsecureCredentials(), ChannelArguments());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_));
}
string to_string(const int number) {
std::stringstream strs;
strs << number;
return strs.str();
}
void SendRequest() {
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
ClientContext context;
GPR_ASSERT(!shutdown_);
Status s = stub_->Echo(&context, request, &response);
GPR_ASSERT(shutdown_);
}
protected:
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<Server> server_;
bool shutdown_;
int port_;
gpr_event ev_;
TestServiceImpl service_;
};
// Tests zookeeper state change between two RPCs
// TODO(ctiller): leaked objects in this test
TEST_F(ShutdownTest, ShutdownTest) {
ResetStub();
// send the request in a background thread
std::thread thr(std::bind(&ShutdownTest::SendRequest, this));
// wait for the server to get the event
gpr_event_wait(&ev_, gpr_inf_future(GPR_CLOCK_MONOTONIC));
shutdown_ = true;
// shutdown should trigger cancellation causing everything to shutdown
auto deadline =
std::chrono::system_clock::now() + std::chrono::microseconds(100);
server_->Shutdown(deadline);
EXPECT_GE(std::chrono::system_clock::now(), deadline);
thr.join();
}
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -1620,6 +1620,23 @@
"test/cpp/end2end/server_crash_test_client.cc" "test/cpp/end2end/server_crash_test_client.cc"
] ]
}, },
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util",
"grpc_zookeeper"
],
"headers": [],
"language": "c++",
"name": "shutdown_test",
"src": [
"test/cpp/end2end/shutdown_test.cc"
]
},
{ {
"deps": [ "deps": [
"gpr", "gpr",

@ -1425,6 +1425,23 @@
"posix" "posix"
] ]
}, },
{
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"flaky": false,
"language": "c++",
"name": "shutdown_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
]
},
{ {
"ci_platforms": [ "ci_platforms": [
"linux", "linux",

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save