Re-add cfstream_test

2nd attempt at adding cfstream_test after fixing internal build failures caused by first attempt.
pull/18082/head
Prashant Jaikumar 6 years ago
parent 3dacd1afc4
commit 6177befe94
  1. 7
      BUILD
  2. 23
      bazel/grpc_build_system.bzl
  3. 21
      test/cpp/end2end/BUILD
  4. 278
      test/cpp/end2end/cfstream_test.cc
  5. 19
      tools/internal_ci/macos/grpc_cfstream.cfg
  6. 29
      tools/internal_ci/macos/grpc_run_bazel_tests.sh

@ -63,6 +63,11 @@ config_setting(
values = {"cpu": "x64_windows_msvc"},
)
config_setting(
name = "mac_x86_64",
values = {"cpu": "darwin"},
)
# This should be updated along with build.yaml
g_stands_for = "godric"
@ -981,6 +986,7 @@ grpc_cc_library(
],
language = "c++",
public_hdrs = GRPC_PUBLIC_HDRS,
use_cfstream = True,
deps = [
"gpr_base",
"grpc_codegen",
@ -1044,6 +1050,7 @@ grpc_cc_library(
"src/core/lib/iomgr/endpoint_cfstream.h",
"src/core/lib/iomgr/error_cfstream.h",
],
use_cfstream = True,
deps = [
":gpr_base",
":grpc_base",

@ -35,6 +35,12 @@ def if_not_windows(a):
"//conditions:default": a,
})
def if_mac(a):
return select({
"//:mac_x86_64": a,
"//conditions:default": [],
})
def _get_external_deps(external_deps):
ret = []
for dep in external_deps:
@ -73,10 +79,16 @@ def grpc_cc_library(
testonly = False,
visibility = None,
alwayslink = 0,
data = []):
data = [],
use_cfstream = False):
copts = []
if use_cfstream:
copts = if_mac(["-DGRPC_CFSTREAM"])
if language.upper() == "C":
copts = if_not_windows(["-std=c99"])
copts = copts + if_not_windows(["-std=c99"])
linkopts = if_not_windows(["-pthread"])
if use_cfstream:
linkopts = linkopts + if_mac(["-framework CoreFoundation"])
native.cc_library(
name = name,
srcs = srcs,
@ -98,7 +110,7 @@ def grpc_cc_library(
copts = copts,
visibility = visibility,
testonly = testonly,
linkopts = if_not_windows(["-pthread"]),
linkopts = linkopts,
includes = [
"include",
],
@ -113,7 +125,6 @@ def grpc_proto_plugin(name, srcs = [], deps = []):
deps = deps,
)
def grpc_proto_library(
name,
srcs = [],
@ -133,9 +144,9 @@ def grpc_proto_library(
)
def grpc_cc_test(name, srcs = [], deps = [], external_deps = [], args = [], data = [], uses_polling = True, language = "C++", size = "medium", timeout = None, tags = [], exec_compatible_with = []):
copts = []
copts = if_mac(["-DGRPC_CFSTREAM"])
if language.upper() == "C":
copts = if_not_windows(["-std=c99"])
copts = copts + if_not_windows(["-std=c99"])
args = {
"name": name,
"srcs": srcs,

@ -606,3 +606,24 @@ grpc_cc_test(
"//test/cpp/util:test_util",
],
)
grpc_cc_test(
name = "cfstream_test",
srcs = ["cfstream_test.cc"],
external_deps = [
"gtest",
],
tags = ["manual"], # test requires root, won't work with bazel RBE
deps = [
":test_service_impl",
"//:gpr",
"//:grpc",
"//:grpc++",
"//:grpc_cfstream",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing:simple_messages_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)

@ -0,0 +1,278 @@
/*
*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/iomgr/port.h"
#include <algorithm>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <gtest/gtest.h>
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/gpr/env.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#ifdef GRPC_CFSTREAM
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using std::chrono::system_clock;
namespace grpc {
namespace testing {
namespace {
class CFStreamTest : public ::testing::Test {
protected:
CFStreamTest()
: server_host_("grpctest"),
interface_("lo0"),
ipv4_address_("10.0.0.1"),
netmask_("/32"),
kRequestMessage_("🖖") {}
void DNSUp() {
std::ostringstream cmd;
// Add DNS entry for server_host_ in /etc/hosts
cmd << "echo '" << ipv4_address_ << " " << server_host_
<< " ' | sudo tee -a /etc/hosts";
std::system(cmd.str().c_str());
}
void DNSDown() {
std::ostringstream cmd;
// Remove DNS entry for server_host_ in /etc/hosts
cmd << "sudo sed -i '.bak' '/" << server_host_ << "/d' /etc/hosts";
std::system(cmd.str().c_str());
}
void InterfaceUp() {
std::ostringstream cmd;
cmd << "sudo /sbin/ifconfig " << interface_ << " alias " << ipv4_address_;
std::system(cmd.str().c_str());
}
void InterfaceDown() {
std::ostringstream cmd;
cmd << "sudo /sbin/ifconfig " << interface_ << " -alias " << ipv4_address_;
std::system(cmd.str().c_str());
}
void NetworkUp() {
InterfaceUp();
DNSUp();
}
void NetworkDown() {
InterfaceDown();
DNSDown();
}
void SetUp() override {
NetworkUp();
grpc_init();
StartServer();
}
void TearDown() override {
NetworkDown();
StopServer();
grpc_shutdown();
}
void StartServer() {
port_ = grpc_pick_unused_port_or_die();
server_.reset(new ServerData(port_));
server_->Start(server_host_);
}
void StopServer() { server_->Shutdown(); }
std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
const std::shared_ptr<Channel>& channel) {
return grpc::testing::EchoTestService::NewStub(channel);
}
std::shared_ptr<Channel> BuildChannel() {
std::ostringstream server_address;
server_address << server_host_ << ":" << port_;
return CreateCustomChannel(
server_address.str(), InsecureChannelCredentials(), ChannelArguments());
}
void SendRpc(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
bool expect_success = false) {
auto response = std::unique_ptr<EchoResponse>(new EchoResponse());
EchoRequest request;
request.set_message(kRequestMessage_);
ClientContext context;
Status status = stub->Echo(&context, request, response.get());
if (status.ok()) {
gpr_log(GPR_DEBUG, "RPC returned %s\n", response->message().c_str());
} else {
gpr_log(GPR_DEBUG, "RPC failed: %s", status.error_message().c_str());
}
if (expect_success) {
EXPECT_TRUE(status.ok());
}
}
bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
const gpr_timespec deadline =
grpc_timeout_seconds_to_deadline(timeout_seconds);
grpc_connectivity_state state;
while ((state = channel->GetState(false /* try_to_connect */)) ==
GRPC_CHANNEL_READY) {
if (!channel->WaitForStateChange(state, deadline)) return false;
}
return true;
}
bool WaitForChannelReady(Channel* channel, int timeout_seconds = 10) {
const gpr_timespec deadline =
grpc_timeout_seconds_to_deadline(timeout_seconds);
grpc_connectivity_state state;
while ((state = channel->GetState(true /* try_to_connect */)) !=
GRPC_CHANNEL_READY) {
if (!channel->WaitForStateChange(state, deadline)) return false;
}
return true;
}
private:
struct ServerData {
int port_;
std::unique_ptr<Server> server_;
TestServiceImpl service_;
std::unique_ptr<std::thread> thread_;
bool server_ready_ = false;
explicit ServerData(int port) { port_ = port; }
void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_);
std::mutex mu;
std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond;
thread_.reset(new std::thread(
std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
cond.wait(lock, [this] { return server_ready_; });
server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete");
}
void Serve(const grpc::string& server_host, std::mutex* mu,
std::condition_variable* cond) {
std::ostringstream server_address;
server_address << server_host << ":" << port_;
ServerBuilder builder;
builder.AddListeningPort(server_address.str(),
InsecureServerCredentials());
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
std::lock_guard<std::mutex> lock(*mu);
server_ready_ = true;
cond->notify_one();
}
void Shutdown(bool join = true) {
server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
if (join) thread_->join();
}
};
const grpc::string server_host_;
const grpc::string interface_;
const grpc::string ipv4_address_;
const grpc::string netmask_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<ServerData> server_;
int port_;
const grpc::string kRequestMessage_;
};
// gRPC should automatically detech network flaps (without enabling keepalives)
// when CFStream is enabled
TEST_F(CFStreamTest, NetworkTransition) {
auto channel = BuildChannel();
auto stub = BuildStub(channel);
// Channel should be in READY state after we send an RPC
SendRpc(stub, /*expect_success=*/true);
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
std::atomic_bool shutdown{false};
std::thread sender = std::thread([this, &stub, &shutdown]() {
while (true) {
if (shutdown.load()) {
return;
}
SendRpc(stub);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
});
// bring down network
NetworkDown();
// network going down should be detected by cfstream
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
// bring network interface back up
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
NetworkUp();
// channel should reconnect
EXPECT_TRUE(WaitForChannelReady(channel.get()));
EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
shutdown.store(true);
sender.join();
}
} // namespace
} // namespace testing
} // namespace grpc
#endif // GRPC_CFSTREAM
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_test_init(argc, argv);
gpr_setenv("grpc_cfstream", "1");
// TODO (pjaikumar): remove the line below when
// https://github.com/grpc/grpc/issues/18080 has been fixed.
gpr_setenv("GRPC_DNS_RESOLVER", "native");
const auto result = RUN_ALL_TESTS();
return result;
}

@ -0,0 +1,19 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/macos/grpc_run_bazel_tests.sh"

@ -0,0 +1,29 @@
#!/usr/bin/env bash
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
# change to grpc repo root
cd $(dirname $0)/../../..
./tools/run_tests/start_port_server.py
# run cfstream_test separately because it messes with the network
bazel test --spawn_strategy=standalone --genrule_strategy=standalone --test_output=all //test/cpp/end2end:cfstream_test
# kill port_server.py to prevent the build from hanging
ps aux | grep port_server\\.py | awk '{print $2}' | xargs kill -9
Loading…
Cancel
Save