//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "test/cpp/interop/istio_echo_server_lib.h"

#include <thread>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_split.h"
#include "absl/synchronization/blocking_counter.h"

#include <grpcpp/client_context.h>
#include <grpcpp/grpcpp.h>

#include "src/core/lib/gprpp/host_port.h"
#include "src/proto/grpc/testing/istio_echo.pb.h"

using proto::EchoRequest;
using proto::EchoResponse;
using proto::EchoTestService;
using proto::ForwardEchoRequest;
using proto::ForwardEchoResponse;

namespace grpc {
namespace testing {
namespace {

const absl::string_view kRequestIdField = "x-request-id";
const absl::string_view kServiceVersionField = "ServiceVersion";
// const absl::string_view kServicePortField = "ServicePort";
const absl::string_view kStatusCodeField = "StatusCode";
// const absl::string_view kUrlField = "URL";
const absl::string_view kHostField = "Host";
const absl::string_view kHostnameField = "Hostname";
// const absl::string_view kMethodField = "Method";
const absl::string_view kRequestHeader = "RequestHeader";
// const absl::string_view kResponseHeader = "ResponseHeader";
// const absl::string_view kClusterField = "Cluster";
// const absl::string_view kIstioVersionField = "IstioVersion";
const absl::string_view kIpField = "IP";  // The Requester’s IP Address.

absl::string_view StringRefToStringView(const string_ref& r) {
  return absl::string_view(r.data(), r.size());
}

struct EchoCall {
  grpc::ClientContext context;
  proto::EchoResponse response;
  Status status;
};

}  // namespace

EchoTestServiceImpl::EchoTestServiceImpl(std::string hostname,
                                         std::string service_version,
                                         std::string forwarding_address)
    : hostname_(std::move(hostname)),
      service_version_(std::move(service_version)),
      forwarding_address_(std::move(forwarding_address)) {
  forwarding_stub_ = EchoTestService::NewStub(
      CreateChannel(forwarding_address_, InsecureChannelCredentials()));
}

Status EchoTestServiceImpl::Echo(ServerContext* context,
                                 const EchoRequest* request,
                                 EchoResponse* response) {
  std::string s;
  absl::StrAppend(&s, kHostField, "=",
                  StringRefToStringView(context->ExperimentalGetAuthority()),
                  "\n");
  const std::multimap<string_ref, string_ref> metadata =
      context->client_metadata();
  for (const auto& kv : metadata) {
    // Skip all binary headers.
    if (kv.first.ends_with("-bin")) {
      continue;
    }
    absl::StrAppend(&s, kRequestHeader, "=", StringRefToStringView(kv.first),
                    ":", StringRefToStringView(kv.second), "\n");
  }
  absl::string_view host;
  absl::string_view port;
  std::string peer = context->peer();
  grpc_core::SplitHostPort(peer, &host, &port);
  // This is not a complete list, but also not all fields are used. May
  //  need to add/remove fields later, if required by tests. Only keep the
  //  fields needed for now.
  //
  //  absl::StrAppend(&s,kServicePortField,"=",this->port_,"\n");
  //  absl::StrAppend(&s,kClusterField,"=",this->cluster_,"\n");
  //  absl::StrAppend(&s,kIstioVersionField,"=",this->istio_version_,"\n");
  absl::StrAppend(&s, kServiceVersionField, "=", this->service_version_, "\n");
  absl::StrAppend(&s, kIpField, "=", host, "\n");
  absl::StrAppend(&s, kStatusCodeField, "=", std::to_string(200), "\n");
  absl::StrAppend(&s, kHostnameField, "=", this->hostname_, "\n");
  absl::StrAppend(&s, "Echo=", request->message(), "\n");
  response->set_message(s);
  gpr_log(GPR_INFO, "Echo response:\n%s", s.c_str());
  return Status::OK;
}

Status EchoTestServiceImpl::ForwardEcho(ServerContext* context,
                                        const ForwardEchoRequest* request,
                                        ForwardEchoResponse* response) {
  std::string raw_url = request->url();
  size_t colon = raw_url.find_first_of(':');
  std::string scheme;
  if (colon == std::string::npos) {
    return Status(
        StatusCode::INVALID_ARGUMENT,
        absl::StrFormat("No protocol configured for url %s", raw_url));
  }
  scheme = raw_url.substr(0, colon);
  std::shared_ptr<Channel> channel;
  if (scheme == "xds") {
    // We can optionally add support for TLS creds, but we are primarily
    // concerned with proxyless-grpc here.
    gpr_log(GPR_INFO, "Creating channel to %s using xDS Creds",
            raw_url.c_str());
    channel =
        CreateChannel(raw_url, XdsCredentials(InsecureChannelCredentials()));
  } else if (scheme == "grpc") {
    // We don't really want to test this but the istio test infrastructure needs
    // this to be supported. If we ever decide to add support for this properly,
    // we would need to add support for TLS creds here.
    absl::string_view address = absl::StripPrefix(raw_url, "grpc://");
    gpr_log(GPR_INFO, "Creating channel to %s", std::string(address).c_str());
    channel = CreateChannel(std::string(address), InsecureChannelCredentials());
  } else {
    gpr_log(GPR_INFO, "Protocol %s not supported. Forwarding to %s",
            scheme.c_str(), forwarding_address_.c_str());
    ClientContext forwarding_ctx;
    forwarding_ctx.set_deadline(context->deadline());
    return forwarding_stub_->ForwardEcho(&forwarding_ctx, *request, response);
  }
  auto stub = EchoTestService::NewStub(channel);
  auto count = request->count() == 0 ? 1 : request->count();
  // Calculate the amount of time to sleep after each call.
  std::chrono::duration<double> duration_per_query =
      std::chrono::nanoseconds::zero();
  if (request->qps() > 0) {
    duration_per_query =
        std::chrono::nanoseconds(std::chrono::seconds(1)) / request->qps();
  }
  std::vector<EchoCall> calls(count);
  EchoRequest echo_request;
  echo_request.set_message(request->message());
  absl::BlockingCounter counter(count);
  for (int i = 0; i < count; ++i) {
    calls[i].context.AddMetadata(std::string(kRequestIdField),
                                 std::to_string(i));
    for (const auto& header : request->headers()) {
      if (header.key() != kHostField) {
        calls[i].context.AddMetadata(header.key(), header.value());
      }
    }
    constexpr int kDefaultTimeout = 5 * 1000 * 1000 /* 5 seconds */;
    std::chrono::system_clock::time_point deadline =
        std::chrono::system_clock::now() +
        std::chrono::microseconds(request->timeout_micros() > 0
                                      ? request->timeout_micros()
                                      : kDefaultTimeout);
    calls[i].context.set_deadline(deadline);
    stub->async()->Echo(&calls[i].context, &echo_request, &calls[i].response,
                        [&, index = i](Status s) {
                          calls[index].status = s;
                          counter.DecrementCount();
                        });
    std::this_thread::sleep_for(duration_per_query);
  }
  // Wait for all calls to be done.
  counter.Wait();
  for (int i = 0; i < count; ++i) {
    if (calls[i].status.ok()) {
      std::string body;
      // The test infrastructure might expect the entire struct instead of
      // just the message.
      absl::StrAppend(&body, absl::StrFormat("[%d] grpcecho.Echo(%s)\n", i,
                                             request->message()));
      auto contents =
          absl::StrSplit(calls[i].response.message(), '\n', absl::SkipEmpty());
      for (const auto& line : contents) {
        absl::StrAppend(&body, absl::StrFormat("[%d body] %s\n", i, line));
      }
      response->add_output(body);
      gpr_log(GPR_INFO, "Forward Echo response:%d\n%s", i, body.c_str());
    } else {
      gpr_log(GPR_ERROR, "RPC %d failed %d: %s", i,
              calls[i].status.error_code(),
              calls[i].status.error_message().c_str());
      response->clear_output();
      return calls[i].status;
    }
  }
  return Status::OK;
}

}  // namespace testing
}  // namespace grpc