/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <gtest/gtest.h>

#include <mutex>
#include <thread>

#include "src/core/lib/gpr/env.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/core/util/test_tcp_server.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/test_credentials_provider.h"

#ifdef GRPC_POSIX_SOCKET_TCP_SERVER

#include "src/core/lib/iomgr/tcp_posix.h"

namespace grpc {
namespace testing {
namespace {

class TestScenario {
 public:
  TestScenario(bool server_port, bool pending_data,
               const grpc::string& creds_type)
      : server_has_port(server_port),
        queue_pending_data(pending_data),
        credentials_type(creds_type) {}
  void Log() const;
  // server has its own port or not
  bool server_has_port;
  // whether tcp server should read some data before handoff
  bool queue_pending_data;
  const grpc::string credentials_type;
};

static std::ostream& operator<<(std::ostream& out,
                                const TestScenario& scenario) {
  return out << "TestScenario{server_has_port="
             << (scenario.server_has_port ? "true" : "false")
             << ", queue_pending_data="
             << (scenario.queue_pending_data ? "true" : "false")
             << ", credentials='" << scenario.credentials_type << "'}";
}

void TestScenario::Log() const {
  std::ostringstream out;
  out << *this;
  gpr_log(GPR_ERROR, "%s", out.str().c_str());
}

// Set up a test tcp server which is in charge of accepting connections and
// handing off the connections as fds.
class TestTcpServer {
 public:
  TestTcpServer()
      : shutdown_(false),
        queue_data_(false),
        port_(grpc_pick_unused_port_or_die()) {
    std::ostringstream server_address;
    server_address << "localhost:" << port_;
    address_ = server_address.str();
    test_tcp_server_init(&tcp_server_, &TestTcpServer::OnConnect, this);
    GRPC_CLOSURE_INIT(&on_fd_released_, &TestTcpServer::OnFdReleased, this,
                      grpc_schedule_on_exec_ctx);
  }

  ~TestTcpServer() {
    running_thread_.join();
    test_tcp_server_destroy(&tcp_server_);
    grpc_recycle_unused_port(port_);
  }

  // Read some data before handing off the connection.
  void SetQueueData() { queue_data_ = true; }

  void Start() {
    test_tcp_server_start(&tcp_server_, port_);
    gpr_log(GPR_INFO, "Test TCP server started at %s", address_.c_str());
  }

  const grpc::string& address() { return address_; }

  void SetAcceptor(
      std::unique_ptr<experimental::ExternalConnectionAcceptor> acceptor) {
    connection_acceptor_ = std::move(acceptor);
  }

  void Run() {
    running_thread_ = std::thread([this]() {
      while (true) {
        {
          std::lock_guard<std::mutex> lock(mu_);
          if (shutdown_) {
            return;
          }
        }
        test_tcp_server_poll(&tcp_server_, 1);
      }
    });
  }

  void Shutdown() {
    std::lock_guard<std::mutex> lock(mu_);
    shutdown_ = true;
  }

  static void OnConnect(void* arg, grpc_endpoint* tcp,
                        grpc_pollset* accepting_pollset,
                        grpc_tcp_server_acceptor* acceptor) {
    auto* self = static_cast<TestTcpServer*>(arg);
    self->OnConnect(tcp, accepting_pollset, acceptor);
  }

  static void OnFdReleased(void* arg, grpc_error* err) {
    auto* self = static_cast<TestTcpServer*>(arg);
    self->OnFdReleased(err);
  }

 private:
  void OnConnect(grpc_endpoint* tcp, grpc_pollset* /*accepting_pollset*/,
                 grpc_tcp_server_acceptor* acceptor) {
    char* peer = grpc_endpoint_get_peer(tcp);
    gpr_log(GPR_INFO, "Got incoming connection! from %s", peer);
    gpr_free(peer);
    EXPECT_FALSE(acceptor->external_connection);
    listener_fd_ = grpc_tcp_server_port_fd(
        acceptor->from_server, acceptor->port_index, acceptor->fd_index);
    gpr_free(acceptor);
    grpc_tcp_destroy_and_release_fd(tcp, &fd_, &on_fd_released_);
  }

