From b31f402b469ceeec2a9f7d70c596a1c03351d795 Mon Sep 17 00:00:00 2001 From: Prashant Jaikumar Date: Tue, 19 Feb 2019 10:46:05 -0800 Subject: [PATCH] Add flaky_network_test after fixing internal build failures. Re-add flaky_network_test along with a couple of new testcases. --- test/cpp/end2end/BUILD | 19 + test/cpp/end2end/flaky_network_test.cc | 492 ++++++++++++++++++ .../linux/grpc_bazel_privileged_docker.sh | 26 + .../internal_ci/linux/grpc_flaky_network.cfg | 2 +- .../linux/grpc_flaky_network_in_docker.sh | 8 +- 5 files changed, 542 insertions(+), 5 deletions(-) create mode 100644 test/cpp/end2end/flaky_network_test.cc create mode 100755 tools/internal_ci/linux/grpc_bazel_privileged_docker.sh diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index cbf09354a03..64b3eae60da 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -553,6 +553,25 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "flaky_network_test", + srcs = ["flaky_network_test.cc"], + external_deps = [ + "gtest", + ], + tags = ["manual"], + deps = [ + ":test_service_impl", + "//:gpr", + "//:grpc", + "//:grpc++", + "//src/proto/grpc/testing:echo_messages_proto", + "//src/proto/grpc/testing:echo_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) + grpc_cc_test( name = "shutdown_test", srcs = ["shutdown_test.cc"], diff --git a/test/cpp/end2end/flaky_network_test.cc b/test/cpp/end2end/flaky_network_test.cc new file mode 100644 index 00000000000..20c8fb59fa2 --- /dev/null +++ b/test/cpp/end2end/flaky_network_test.cc @@ -0,0 +1,492 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// NOTE(review): the bare #include directives below carry no header operand in
+// this copy of the patch; presumably the <...> targets were lost in
+// extraction -- confirm against the original commit before building.
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/gpr/env.h"
+
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
+
+#include
+
+#ifdef GPR_LINUX
+using grpc::testing::EchoRequest;
+using grpc::testing::EchoResponse;
+
+namespace grpc {
+namespace testing {
+namespace {
+
+// Test fixture whose helpers shell out (via std::system) to ip, iptables, tc,
+// sed and cat in order to create, break and degrade the network path between
+// the client channel and the Echo server. None of the helpers check the exit
+// status of the command they run.
+// NOTE(review): these commands presumably need root / NET_ADMIN to succeed --
+// confirm the environment the test is run in.
+class FlakyNetworkTest : public ::testing::Test {
+ protected:
+  // Fixes the hostname, interface alias, address and netmask that every
+  // helper below operates on, and the payload sent by each Echo RPC.
+  FlakyNetworkTest()
+      : server_host_("grpctest"),
+        interface_("lo:1"),
+        ipv4_address_("10.0.0.1"),
+        netmask_("/32"),
+        kRequestMessage_("🖖") {}
+
+  // Runs `ip addr add <ipv4_address_><netmask_> dev <interface_>`.
+  void InterfaceUp() {
+    std::ostringstream cmd;
+    // create interface_ with address ipv4_address_
+    cmd << "ip addr add " << ipv4_address_ << netmask_ << " dev " << interface_;
+    std::system(cmd.str().c_str());
+  }
+
+  // Runs `ip addr del <ipv4_address_><netmask_> dev <interface_>`, the inverse
+  // of InterfaceUp().
+  void InterfaceDown() {
+    std::ostringstream cmd;
+    // remove interface_
+    cmd << "ip addr del " << ipv4_address_ << netmask_ << " dev " << interface_;
+    std::system(cmd.str().c_str());
+  }
+
+  // Appends an "<ipv4_address_> <server_host_>" line to /etc/hosts so that
+  // server_host_ resolves to the test address.
+  void DNSUp() {
+    std::ostringstream cmd;
+    // Add DNS entry for server_host_ in /etc/hosts
+    cmd << "echo '" << ipv4_address_ << " " << server_host_
+        << "' >> /etc/hosts";
+    std::system(cmd.str().c_str());
+  }
+
+  // Removes every /etc/hosts line that mentions server_host_. The file is
+  // rewritten in two steps through /etc/hosts.orig (see the note below).
+  void DNSDown() {
+    std::ostringstream cmd;
+    // Remove DNS entry for server_host_ from /etc/hosts
+    // NOTE: we can't do
this in one step with sed -i because when we are
+    // running under docker, the file is mounted by docker so we can't change
+    // its inode from within the container (sed -i creates a new file and
+    // replaces the old file, which changes the inode)
+    cmd << "sed '/" << server_host_ << "/d' /etc/hosts > /etc/hosts.orig";
+    std::system(cmd.str().c_str());
+
+    // clear the stream
+    cmd.str("");
+
+    cmd << "cat /etc/hosts.orig > /etc/hosts";
+    std::system(cmd.str().c_str());
+  }
+
+  // Blackholes traffic for ipv4_address_ by appending two DROP rules to the
+  // iptables INPUT chain: one matching the address as packet source and one
+  // matching it as packet destination. Each rule is a separate iptables
+  // invocation; exit statuses are not checked.
+  void DropPackets() {
+    std::ostringstream cmd;
+    // drop packets with src IP = ipv4_address_
+    cmd << "iptables -A INPUT -s " << ipv4_address_ << " -j DROP";
+    std::system(cmd.str().c_str());
+
+    // clear the stream so the next command starts from an empty buffer
+    cmd.str("");
+
+    // drop packets with dst IP = ipv4_address_
+    cmd << "iptables -A INPUT -d " << ipv4_address_ << " -j DROP";
+    // the command only takes effect once it is handed to the shell
+    std::system(cmd.str().c_str());
+  }
+
+  // Undoes DropPackets(): deletes the source-match and destination-match DROP
+  // rules for ipv4_address_ from the iptables INPUT chain. Exit statuses are
+  // not checked.
+  void RestoreNetwork() {
+    std::ostringstream cmd;
+    // remove iptables rule to drop packets with src IP = ipv4_address_
+    cmd << "iptables -D INPUT -s " << ipv4_address_ << " -j DROP";
+    std::system(cmd.str().c_str());
+
+    // clear the stream so the next command starts from an empty buffer
+    cmd.str("");
+
+    // remove iptables rule to drop packets with dest IP = ipv4_address_
+    cmd << "iptables -D INPUT -d " << ipv4_address_ << " -j DROP";
+    // the command only takes effect once it is handed to the shell
+    std::system(cmd.str().c_str());
+  }
+
+  void FlakeNetwork() {
+    std::ostringstream cmd;
+    // Emulate a flaky network connection over interface_. Add a delay of 100ms
+    // +/- 50ms, 3% packet loss, 1% duplicates and 0.1% corrupt packets.
+    cmd << "tc qdisc replace dev " << interface_
+        << " root netem delay 100ms 50ms distribution normal loss 3% duplicate "
+           "1% corrupt 0.1% ";
+    std::system(cmd.str().c_str());
+  }
+
+  // Runs `tc qdisc del dev <interface_> root netem`, removing the netem qdisc
+  // that FlakeNetwork() installs.
+  void UnflakeNetwork() {
+    // Remove simulated network flake on interface_
+    std::ostringstream cmd;
+    cmd << "tc qdisc del dev " << interface_ << " root netem";
+    std::system(cmd.str().c_str());
+  }
+
+  // Adds the test address to the interface, then adds the hosts entry.
+  void NetworkUp() {
+    InterfaceUp();
+    DNSUp();
+  }
+
+  // Removes the test address from the interface, then removes the hosts entry.
+  void NetworkDown() {
+    InterfaceDown();
+    DNSDown();
+  }
+
+  // Per-test setup: the address and hosts entry are created before
+  // grpc_init() and before the server is started on server_host_.
+  void SetUp() override {
+    NetworkUp();
+    grpc_init();
+    StartServer();
+  }
+
+  // Per-test teardown: the network is taken down first, then the server is
+  // stopped, then grpc_shutdown() is called.
+  void TearDown() override {
+    NetworkDown();
+    StopServer();
+    grpc_shutdown();
+  }
+
+  // Creates a fresh ServerData on the fixed SERVER_PORT and starts it,
+  // listening on server_host_.
+  void StartServer() {
+    // TODO (pjaikumar): Ideally, we should allocate the port dynamically using
+    // grpc_pick_unused_port_or_die(). That doesn't work inside some docker
+    // containers because port_server listens on localhost which maps to
+    // ip6-loopback, but ipv6 support is not enabled by default in docker.
+    port_ = SERVER_PORT;
+
+    server_.reset(new ServerData(port_));
+    server_->Start(server_host_);
+  }
+  // Shuts down the current ServerData; server_ itself is left in place until
+  // the next StartServer() replaces it.
+  void StopServer() { server_->Shutdown(); }
+
+  // Returns a new EchoTestService stub bound to `channel`.
+  std::unique_ptr BuildStub(
+      const std::shared_ptr& channel) {
+    return grpc::testing::EchoTestService::NewStub(channel);
+  }
+
+  // Creates an insecure channel to "<server_host_>:<port_>".
+  // `lb_policy_name` is applied to `args` only when non-empty; `args` is taken
+  // by value, so the caller's copy is not modified.
+  std::shared_ptr BuildChannel(
+      const grpc::string& lb_policy_name,
+      ChannelArguments args = ChannelArguments()) {
+    if (lb_policy_name.size() > 0) {
+      args.SetLoadBalancingPolicyName(lb_policy_name);
+    }  // else, default to pick first
+    std::ostringstream server_address;
+    server_address << server_host_ << ":" << port_;
+    return CreateCustomChannel(server_address.str(),
+                               InsecureChannelCredentials(), args);
+  }
+
+  // Sends one Echo RPC carrying kRequestMessage_ and returns whether its
+  // status was OK. A deadline is set only when timeout_ms > 0;
+  // wait_for_ready is set on the context only when requested.
+  bool SendRpc(
+      const std::unique_ptr& stub,
+      int timeout_ms = 0, bool wait_for_ready = false) {
+    auto response = std::unique_ptr(new EchoResponse());
+    EchoRequest request;
+    request.set_message(kRequestMessage_);
+    ClientContext context;
+    if (timeout_ms > 0) {
+
context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
+    }
+    // See https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md for
+    // details of wait-for-ready semantics
+    if (wait_for_ready) {
+      context.set_wait_for_ready(true);
+    }
+    Status status = stub->Echo(&context, request, response.get());
+    auto ok = status.ok();
+    // The outcome is only logged at debug level; callers assert on the
+    // returned bool.
+    if (ok) {
+      gpr_log(GPR_DEBUG, "RPC returned %s\n", response->message().c_str());
+    } else {
+      gpr_log(GPR_DEBUG, "RPC failed: %s", status.error_message().c_str());
+    }
+    return ok;
+  }
+
+  // Owns one Echo server instance together with the thread that builds and
+  // starts it.
+  struct ServerData {
+    int port_;                   // port the server listens on
+    std::unique_ptr server_;     // set by Serve() from BuildAndStart()
+    TestServiceImpl service_;    // service registered with the builder
+    std::unique_ptr thread_;     // thread that runs Serve()
+    bool server_ready_ = false;  // set by Serve(), awaited and reset by Start()
+
+    explicit ServerData(int port) { port_ = port; }
+
+    // Spawns thread_ to run Serve() and blocks until Serve() has set
+    // server_ready_. The mutex and condition variable are locals of this
+    // call; Serve() reaches them through the pointers bound into the thread.
+    void Start(const grpc::string& server_host) {
+      gpr_log(GPR_INFO, "starting server on port %d", port_);
+      std::mutex mu;
+      // Held before the thread is created; cond.wait() releases it while
+      // blocked.
+      std::unique_lock lock(mu);
+      std::condition_variable cond;
+      thread_.reset(new std::thread(
+          std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
+      cond.wait(lock, [this] { return server_ready_; });
+      server_ready_ = false;
+      gpr_log(GPR_INFO, "server startup complete");
+    }
+
+    // Thread body: builds and starts the server on "<server_host>:<port_>"
+    // with insecure credentials, then signals Start() through mu/cond. It
+    // returns right after signalling and does not otherwise wait on the
+    // server.
+    void Serve(const grpc::string& server_host, std::mutex* mu,
+               std::condition_variable* cond) {
+      std::ostringstream server_address;
+      server_address << server_host << ":" << port_;
+      ServerBuilder builder;
+      builder.AddListeningPort(server_address.str(),
+                               InsecureServerCredentials());
+      builder.RegisterService(&service_);
+      server_ = builder.BuildAndStart();
+      std::lock_guard lock(*mu);
+      server_ready_ = true;
+      cond->notify_one();
+    }
+
+    // Shuts the server down with a deadline 0 ms from now, then joins the
+    // thread that ran Serve().
+    void Shutdown() {
+      server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
+      thread_->join();
+    }
+  };
+
+  // Blocks while `channel` reports GRPC_CHANNEL_READY (without requesting a
+  // connection attempt). Returns true once any other state is observed, or
+  // false if WaitForStateChange() returns false first -- presumably because
+  // the timeout_seconds deadline expired.
+  bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
+    const gpr_timespec deadline =
+        grpc_timeout_seconds_to_deadline(timeout_seconds);
+    grpc_connectivity_state state;
+    while ((state = channel->GetState(false /* try_to_connect */)) ==
+           GRPC_CHANNEL_READY) {
+
if (!channel->WaitForStateChange(state, deadline)) return false;
+    }
+    return true;
+  }
+
+  // Blocks until `channel` reports GRPC_CHANNEL_READY, asking it to connect
+  // on every poll. Returns true once READY is observed, or false if
+  // WaitForStateChange() returns false first -- presumably because the
+  // timeout_seconds deadline expired.
+  bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
+    const gpr_timespec deadline =
+        grpc_timeout_seconds_to_deadline(timeout_seconds);
+    grpc_connectivity_state state;
+    while ((state = channel->GetState(true /* try_to_connect */)) !=
+           GRPC_CHANNEL_READY) {
+      if (!channel->WaitForStateChange(state, deadline)) return false;
+    }
+    return true;
+  }
+
+ private:
+  const grpc::string server_host_;    // hostname written to /etc/hosts
+  const grpc::string interface_;      // device the test address is added to
+  const grpc::string ipv4_address_;   // address the server is reached through
+  const grpc::string netmask_;        // prefix length appended to the address
+  std::unique_ptr stub_;              // not referenced by any helper above
+  std::unique_ptr server_;            // current server, see StartServer()
+  const int SERVER_PORT = 32750;      // fixed port, see TODO in StartServer()
+  int port_;                          // port used for server and channel
+  const grpc::string kRequestMessage_;  // payload of every Echo request
+};
+
+// Network interface connected to server flaps
+// While a background thread sends one RPC per second, the address and hosts
+// entry are removed and then restored (interface first, hosts entry one second
+// later); the channel must leave READY and then return to it.
+TEST_F(FlakyNetworkTest, NetworkTransition) {
+  const int kKeepAliveTimeMs = 1000;
+  const int kKeepAliveTimeoutMs = 1000;
+  ChannelArguments args;
+  args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKeepAliveTimeMs);
+  args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, kKeepAliveTimeoutMs);
+  args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
+  args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
+
+  auto channel = BuildChannel("pick_first", args);
+  auto stub = BuildStub(channel);
+  // Channel should be in READY state after we send an RPC
+  EXPECT_TRUE(SendRpc(stub));
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+
+  // Background sender: results are deliberately not asserted, since RPCs are
+  // expected to fail while the network is down.
+  std::atomic_bool shutdown{false};
+  std::thread sender = std::thread([this, &stub, &shutdown]() {
+    while (true) {
+      if (shutdown.load()) {
+        return;
+      }
+      SendRpc(stub);
+      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    }
+  });
+
+  // bring down network
+  NetworkDown();
+  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
+  // bring network interface back up
+  InterfaceUp();
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+  // Restore DNS entry for server
+  DNSUp();
+  EXPECT_TRUE(WaitForChannelReady(channel.get()));
+
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+  // Stop the background sender before its captured locals go out of scope.
+  shutdown.store(true);
+  sender.join();
+}
+
+// Traffic to the server is blackholed temporarily with keepalives enabled
+// While a background thread sends one RPC per second, packets are dropped for
+// ten seconds; the channel must have left READY by then and must return to
+// READY once the drop rules are removed.
+TEST_F(FlakyNetworkTest, ServerUnreachableWithKeepalive) {
+  const int kKeepAliveTimeMs = 1000;
+  const int kKeepAliveTimeoutMs = 1000;
+  ChannelArguments args;
+  args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKeepAliveTimeMs);
+  args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, kKeepAliveTimeoutMs);
+  args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
+  args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
+
+  auto channel = BuildChannel("pick_first", args);
+  auto stub = BuildStub(channel);
+  // Channel should be in READY state after we send an RPC
+  EXPECT_TRUE(SendRpc(stub));
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+
+  // Background sender: results are deliberately not asserted, since RPCs are
+  // expected to fail while packets are being dropped.
+  std::atomic_bool shutdown{false};
+  std::thread sender = std::thread([this, &stub, &shutdown]() {
+    while (true) {
+      if (shutdown.load()) {
+        return;
+      }
+      SendRpc(stub);
+      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    }
+  });
+
+  // break network connectivity
+  DropPackets();
+  std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
+  // restore network connectivity (removes the packet-drop rules)
+  RestoreNetwork();
+  EXPECT_TRUE(WaitForChannelReady(channel.get()));
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+  // Stop the background sender before its captured locals go out of scope.
+  shutdown.store(true);
+  sender.join();
+}
+
+//
+// Traffic to the server is blackholed temporarily with keepalives disabled
+// The channel is built with default ChannelArguments. Packets are dropped
+// before two wait-for-ready RPCs are issued from a separate thread.
+TEST_F(FlakyNetworkTest, ServerUnreachableNoKeepalive) {
+  auto channel = BuildChannel("pick_first", ChannelArguments());
+  auto stub = BuildStub(channel);
+  // Channel should be in READY state after we send an RPC
+  EXPECT_TRUE(SendRpc(stub));
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+
+  // break network connectivity
+  DropPackets();
+
+  std::thread sender = std::thread([this, &stub]() {
+    // RPC with deadline should timeout
+
EXPECT_FALSE(SendRpc(stub, /*timeout_ms=*/500, /*wait_for_ready=*/true));
+    // RPC without a deadline waits until the call finishes; it is expected to
+    // succeed
+    EXPECT_TRUE(SendRpc(stub, /*timeout_ms=*/0, /*wait_for_ready=*/true));
+  });
+
+  // Keep packets dropped for two seconds, longer than the first RPC's 500 ms
+  // deadline.
+  std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+  // restore network connectivity (removes the packet-drop rules)
+  RestoreNetwork();
+
+  // wait for RPC to finish
+  sender.join();
+}
+
+// Send RPCs over a flaky network connection
+// With netem impairment applied to the interface, kMessageCount sequential
+// RPCs must all succeed and the channel must still be READY afterwards.
+TEST_F(FlakyNetworkTest, FlakyNetwork) {
+  const int kKeepAliveTimeMs = 1000;
+  const int kKeepAliveTimeoutMs = 1000;
+  const int kMessageCount = 100;
+  ChannelArguments args;
+  args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKeepAliveTimeMs);
+  args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, kKeepAliveTimeoutMs);
+  args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
+  args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
+
+  auto channel = BuildChannel("pick_first", args);
+  auto stub = BuildStub(channel);
+  // Channel should be in READY state after we send an RPC
+  EXPECT_TRUE(SendRpc(stub));
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+
+  // simulate flaky network (packet loss, corruption and delays)
+  FlakeNetwork();
+  // RPCs are sent without a deadline and without wait-for-ready.
+  for (int i = 0; i < kMessageCount; ++i) {
+    EXPECT_TRUE(SendRpc(stub));
+  }
+  // remove network flakiness
+  UnflakeNetwork();
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+}
+
+// Server is shutdown gracefully and restarted.
Client keepalives are enabled
+// After the server is stopped the channel must leave READY and an RPC must
+// fail; after the server is started again the channel must return to READY.
+TEST_F(FlakyNetworkTest, ServerRestartKeepaliveEnabled) {
+  const int kKeepAliveTimeMs = 1000;
+  const int kKeepAliveTimeoutMs = 1000;
+  ChannelArguments args;
+  args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKeepAliveTimeMs);
+  args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, kKeepAliveTimeoutMs);
+  args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
+  args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
+
+  auto channel = BuildChannel("pick_first", args);
+  auto stub = BuildStub(channel);
+  // Channel should be in READY state after we send an RPC
+  EXPECT_TRUE(SendRpc(stub));
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+
+  // server goes down, client should detect server going down and calls should
+  // fail
+  StopServer();
+  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
+  EXPECT_FALSE(SendRpc(stub));
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+
+  // server restarts, calls succeed
+  StartServer();
+  EXPECT_TRUE(WaitForChannelReady(channel.get()));
+  // NOTE(review): the post-restart RPC check below is commented out, so only
+  // channel readiness is asserted after the restart -- confirm whether it
+  // should be re-enabled.
+  // EXPECT_TRUE(SendRpc(stub));
+}
+
+// Server is shutdown gracefully and restarted.
Client keepalives are disabled
+// The channel is built with default ChannelArguments (no keepalive settings).
+// After the server is stopped the channel must leave READY; after the server
+// is started again the channel must return to READY.
+TEST_F(FlakyNetworkTest, ServerRestartKeepaliveDisabled) {
+  auto channel = BuildChannel("pick_first", ChannelArguments());
+  auto stub = BuildStub(channel);
+  // Channel should be in READY state after we send an RPC
+  EXPECT_TRUE(SendRpc(stub));
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+
+  // server sends GOAWAY when it's shutdown, so client attempts to reconnect
+  StopServer();
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+
+  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+
+  // server restarts, calls succeed
+  StartServer();
+  EXPECT_TRUE(WaitForChannelReady(channel.get()));
+}
+
+}  // namespace
+}  // namespace testing
+}  // namespace grpc
+#endif  // GPR_LINUX
+
+// Test entry point. The fixture and tests above are compiled only when
+// GPR_LINUX is defined; main() itself is outside that guard.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  grpc_test_init(argc, argv);
+  auto result = RUN_ALL_TESTS();
+  return result;
+}
diff --git a/tools/internal_ci/linux/grpc_bazel_privileged_docker.sh b/tools/internal_ci/linux/grpc_bazel_privileged_docker.sh
new file mode 100755
index 00000000000..ae1056d7c3d
--- /dev/null
+++ b/tools/internal_ci/linux/grpc_bazel_privileged_docker.sh
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -ex
+
+# change to grpc repo root
+cd $(dirname $0)/../../..
+ +source tools/internal_ci/helper_scripts/prepare_build_linux_rc + +export DOCKERFILE_DIR=tools/dockerfile/test/bazel +export DOCKER_RUN_SCRIPT=$BAZEL_SCRIPT +# NET_ADMIN capability allows tests to manipulate network interfaces +exec tools/run_tests/dockerize/build_and_run_docker.sh --cap-add NET_ADMIN diff --git a/tools/internal_ci/linux/grpc_flaky_network.cfg b/tools/internal_ci/linux/grpc_flaky_network.cfg index de7a3b9cd8f..07bedd79f94 100644 --- a/tools/internal_ci/linux/grpc_flaky_network.cfg +++ b/tools/internal_ci/linux/grpc_flaky_network.cfg @@ -15,7 +15,7 @@ # Config file for the internal CI (in protobuf text format) # Location of the continuous shell script in repository. -build_file: "grpc/tools/internal_ci/linux/grpc_bazel.sh" +build_file: "grpc/tools/internal_ci/linux/grpc_bazel_privileged_docker.sh" timeout_mins: 240 env_vars { key: "BAZEL_SCRIPT" diff --git a/tools/internal_ci/linux/grpc_flaky_network_in_docker.sh b/tools/internal_ci/linux/grpc_flaky_network_in_docker.sh index 42b6d44c1cb..60bb49b639a 100755 --- a/tools/internal_ci/linux/grpc_flaky_network_in_docker.sh +++ b/tools/internal_ci/linux/grpc_flaky_network_in_docker.sh @@ -23,9 +23,9 @@ git clone /var/local/jenkins/grpc /var/local/git/grpc (cd /var/local/jenkins/grpc/ && git submodule foreach 'cd /var/local/git/grpc \ && git submodule update --init --reference /var/local/jenkins/grpc/${name} \ ${name}') -cd /var/local/git/grpc +cd /var/local/git/grpc/test/cpp/end2end -# TODO(jtattermusch): install prerequsites if needed +# iptables is used to drop traffic between client and server +apt-get install -y iptables -# TODO(jtattermusch): run the flaky network test instead -bazel build --spawn_strategy=standalone --genrule_strategy=standalone :all test/... examples/... +bazel test --spawn_strategy=standalone --genrule_strategy=standalone --test_output=all :flaky_network_test