From 6d14987f09a494dc57416046c6df1aa3bab39a4c Mon Sep 17 00:00:00 2001 From: Prashant Jaikumar Date: Tue, 29 Jan 2019 18:00:32 -0800 Subject: [PATCH 1/2] Fixed cast in endpoint_cfstream.cc --- src/core/lib/iomgr/endpoint_cfstream.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc index 7c4bc1ace2a..25146e7861c 100644 --- a/src/core/lib/iomgr/endpoint_cfstream.cc +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -182,7 +182,7 @@ static void ReadAction(void* arg, grpc_error* error) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep)); EP_UNREF(ep, "read"); } else { - if (read_size < len) { + if (read_size < static_cast(len)) { grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr); } CallReadCb(ep, GRPC_ERROR_NONE); @@ -217,7 +217,7 @@ static void WriteAction(void* arg, grpc_error* error) { CallWriteCb(ep, error); EP_UNREF(ep, "write"); } else { - if (write_size < GRPC_SLICE_LENGTH(slice)) { + if (write_size < static_cast(GRPC_SLICE_LENGTH(slice))) { grpc_slice_buffer_undo_take_first( ep->write_slices, grpc_slice_sub(slice, write_size, slice_len)); } From 2f0f522423c15394307e0abd066c7213eed23185 Mon Sep 17 00:00:00 2001 From: Prashant Jaikumar Date: Thu, 7 Feb 2019 17:40:47 -0800 Subject: [PATCH 2/2] Add end2end test for cfstream --- BUILD | 18 ++ bazel/grpc_build_system.bzl | 16 +- test/cpp/end2end/BUILD | 32 ++ test/cpp/end2end/cfstream_test.cc | 275 ++++++++++++++++++ tools/internal_ci/macos/grpc_cfstream.cfg | 18 ++ .../internal_ci/macos/grpc_run_bazel_tests.sh | 28 ++ 6 files changed, 379 insertions(+), 8 deletions(-) create mode 100644 test/cpp/end2end/cfstream_test.cc create mode 100644 tools/internal_ci/macos/grpc_cfstream.cfg create mode 100644 tools/internal_ci/macos/grpc_run_bazel_tests.sh diff --git a/BUILD b/BUILD index ebb03580bb4..11d40710f22 100644 --- a/BUILD +++ b/BUILD @@ -63,6 +63,21 @@ config_setting( values = {"cpu": "x64_windows_msvc"}, ) +config_setting( + name = "mac_x86_64", + values = {"cpu": "darwin"}, +) + +COPTS = select({ + ":mac_x86_64": ["-DGRPC_CFSTREAM"], + "//conditions:default": [], +}) + +LINK_OPTS = select({ + ":mac_x86_64": ["-framework CoreFoundation"], + "//conditions:default": [], +}) + # This should be updated along with build.yaml g_stands_for = "gold" @@ -980,6 +995,7 @@ grpc_cc_library( "zlib", ], language = "c++", + copts = COPTS, public_hdrs = GRPC_PUBLIC_HDRS, deps = [ "gpr_base", @@ -1039,6 +1055,8 @@ grpc_cc_library( "src/core/lib/iomgr/iomgr_posix_cfstream.cc", "src/core/lib/iomgr/tcp_client_cfstream.cc", ], + copts = COPTS, + linkopts = LINK_OPTS, hdrs = [ "src/core/lib/iomgr/cfstream_handle.h", "src/core/lib/iomgr/endpoint_cfstream.h", diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl index be85bc87324..5d5f75073af 100644 --- a/bazel/grpc_build_system.bzl +++ b/bazel/grpc_build_system.bzl @@ -73,10 +73,11 @@ def grpc_cc_library( testonly = False, visibility = None, alwayslink = 0, - data = []): - copts = [] + data = [], + copts = [], + linkopts = []): if language.upper() == "C": - copts = if_not_windows(["-std=c99"]) + copts = copts + if_not_windows(["-std=c99"]) native.cc_library( name = name, srcs = srcs, @@ -98,7 +99,7 @@ def grpc_cc_library( copts = copts, visibility = visibility, testonly = testonly, - linkopts = if_not_windows(["-pthread"]), + linkopts = linkopts + if_not_windows(["-pthread"]), includes = [ "include", ], @@ -132,10 +133,9 @@ def grpc_proto_library( generate_mocks = generate_mocks, ) -def grpc_cc_test(name, srcs = [], deps = [], external_deps = [], args = [], data = [], uses_polling = True, language = "C++", size = "medium", timeout = None, tags = [], exec_compatible_with = []): - copts = [] +def grpc_cc_test(name, srcs = [], deps = [], external_deps = [], args = [], data = [], uses_polling = True, language = "C++", size = "medium", timeout = None, tags = [], exec_compatible_with = [], copts = [], linkopts = []): if language.upper() == "C": - copts = if_not_windows(["-std=c99"]) + copts = copts + if_not_windows(["-std=c99"]) args = { "name": name, "srcs": srcs, @@ -143,7 +143,7 @@ def grpc_cc_test(name, srcs = [], deps = [], external_deps = [], args = [], data "data": data, "deps": deps + _get_external_deps(external_deps), "copts": copts, - "linkopts": if_not_windows(["-pthread"]), + "linkopts": linkopts + if_not_windows(["-pthread"]), "size": size, "timeout": timeout, "exec_compatible_with": exec_compatible_with, diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index cbf09354a03..b3ebefc68b9 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -16,6 +16,16 @@ licenses(["notice"]) # Apache v2 load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package") +config_setting( + name = "mac_x86_64", + values = {"cpu": "darwin"}, +) + +COPTS = select({ + ":mac_x86_64": ["-DGRPC_CFSTREAM"], + "//conditions:default": [], +}) + grpc_package( name = "test/cpp/end2end", visibility = "public", @@ -606,3 +616,25 @@ grpc_cc_test( "//test/cpp/util:test_util", ], ) + +grpc_cc_test( + name = "cfstream_test", + srcs = ["cfstream_test.cc"], + external_deps = [ + "gtest", + ], + tags = ["manual"], # test requires root, won't work with bazel RBE + copts = COPTS, + deps = [ + ":test_service_impl", + "//:gpr", + "//:grpc", + "//:grpc++", + "//:grpc_cfstream", + "//src/proto/grpc/testing:echo_messages_proto", + "//src/proto/grpc/testing:echo_proto", + "//src/proto/grpc/testing:simple_messages_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) diff --git a/test/cpp/end2end/cfstream_test.cc b/test/cpp/end2end/cfstream_test.cc new file mode 100644 index 00000000000..8d4cec55515 --- /dev/null +++ b/test/cpp/end2end/cfstream_test.cc @@ -0,0 +1,275 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/backoff/backoff.h" +#include "src/core/lib/gpr/env.h" + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" + +#ifdef GRPC_CFSTREAM +using grpc::testing::EchoRequest; +using grpc::testing::EchoResponse; +using std::chrono::system_clock; + +namespace grpc { +namespace testing { +namespace { + +class CFStreamTest : public ::testing::Test { + protected: + CFStreamTest() + : server_host_("grpctest"), + interface_("lo0"), + ipv4_address_("10.0.0.1"), + netmask_("/32"), + kRequestMessage_("🖖") {} + + void DNSUp() { + std::ostringstream cmd; + // Add DNS entry for server_host_ in /etc/hosts + cmd << "echo '" << ipv4_address_ << " " << server_host_ + << " ' | sudo tee -a /etc/hosts"; + std::system(cmd.str().c_str()); + } + + void DNSDown() { + std::ostringstream cmd; + // Remove DNS entry for server_host_ in /etc/hosts + cmd << "sudo sed -i '.bak' '/" << server_host_ << "/d' /etc/hosts"; + std::system(cmd.str().c_str()); + } + + void InterfaceUp() { + std::ostringstream cmd; + cmd << "sudo /sbin/ifconfig " << interface_ << " alias " << ipv4_address_; + std::system(cmd.str().c_str()); + } + + void InterfaceDown() { + std::ostringstream cmd; + cmd << "sudo /sbin/ifconfig " << interface_ << " -alias " << ipv4_address_; + std::system(cmd.str().c_str()); + } + + void NetworkUp() { + InterfaceUp(); + DNSUp(); + } + + void NetworkDown() { + InterfaceDown(); + DNSDown(); + } + + void SetUp() override { + NetworkUp(); + grpc_init(); + StartServer(); + } + + void TearDown() override { + NetworkDown(); + StopServer(); + grpc_shutdown(); + } + + void StartServer() { + port_ = grpc_pick_unused_port_or_die(); + server_.reset(new ServerData(port_)); + server_->Start(server_host_); + } + void StopServer() { server_->Shutdown(); } + + std::unique_ptr BuildStub( + const std::shared_ptr& channel) { + return grpc::testing::EchoTestService::NewStub(channel); + } + + std::shared_ptr BuildChannel() { + std::ostringstream server_address; + server_address << server_host_ << ":" << port_; + return CreateCustomChannel( + server_address.str(), InsecureChannelCredentials(), ChannelArguments()); + } + + void SendRpc( + const std::unique_ptr& stub, + bool expect_success = false) { + auto response = std::unique_ptr(new EchoResponse()); + EchoRequest request; + request.set_message(kRequestMessage_); + ClientContext context; + Status status = stub->Echo(&context, request, response.get()); + if (status.ok()) { + gpr_log(GPR_DEBUG, "RPC returned %s\n", response->message().c_str()); + } else { + gpr_log(GPR_DEBUG, "RPC failed: %s", status.error_message().c_str()); + } + if (expect_success) { + EXPECT_TRUE(status.ok()); + } + } + + bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) { + const gpr_timespec deadline = + grpc_timeout_seconds_to_deadline(timeout_seconds); + grpc_connectivity_state state; + while ((state = channel->GetState(false /* try_to_connect */)) == + GRPC_CHANNEL_READY) { + if (!channel->WaitForStateChange(state, deadline)) return false; + } + return true; + } + + bool WaitForChannelReady(Channel* channel, int timeout_seconds = 10) { + const gpr_timespec deadline = + grpc_timeout_seconds_to_deadline(timeout_seconds); + grpc_connectivity_state state; + while ((state = channel->GetState(true /* try_to_connect */)) != + GRPC_CHANNEL_READY) { + if (!channel->WaitForStateChange(state, deadline)) return false; + } + return true; + } + + private: + struct ServerData { + int port_; + std::unique_ptr server_; + TestServiceImpl service_; + std::unique_ptr thread_; + bool server_ready_ = false; + + explicit ServerData(int port) { port_ = port; } + + void Start(const grpc::string& server_host) { + gpr_log(GPR_INFO, "starting server on port %d", port_); + std::mutex mu; + std::unique_lock lock(mu); + std::condition_variable cond; + thread_.reset(new std::thread( + std::bind(&ServerData::Serve, this, server_host, &mu, &cond))); + cond.wait(lock, [this] { return server_ready_; }); + server_ready_ = false; + gpr_log(GPR_INFO, "server startup complete"); + } + + void Serve(const grpc::string& server_host, std::mutex* mu, + std::condition_variable* cond) { + std::ostringstream server_address; + server_address << server_host << ":" << port_; + ServerBuilder builder; + builder.AddListeningPort(server_address.str(), + InsecureServerCredentials()); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + std::lock_guard lock(*mu); + server_ready_ = true; + cond->notify_one(); + } + + void Shutdown(bool join = true) { + server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); + if (join) thread_->join(); + } + }; + + const grpc::string server_host_; + const grpc::string interface_; + const grpc::string ipv4_address_; + const grpc::string netmask_; + std::unique_ptr stub_; + std::unique_ptr server_; + int port_; + const grpc::string kRequestMessage_; +}; + +// gRPC should automatically detech network flaps (without enabling keepalives) +// when CFStream is enabled +TEST_F(CFStreamTest, NetworkTransition) { + auto channel = BuildChannel(); + auto stub = BuildStub(channel); + // Channel should be in READY state after we send an RPC + SendRpc(stub, /*expect_success=*/true); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + + std::atomic_bool shutdown{false}; + std::thread sender = std::thread([this, &stub, &shutdown]() { + while (true) { + if (shutdown.load()) { + return; + } + SendRpc(stub); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + }); + + // bring down network + NetworkDown(); + + // network going down should be detected by cfstream + EXPECT_TRUE(WaitForChannelNotReady(channel.get())); + + // bring network interface back up + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + NetworkUp(); + + // channel should reconnect + EXPECT_TRUE(WaitForChannelReady(channel.get())); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + shutdown.store(true); + sender.join(); +} + +} // namespace +} // namespace testing +} // namespace grpc +#endif // GRPC_CFSTREAM + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_test_init(argc, argv); + gpr_setenv("grpc_cfstream", "1"); + const auto result = RUN_ALL_TESTS(); + return result; +} diff --git a/tools/internal_ci/macos/grpc_cfstream.cfg b/tools/internal_ci/macos/grpc_cfstream.cfg new file mode 100644 index 00000000000..b911bbe6c69 --- /dev/null +++ b/tools/internal_ci/macos/grpc_cfstream.cfg @@ -0,0 +1,18 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc/tools/internal_ci/macos/grpc_run_bazel_tests.sh" diff --git a/tools/internal_ci/macos/grpc_run_bazel_tests.sh b/tools/internal_ci/macos/grpc_run_bazel_tests.sh new file mode 100644 index 00000000000..3dfa182d7c6 --- /dev/null +++ b/tools/internal_ci/macos/grpc_run_bazel_tests.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -ex + +# change to grpc repo root +cd $(dirname $0)/../../.. + + +./tools/run_tests/start_port_server.py + +# run cfstream_test separately because it messes with the network +bazel test --spawn_strategy=standalone --genrule_strategy=standalone --test_output=all //test/cpp/end2end:cfstream_test + +# kill port_server.py to prevent the build from hanging +ps aux | grep port_server\\.py | awk '{print $2}' | xargs kill -9