  void OnFdReleased(grpc_error* err) {
    EXPECT_EQ(GRPC_ERROR_NONE, err);
    experimental::ExternalConnectionAcceptor::NewConnectionParameters p;
    p.listener_fd = listener_fd_;
    p.fd = fd_;
    if (queue_data_) {
      char buf[1024];
      ssize_t read_bytes = 0;
      while (read_bytes <= 0) {
        read_bytes = read(fd_, buf, 1024);
      }
      Slice data(buf, read_bytes);
      p.read_buffer = ByteBuffer(&data, 1);
    }
    gpr_log(GPR_INFO, "Handing off fd %d with data size %d from listener fd %d",
            fd_, static_cast<int>(p.read_buffer.Length()), listener_fd_);
    connection_acceptor_->HandleNewConnection(&p);
  }

  std::mutex mu_;
  bool shutdown_;

  int listener_fd_ = -1;
  int fd_ = -1;
  bool queue_data_ = false;

  grpc_closure on_fd_released_;
  std::thread running_thread_;
  int port_ = -1;
  grpc::string address_;
  std::unique_ptr<experimental::ExternalConnectionAcceptor>
      connection_acceptor_;
  test_tcp_server tcp_server_;
};

class PortSharingEnd2endTest : public ::testing::TestWithParam<TestScenario> {
 protected:
  PortSharingEnd2endTest() : is_server_started_(false), first_picked_port_(0) {
    GetParam().Log();
  }

  void SetUp() override {
    if (GetParam().queue_pending_data) {
      tcp_server1_.SetQueueData();
      tcp_server2_.SetQueueData();
    }
    tcp_server1_.Start();
    tcp_server2_.Start();
    ServerBuilder builder;
    if (GetParam().server_has_port) {
      int port = grpc_pick_unused_port_or_die();
      first_picked_port_ = port;
      server_address_ << "localhost:" << port;
      auto creds = GetCredentialsProvider()->GetServerCredentials(
          GetParam().credentials_type);
      builder.AddListeningPort(server_address_.str(), creds);
      gpr_log(GPR_INFO, "gRPC server listening on %s",
              server_address_.str().c_str());
    }
    auto server_creds = GetCredentialsProvider()->GetServerCredentials(
        GetParam().credentials_type);
    auto acceptor1 = builder.experimental().AddExternalConnectionAcceptor(
        ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
        server_creds);
    tcp_server1_.SetAcceptor(std::move(acceptor1));
    auto acceptor2 = builder.experimental().AddExternalConnectionAcceptor(
        ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
        server_creds);
    tcp_server2_.SetAcceptor(std::move(acceptor2));
    builder.RegisterService(&service_);
    server_ = builder.BuildAndStart();
    is_server_started_ = true;

    tcp_server1_.Run();
    tcp_server2_.Run();
  }

  void TearDown() override {
    tcp_server1_.Shutdown();
    tcp_server2_.Shutdown();
    if (is_server_started_) {
      server_->Shutdown();
    }
    if (first_picked_port_ > 0) {
      grpc_recycle_unused_port(first_picked_port_);
    }
  }

  void ResetStubs() {
    EXPECT_TRUE(is_server_started_);
    ChannelArguments args;
    args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
    auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
        GetParam().credentials_type, &args);
    channel_handoff1_ =
        CreateCustomChannel(tcp_server1_.address(), channel_creds, args);
    stub_handoff1_ = EchoTestService::NewStub(channel_handoff1_);
    channel_handoff2_ =
        CreateCustomChannel(tcp_server2_.address(), channel_creds, args);
    stub_handoff2_ = EchoTestService::NewStub(channel_handoff2_);
    if (GetParam().server_has_port) {
      ChannelArguments direct_args;
      direct_args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
      auto direct_creds = GetCredentialsProvider()->GetChannelCredentials(
          GetParam().credentials_type, &direct_args);
      channel_direct_ =
          CreateCustomChannel(server_address_.str(), direct_creds, direct_args);
      stub_direct_ = EchoTestService::NewStub(channel_direct_);
    }
  }

