From e8efe06a42bd2804c1ce8b1f296268e1dbaa37c5 Mon Sep 17 00:00:00 2001
From: Prashant Jaikumar <>
Date: Fri, 15 Feb 2019 14:43:15 -0800
Subject: [PATCH] Re-add cfstream_test

2nd attempt at adding cfstream_test after fixing internal build failures caused by first attempt.
 BUILD                                         |   7 +
 bazel/grpc_build_system.bzl                   |  23 +-
 test/cpp/end2end/BUILD                        |  21 ++
 test/cpp/end2end/             | 278 ++++++++++++++++++
 tools/internal_ci/macos/grpc_cfstream.cfg     |  19 ++
 .../internal_ci/macos/ |  29 ++
 6 files changed, 371 insertions(+), 6 deletions(-)
 create mode 100644 test/cpp/end2end/
 create mode 100644 tools/internal_ci/macos/grpc_cfstream.cfg
 create mode 100644 tools/internal_ci/macos/

diff --git a/BUILD b/BUILD
index a566057e926..c8c49ff4a41 100644
--- a/BUILD
+++ b/BUILD
@@ -63,6 +63,11 @@ config_setting(
     values = {"cpu": "x64_windows_msvc"},
+    name = "mac_x86_64",
+    values = {"cpu": "darwin"},
 # This should be updated along with build.yaml
 g_stands_for = "godric"
@@ -981,6 +986,7 @@ grpc_cc_library(
     language = "c++",
     public_hdrs = GRPC_PUBLIC_HDRS,
+    use_cfstream = True,
     deps = [
@@ -1044,6 +1050,7 @@ grpc_cc_library(
+    use_cfstream = True,
     deps = [
diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl
index be85bc87324..3ea8e305ca5 100644
--- a/bazel/grpc_build_system.bzl
+++ b/bazel/grpc_build_system.bzl
@@ -35,6 +35,12 @@ def if_not_windows(a):
         "//conditions:default": a,
+def if_mac(a):
+    return select({
+        "//:mac_x86_64": a,
+        "//conditions:default": [],
+    })
 def _get_external_deps(external_deps):
     ret = []
     for dep in external_deps:
@@ -73,10 +79,16 @@ def grpc_cc_library(
         testonly = False,
         visibility = None,
         alwayslink = 0,
-        data = []):
+        data = [],
+        use_cfstream = False):
     copts = []
+    if use_cfstream:
+        copts = if_mac(["-DGRPC_CFSTREAM"])
     if language.upper() == "C":
-        copts = if_not_windows(["-std=c99"])
+        copts = copts + if_not_windows(["-std=c99"])
+    linkopts = if_not_windows(["-pthread"])
+    if use_cfstream:
+        linkopts = linkopts + if_mac(["-framework CoreFoundation"])
         name = name,
         srcs = srcs,
@@ -98,7 +110,7 @@ def grpc_cc_library(
         copts = copts,
         visibility = visibility,
         testonly = testonly,
-        linkopts = if_not_windows(["-pthread"]),
+        linkopts = linkopts,
         includes = [
@@ -113,7 +125,6 @@ def grpc_proto_plugin(name, srcs = [], deps = []):
         deps = deps,
 def grpc_proto_library(
         srcs = [],
@@ -133,9 +144,9 @@ def grpc_proto_library(
 def grpc_cc_test(name, srcs = [], deps = [], external_deps = [], args = [], data = [], uses_polling = True, language = "C++", size = "medium", timeout = None, tags = [], exec_compatible_with = []):
-    copts = []
+    copts = if_mac(["-DGRPC_CFSTREAM"])
     if language.upper() == "C":
-        copts = if_not_windows(["-std=c99"])
+        copts = copts + if_not_windows(["-std=c99"])
     args = {
         "name": name,
         "srcs": srcs,
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 64b3eae60da..1970f3693cb 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -625,3 +625,24 @@ grpc_cc_test(
+    name = "cfstream_test",
+    srcs = [""],
+    external_deps = [
+        "gtest",
+    ],
+    tags = ["manual"],  # test requires root, won't work with bazel RBE
+    deps = [
+        ":test_service_impl",
+        "//:gpr",
+        "//:grpc",
+        "//:grpc++",
+        "//:grpc_cfstream",
+        "//src/proto/grpc/testing:echo_messages_proto",
+        "//src/proto/grpc/testing:echo_proto",
+        "//src/proto/grpc/testing:simple_messages_proto",
+        "//test/core/util:grpc_test_util",
+        "//test/cpp/util:test_util",
+    ],
diff --git a/test/cpp/end2end/ b/test/cpp/end2end/
new file mode 100644
index 00000000000..9039329d815
--- /dev/null
+++ b/test/cpp/end2end/
@@ -0,0 +1,278 @@
+ *
+ * Copyright 2019 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "src/core/lib/iomgr/port.h"
+#include <algorithm>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <thread>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+#include <grpcpp/channel.h>
+#include <grpcpp/client_context.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/health_check_service_interface.h>
+#include <grpcpp/server.h>
+#include <grpcpp/server_builder.h>
+#include <gtest/gtest.h>
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/gpr/env.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
+using grpc::testing::EchoRequest;
+using grpc::testing::EchoResponse;
+using std::chrono::system_clock;
+namespace grpc {
+namespace testing {
+namespace {
+class CFStreamTest : public ::testing::Test {
+ protected:
+  CFStreamTest()
+      : server_host_("grpctest"),
+        interface_("lo0"),
+        ipv4_address_(""),
+        netmask_("/32"),
+        kRequestMessage_("🖖") {}
+  void DNSUp() {
+    std::ostringstream cmd;
+    // Add DNS entry for server_host_ in /etc/hosts
+    cmd << "echo '" << ipv4_address_ << "      " << server_host_
+        << "  ' | sudo tee -a /etc/hosts";
+    std::system(cmd.str().c_str());
+  }
+  void DNSDown() {
+    std::ostringstream cmd;
+    // Remove DNS entry for server_host_ in /etc/hosts
+    cmd << "sudo sed -i '.bak' '/" << server_host_ << "/d' /etc/hosts";
+    std::system(cmd.str().c_str());
+  }
+  void InterfaceUp() {
+    std::ostringstream cmd;
+    cmd << "sudo /sbin/ifconfig " << interface_ << " alias " << ipv4_address_;
+    std::system(cmd.str().c_str());
+  }
+  void InterfaceDown() {
+    std::ostringstream cmd;
+    cmd << "sudo /sbin/ifconfig " << interface_ << " -alias " << ipv4_address_;
+    std::system(cmd.str().c_str());
+  }
+  void NetworkUp() {
+    InterfaceUp();
+    DNSUp();
+  }
+  void NetworkDown() {
+    InterfaceDown();
+    DNSDown();
+  }
+  void SetUp() override {
+    NetworkUp();
+    grpc_init();
+    StartServer();
+  }
+  void TearDown() override {
+    NetworkDown();
+    StopServer();
+    grpc_shutdown();
+  }
+  void StartServer() {
+    port_ = grpc_pick_unused_port_or_die();
+    server_.reset(new ServerData(port_));
+    server_->Start(server_host_);
+  }
+  void StopServer() { server_->Shutdown(); }
+  std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
+      const std::shared_ptr<Channel>& channel) {
+    return grpc::testing::EchoTestService::NewStub(channel);
+  }
+  std::shared_ptr<Channel> BuildChannel() {
+    std::ostringstream server_address;
+    server_address << server_host_ << ":" << port_;
+    return CreateCustomChannel(
+        server_address.str(), InsecureChannelCredentials(), ChannelArguments());
+  }
+  void SendRpc(
+      const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
+      bool expect_success = false) {
+    auto response = std::unique_ptr<EchoResponse>(new EchoResponse());
+    EchoRequest request;
+    request.set_message(kRequestMessage_);
+    ClientContext context;
+    Status status = stub->Echo(&context, request, response.get());
+    if (status.ok()) {
+      gpr_log(GPR_DEBUG, "RPC returned %s\n", response->message().c_str());
+    } else {
+      gpr_log(GPR_DEBUG, "RPC failed: %s", status.error_message().c_str());
+    }
+    if (expect_success) {
+      EXPECT_TRUE(status.ok());
+    }
+  }
+  bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
+    const gpr_timespec deadline =
+        grpc_timeout_seconds_to_deadline(timeout_seconds);
+    grpc_connectivity_state state;
+    while ((state = channel->GetState(false /* try_to_connect */)) ==
+           GRPC_CHANNEL_READY) {
+      if (!channel->WaitForStateChange(state, deadline)) return false;
+    }
+    return true;
+  }
+  bool WaitForChannelReady(Channel* channel, int timeout_seconds = 10) {
+    const gpr_timespec deadline =
+        grpc_timeout_seconds_to_deadline(timeout_seconds);
+    grpc_connectivity_state state;
+    while ((state = channel->GetState(true /* try_to_connect */)) !=
+           GRPC_CHANNEL_READY) {
+      if (!channel->WaitForStateChange(state, deadline)) return false;
+    }
+    return true;
+  }
+ private:
+  struct ServerData {
+    int port_;
+    std::unique_ptr<Server> server_;
+    TestServiceImpl service_;
+    std::unique_ptr<std::thread> thread_;
+    bool server_ready_ = false;
+    explicit ServerData(int port) { port_ = port; }
+    void Start(const grpc::string& server_host) {
+      gpr_log(GPR_INFO, "starting server on port %d", port_);
+      std::mutex mu;
+      std::unique_lock<std::mutex> lock(mu);
+      std::condition_variable cond;
+      thread_.reset(new std::thread(
+          std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
+      cond.wait(lock, [this] { return server_ready_; });
+      server_ready_ = false;
+      gpr_log(GPR_INFO, "server startup complete");
+    }
+    void Serve(const grpc::string& server_host, std::mutex* mu,
+               std::condition_variable* cond) {
+      std::ostringstream server_address;
+      server_address << server_host << ":" << port_;
+      ServerBuilder builder;
+      builder.AddListeningPort(server_address.str(),
+                               InsecureServerCredentials());
+      builder.RegisterService(&service_);
+      server_ = builder.BuildAndStart();
+      std::lock_guard<std::mutex> lock(*mu);
+      server_ready_ = true;
+      cond->notify_one();
+    }
+    void Shutdown(bool join = true) {
+      server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
+      if (join) thread_->join();
+    }
+  };
+  const grpc::string server_host_;
+  const grpc::string interface_;
+  const grpc::string ipv4_address_;
+  const grpc::string netmask_;
+  std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+  std::unique_ptr<ServerData> server_;
+  int port_;
+  const grpc::string kRequestMessage_;
+// gRPC should automatically detech network flaps (without enabling keepalives)
+//  when CFStream is enabled
+TEST_F(CFStreamTest, NetworkTransition) {
+  auto channel = BuildChannel();
+  auto stub = BuildStub(channel);
+  // Channel should be in READY state after we send an RPC
+  SendRpc(stub, /*expect_success=*/true);
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+  std::atomic_bool shutdown{false};
+  std::thread sender = std::thread([this, &stub, &shutdown]() {
+    while (true) {
+      if (shutdown.load()) {
+        return;
+      }
+      SendRpc(stub);
+      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    }
+  });
+  // bring down network
+  NetworkDown();
+  // network going down should be detected by cfstream
+  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
+  // bring network interface back up
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+  NetworkUp();
+  // channel should reconnect
+  EXPECT_TRUE(WaitForChannelReady(channel.get()));
+  EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY);
+  sender.join();
+}  // namespace
+}  // namespace testing
+}  // namespace grpc
+#endif  // GRPC_CFSTREAM
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  grpc_test_init(argc, argv);
+  gpr_setenv("grpc_cfstream", "1");
+  // TODO (pjaikumar): remove the line below when
+  // has been fixed.
+  gpr_setenv("GRPC_DNS_RESOLVER", "native");
+  const auto result = RUN_ALL_TESTS();
+  return result;
diff --git a/tools/internal_ci/macos/grpc_cfstream.cfg b/tools/internal_ci/macos/grpc_cfstream.cfg
new file mode 100644
index 00000000000..2b1ce0a89c7
--- /dev/null
+++ b/tools/internal_ci/macos/grpc_cfstream.cfg
@@ -0,0 +1,19 @@
+# Copyright 2019 The gRPC Authors
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Config file for the internal CI (in protobuf text format)
+# Location of the continuous shell script in repository.
+build_file: "grpc/tools/internal_ci/macos/"
diff --git a/tools/internal_ci/macos/ b/tools/internal_ci/macos/
new file mode 100644
index 00000000000..ef02a675d5b
--- /dev/null
+++ b/tools/internal_ci/macos/
@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+# Copyright 2019 The gRPC Authors
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+set -ex
+# change to grpc repo root
+cd $(dirname $0)/../../..
+# run cfstream_test separately because it messes with the network
+bazel test --spawn_strategy=standalone --genrule_strategy=standalone --test_output=all //test/cpp/end2end:cfstream_test
+# kill to prevent the build from hanging
+ps aux | grep port_server\\.py | awk '{print $2}' | xargs kill -9