Merge pull request #17872 from rmstar/cfstream_test

Add test for network transitions when CFStream is enabled
pull/18042/head
rmstar 6 years ago committed by GitHub
commit 52360b8fa7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      BUILD
  2. 16
      bazel/grpc_build_system.bzl
  3. 4
      src/core/lib/iomgr/endpoint_cfstream.cc
  4. 32
      test/cpp/end2end/BUILD
  5. 275
      test/cpp/end2end/cfstream_test.cc
  6. 18
      tools/internal_ci/macos/grpc_cfstream.cfg
  7. 28
      tools/internal_ci/macos/grpc_run_bazel_tests.sh

18
BUILD

@ -63,6 +63,21 @@ config_setting(
values = {"cpu": "x64_windows_msvc"}, values = {"cpu": "x64_windows_msvc"},
) )
config_setting(
name = "mac_x86_64",
values = {"cpu": "darwin"},
)
COPTS = select({
":mac_x86_64": ["-DGRPC_CFSTREAM"],
"//conditions:default": [],
})
LINK_OPTS = select({
":mac_x86_64": ["-framework CoreFoundation"],
"//conditions:default": [],
})
# This should be updated along with build.yaml # This should be updated along with build.yaml
g_stands_for = "gold" g_stands_for = "gold"
@ -980,6 +995,7 @@ grpc_cc_library(
"zlib", "zlib",
], ],
language = "c++", language = "c++",
copts = COPTS,
public_hdrs = GRPC_PUBLIC_HDRS, public_hdrs = GRPC_PUBLIC_HDRS,
deps = [ deps = [
"gpr_base", "gpr_base",
@ -1039,6 +1055,8 @@ grpc_cc_library(
"src/core/lib/iomgr/iomgr_posix_cfstream.cc", "src/core/lib/iomgr/iomgr_posix_cfstream.cc",
"src/core/lib/iomgr/tcp_client_cfstream.cc", "src/core/lib/iomgr/tcp_client_cfstream.cc",
], ],
copts = COPTS,
linkopts = LINK_OPTS,
hdrs = [ hdrs = [
"src/core/lib/iomgr/cfstream_handle.h", "src/core/lib/iomgr/cfstream_handle.h",
"src/core/lib/iomgr/endpoint_cfstream.h", "src/core/lib/iomgr/endpoint_cfstream.h",

@ -73,10 +73,11 @@ def grpc_cc_library(
testonly = False, testonly = False,
visibility = None, visibility = None,
alwayslink = 0, alwayslink = 0,
data = []): data = [],
copts = [] copts = [],
linkopts = []):
if language.upper() == "C": if language.upper() == "C":
copts = if_not_windows(["-std=c99"]) copts = copts + if_not_windows(["-std=c99"])
native.cc_library( native.cc_library(
name = name, name = name,
srcs = srcs, srcs = srcs,
@ -98,7 +99,7 @@ def grpc_cc_library(
copts = copts, copts = copts,
visibility = visibility, visibility = visibility,
testonly = testonly, testonly = testonly,
linkopts = if_not_windows(["-pthread"]), linkopts = linkopts + if_not_windows(["-pthread"]),
includes = [ includes = [
"include", "include",
], ],
@ -132,10 +133,9 @@ def grpc_proto_library(
generate_mocks = generate_mocks, generate_mocks = generate_mocks,
) )
def grpc_cc_test(name, srcs = [], deps = [], external_deps = [], args = [], data = [], uses_polling = True, language = "C++", size = "medium", timeout = None, tags = [], exec_compatible_with = []): def grpc_cc_test(name, srcs = [], deps = [], external_deps = [], args = [], data = [], uses_polling = True, language = "C++", size = "medium", timeout = None, tags = [], exec_compatible_with = [], copts = [], linkopts = []):
copts = []
if language.upper() == "C": if language.upper() == "C":
copts = if_not_windows(["-std=c99"]) copts = copts + if_not_windows(["-std=c99"])
args = { args = {
"name": name, "name": name,
"srcs": srcs, "srcs": srcs,
@ -143,7 +143,7 @@ def grpc_cc_test(name, srcs = [], deps = [], external_deps = [], args = [], data
"data": data, "data": data,
"deps": deps + _get_external_deps(external_deps), "deps": deps + _get_external_deps(external_deps),
"copts": copts, "copts": copts,
"linkopts": if_not_windows(["-pthread"]), "linkopts": linkopts + if_not_windows(["-pthread"]),
"size": size, "size": size,
"timeout": timeout, "timeout": timeout,
"exec_compatible_with": exec_compatible_with, "exec_compatible_with": exec_compatible_with,

@ -182,7 +182,7 @@ static void ReadAction(void* arg, grpc_error* error) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep)); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep));
EP_UNREF(ep, "read"); EP_UNREF(ep, "read");
} else { } else {
if (read_size < len) { if (read_size < static_cast<CFIndex>(len)) {
grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr); grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr);
} }
CallReadCb(ep, GRPC_ERROR_NONE); CallReadCb(ep, GRPC_ERROR_NONE);
@ -217,7 +217,7 @@ static void WriteAction(void* arg, grpc_error* error) {
CallWriteCb(ep, error); CallWriteCb(ep, error);
EP_UNREF(ep, "write"); EP_UNREF(ep, "write");
} else { } else {
if (write_size < GRPC_SLICE_LENGTH(slice)) { if (write_size < static_cast<CFIndex>(GRPC_SLICE_LENGTH(slice))) {
grpc_slice_buffer_undo_take_first( grpc_slice_buffer_undo_take_first(
ep->write_slices, grpc_slice_sub(slice, write_size, slice_len)); ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
} }

@ -16,6 +16,16 @@ licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package") load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package")
config_setting(
name = "mac_x86_64",
values = {"cpu": "darwin"},
)
COPTS = select({
":mac_x86_64": ["-DGRPC_CFSTREAM"],
"//conditions:default": [],
})
grpc_package( grpc_package(
name = "test/cpp/end2end", name = "test/cpp/end2end",
visibility = "public", visibility = "public",
@ -606,3 +616,25 @@ grpc_cc_test(
"//test/cpp/util:test_util", "//test/cpp/util:test_util",
], ],
) )
grpc_cc_test(
name = "cfstream_test",
srcs = ["cfstream_test.cc"],
external_deps = [
"gtest",
],
tags = ["manual"], # test requires root, won't work with bazel RBE
copts = COPTS,
deps = [
":test_service_impl",
"//:gpr",
"//:grpc",
"//:grpc++",
"//:grpc_cfstream",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing:simple_messages_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)

@ -0,0 +1,275 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/iomgr/port.h"
#include <algorithm>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <gtest/gtest.h>
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/gpr/env.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#ifdef GRPC_CFSTREAM
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using std::chrono::system_clock;
namespace grpc {
namespace testing {
namespace {
class CFStreamTest : public ::testing::Test {
protected:
CFStreamTest()
: server_host_("grpctest"),
interface_("lo0"),
ipv4_address_("10.0.0.1"),
netmask_("/32"),
kRequestMessage_("🖖") {}
void DNSUp() {
std::ostringstream cmd;
// Add DNS entry for server_host_ in /etc/hosts
cmd << "echo '" << ipv4_address_ << " " << server_host_
<< " ' | sudo tee -a /etc/hosts";
std::system(cmd.str().c_str());
}
void DNSDown() {
std::ostringstream cmd;
// Remove DNS entry for server_host_ in /etc/hosts
cmd << "sudo sed -i '.bak' '/" << server_host_ << "/d' /etc/hosts";
std::system(cmd.str().c_str());
}
void InterfaceUp() {
std::ostringstream cmd;
cmd << "sudo /sbin/ifconfig " << interface_ << " alias " << ipv4_address_;
std::system(cmd.str().c_str());
}
void InterfaceDown() {
std::ostringstream cmd;
cmd << "sudo /sbin/ifconfig " << interface_ << " -alias " << ipv4_address_;
std::system(cmd.str().c_str());
}
void NetworkUp() {
InterfaceUp();
DNSUp();
}
void NetworkDown() {
InterfaceDown();
DNSDown();
}
void SetUp() override {
NetworkUp();
grpc_init();
StartServer();
}
void TearDown() override {
NetworkDown();
StopServer();
grpc_shutdown();
}
void StartServer() {
port_ = grpc_pick_unused_port_or_die();
server_.reset(new ServerData(port_));
server_->Start(server_host_);
}
void StopServer() { server_->Shutdown(); }
std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
const std::shared_ptr<Channel>& channel) {
return grpc::testing::EchoTestService::NewStub(channel);
}
std::shared_ptr<Channel> BuildChannel() {
std::ostringstream server_address;
server_address << server_host_ << ":" << port_;
return CreateCustomChannel(
server_address.str(), InsecureChannelCredentials(), ChannelArguments());
}
void SendRpc(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
bool expect_success = false) {
auto response = std::unique_ptr<EchoResponse>(new EchoResponse());
EchoRequest request;
request.set_message(kRequestMessage_);
ClientContext context;
Status status = stub->Echo(&context, request, response.get());
if (status.ok()) {
gpr_log(GPR_DEBUG, "RPC returned %s\n", response->message().c_str());
} else {
gpr_log(GPR_DEBUG, "RPC failed: %s", status.error_message().c_str());
}
if (expect_success) {
EXPECT_TRUE(status.ok());
}
}
bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
const gpr_timespec deadline =
grpc_timeout_seconds_to_deadline(timeout_seconds);
grpc_connectivity_state state;
while ((state = channel->GetState(false /* try_to_connect */)) ==
GRPC_CHANNEL_READY) {
if (!channel->WaitForStateChange(state, deadline)) return false;
}
return true;
}
bool WaitForChannelReady(Channel* channel, int timeout_seconds = 10) {
const gpr_timespec deadline =
grpc_timeout_seconds_to_deadline(timeout_seconds);
grpc_connectivity_state state;
while ((state = channel->GetState(true /* try_to_connect */)) !=
GRPC_CHANNEL_READY) {
if (!channel->WaitForStateChange(state, deadline)) return false;
}
return true;
}
private:
struct ServerData {
int port_;
std::unique_ptr<Server> server_;
TestServiceImpl service_;
std::unique_ptr<std::thread> thread_;
bool server_ready_ = false;
explicit ServerData(int port) { port_ = port; }
void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_);
std::mutex mu;
std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond;
thread_.reset(new std::thread(
std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
cond.wait(lock, [this] { return server_ready_; });
server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete");
}
void Serve(const grpc::string& server_host, std::mutex* mu,
std::condition_variable* cond) {
std::ostringstream server_address;
server_address << server_host << ":" << port_;
ServerBuilder builder;
builder.AddListeningPort(server_address.str(),
InsecureServerCredentials());
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
std::lock_guard<std::mutex> lock(*mu);
server_ready_ = true;
cond->notify_one();
}
void Shutdown(bool join = true) {
server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
if (join) thread_->join();
}
};
const grpc::string server_host_;
const grpc::string interface_;
const grpc::string ipv4_address_;
const grpc::string netmask_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<ServerData> server_;
int port_;
const grpc::string kRequestMessage_;
};
// gRPC should automatically detech network flaps (without enabling keepalives)
// when CFStream is enabled
TEST_F(CFStreamTest, NetworkTransition) {
auto channel = BuildChannel();
auto stub = BuildStub(channel);
// Channel should be in READY state after we send an RPC
SendRpc(stub, /*expect_success=*/true);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
std::atomic_bool shutdown{false};
std::thread sender = std::thread([this, &stub, &shutdown]() {
while (true) {
if (shutdown.load()) {
return;
}
SendRpc(stub);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
});
// bring down network
NetworkDown();
// network going down should be detected by cfstream
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
// bring network interface back up
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
NetworkUp();
// channel should reconnect
EXPECT_TRUE(WaitForChannelReady(channel.get()));
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
shutdown.store(true);
sender.join();
}
} // namespace
} // namespace testing
} // namespace grpc
#endif // GRPC_CFSTREAM
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_test_init(argc, argv);
gpr_setenv("grpc_cfstream", "1");
const auto result = RUN_ALL_TESTS();
return result;
}

@ -0,0 +1,18 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/macos/grpc_run_bazel_tests.sh"

@ -0,0 +1,28 @@
#!/usr/bin/env bash
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
# change to grpc repo root
cd $(dirname $0)/../../..
./tools/run_tests/start_port_server.py
# run cfstream_test separately because it messes with the network
bazel test --spawn_strategy=standalone --genrule_strategy=standalone --test_output=all //test/cpp/end2end:cfstream_test
# kill port_server.py to prevent the build from hanging
ps aux | grep port_server\\.py | awk '{print $2}' | xargs kill -9
Loading…
Cancel
Save