  bool is_server_started_;
  // channel/stub to the test tcp server, the connection will be handed to the
  // grpc server.
  std::shared_ptr<Channel> channel_handoff1_;
  std::unique_ptr<EchoTestService::Stub> stub_handoff1_;
  std::shared_ptr<Channel> channel_handoff2_;
  std::unique_ptr<EchoTestService::Stub> stub_handoff2_;
  // channel/stub to talk to the grpc server directly, if applicable.
  std::shared_ptr<Channel> channel_direct_;
  std::unique_ptr<EchoTestService::Stub> stub_direct_;
  std::unique_ptr<Server> server_;
  std::ostringstream server_address_;
  TestServiceImpl service_;
  TestTcpServer tcp_server1_;
  TestTcpServer tcp_server2_;
  int first_picked_port_;
};

static void SendRpc(EchoTestService::Stub* stub, int num_rpcs) {
  EchoRequest request;
  EchoResponse response;
  request.set_message("Hello hello hello hello");

  for (int i = 0; i < num_rpcs; ++i) {
    ClientContext context;
    Status s = stub->Echo(&context, request, &response);
    EXPECT_EQ(response.message(), request.message());
    EXPECT_TRUE(s.ok());
  }
}

std::vector<TestScenario> CreateTestScenarios() {
  std::vector<TestScenario> scenarios;
  std::vector<grpc::string> credentials_types;

#if TARGET_OS_IPHONE
  // Workaround Apple CFStream bug
  gpr_setenv("grpc_cfstream", "0");
#endif

  credentials_types = GetCredentialsProvider()->GetSecureCredentialsTypeList();
  // Only allow insecure credentials type when it is registered with the
  // provider. User may create providers that do not have insecure.
  if (GetCredentialsProvider()->GetChannelCredentials(kInsecureCredentialsType,
                                                      nullptr) != nullptr) {
    credentials_types.push_back(kInsecureCredentialsType);
  }

  GPR_ASSERT(!credentials_types.empty());
  for (const auto& cred : credentials_types) {
    for (auto server_has_port : {true, false}) {
      for (auto queue_pending_data : {true, false}) {
        scenarios.emplace_back(server_has_port, queue_pending_data, cred);
      }
    }
  }
  return scenarios;
}

TEST_P(PortSharingEnd2endTest, HandoffAndDirectCalls) {
  ResetStubs();
  SendRpc(stub_handoff1_.get(), 5);
  if (GetParam().server_has_port) {
    SendRpc(stub_direct_.get(), 5);
  }
}

TEST_P(PortSharingEnd2endTest, MultipleHandoff) {
  for (int i = 0; i < 3; i++) {
    ResetStubs();
    SendRpc(stub_handoff2_.get(), 1);
  }
}

TEST_P(PortSharingEnd2endTest, TwoHandoffPorts) {
  for (int i = 0; i < 3; i++) {
    ResetStubs();
    SendRpc(stub_handoff1_.get(), 5);
    SendRpc(stub_handoff2_.get(), 5);
  }
}

INSTANTIATE_TEST_SUITE_P(PortSharingEnd2end, PortSharingEnd2endTest,
                         ::testing::ValuesIn(CreateTestScenarios()));

}  // namespace
}  // namespace testing
}  // namespace grpc

#endif  // GRPC_POSIX_SOCKET_TCP_SERVER

int main(int argc, char** argv) {
  grpc::testing::TestEnvironment env(argc, argv);
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}