Istio Echo Server Implementation (#29940)
* Istio Echo Server Implementation * New line fix * Fix racepull/29947/head
parent
088a118c08
commit
1e9fe0b8b3
10 changed files with 637 additions and 344 deletions
@ -0,0 +1,170 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <algorithm> |
||||
#include <atomic> |
||||
#include <chrono> |
||||
#include <condition_variable> |
||||
#include <deque> |
||||
#include <map> |
||||
#include <mutex> |
||||
#include <set> |
||||
#include <sstream> |
||||
#include <string> |
||||
#include <thread> |
||||
#include <vector> |
||||
|
||||
#include "absl/algorithm/container.h" |
||||
#include "absl/flags/flag.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/strings/str_join.h" |
||||
#include "absl/strings/str_split.h" |
||||
|
||||
#include <grpcpp/ext/admin_services.h> |
||||
#include <grpcpp/ext/proto_server_reflection_plugin.h> |
||||
#include <grpcpp/grpcpp.h> |
||||
#include <grpcpp/server.h> |
||||
#include <grpcpp/server_builder.h> |
||||
#include <grpcpp/server_context.h> |
||||
#include <grpcpp/support/string_ref.h> |
||||
|
||||
#include "src/core/lib/channel/status_util.h" |
||||
#include "src/core/lib/gpr/env.h" |
||||
#include "src/core/lib/iomgr/gethostname.h" |
||||
#include "src/proto/grpc/testing/istio_echo.pb.h" |
||||
#include "test/core/util/test_config.h" |
||||
#include "test/cpp/interop/istio_echo_server_lib.h" |
||||
#include "test/cpp/util/test_config.h" |
||||
|
||||
// A list of ports to listen on, for gRPC traffic.
|
||||
ABSL_FLAG(std::vector<std::string>, grpc, std::vector<std::string>({"7070"}), |
||||
"GRPC ports"); |
||||
|
||||
// The following flags must be defined, but are not used for now. Some may be
|
||||
// necessary for certain tests.
|
||||
ABSL_FLAG(std::vector<std::string>, port, std::vector<std::string>({"8080"}), |
||||
"HTTP/1.1 ports"); |
||||
ABSL_FLAG(std::vector<std::string>, tcp, std::vector<std::string>({"9090"}), |
||||
"TCP ports"); |
||||
ABSL_FLAG(std::vector<std::string>, tls, std::vector<std::string>({""}), |
||||
"Ports that are using TLS. These must be defined as http/grpc/tcp."); |
||||
ABSL_FLAG(std::vector<std::string>, bind_ip, std::vector<std::string>({""}), |
||||
"Ports that are bound to INSTANCE_IP rather than wildcard IP."); |
||||
ABSL_FLAG(std::vector<std::string>, bind_localhost, |
||||
std::vector<std::string>({""}), |
||||
"Ports that are bound to localhost rather than wildcard IP."); |
||||
ABSL_FLAG(std::vector<std::string>, server_first, |
||||
std::vector<std::string>({""}), |
||||
"Ports that are server first. These must be defined as tcp."); |
||||
ABSL_FLAG(std::vector<std::string>, xds_grpc_server, |
||||
std::vector<std::string>({""}), |
||||
"Ports that should rely on XDS configuration to serve"); |
||||
ABSL_FLAG(std::string, metrics, "", "Metrics port"); |
||||
ABSL_FLAG(std::string, uds, "", "HTTP server on unix domain socket"); |
||||
ABSL_FLAG(std::string, cluster, "", "Cluster where this server is deployed"); |
||||
ABSL_FLAG(std::string, crt, "", "gRPC TLS server-side certificate"); |
||||
ABSL_FLAG(std::string, key, "", "gRPC TLS server-side key"); |
||||
ABSL_FLAG(std::string, istio_version, "", "Istio sidecar version"); |
||||
ABSL_FLAG(std::string, disable_alpn, "", "disable ALPN negotiation"); |
||||
|
||||
namespace grpc { |
||||
namespace testing { |
||||
namespace { |
||||
/*std::vector<std::unique_ptr<grpc::Server>>*/ |
||||
void RunServer(std::vector<int> ports) { |
||||
std::string hostname; |
||||
char* hostname_p = grpc_gethostname(); |
||||
if (hostname_p == nullptr) { |
||||
hostname = absl::StrFormat("generated-%d", rand() % 1000); |
||||
} else { |
||||
hostname = hostname_p; |
||||
free(hostname_p); |
||||
} |
||||
EchoTestServiceImpl echo_test_service(hostname); |
||||
ServerBuilder builder; |
||||
builder.RegisterService(&echo_test_service); |
||||
for (int port : ports) { |
||||
std::ostringstream server_address; |
||||
server_address << "0.0.0.0:" << port; |
||||
builder.AddListeningPort(server_address.str(), |
||||
grpc::InsecureServerCredentials()); |
||||
gpr_log(GPR_DEBUG, "Server listening on %s", server_address.str().c_str()); |
||||
} |
||||
|
||||
// 3333 is the magic port that the istio testing for k8s health checks. And
|
||||
// it only needs TCP. So also make the gRPC server to listen on 3333.
|
||||
std::ostringstream server_address_3333; |
||||
server_address_3333 << "0.0.0.0:" << 3333; |
||||
builder.AddListeningPort(server_address_3333.str(), |
||||
grpc::InsecureServerCredentials()); |
||||
std::unique_ptr<Server> server(builder.BuildAndStart()); |
||||
server->Wait(); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace testing
|
||||
} // namespace grpc
|
||||
|
||||
int main(int argc, char** argv) { |
||||
// Preprocess argv, for two things:
|
||||
// 1. merge duplciate flags. So "--grpc=8080 --grpc=9090" becomes
|
||||
// "--grpc=8080,9090".
|
||||
// 2. replace '-' to '_'. So "--istio-version=123" becomes
|
||||
// "--istio_version=123".
|
||||
std::map<std::string, std::vector<std::string>> argv_dict; |
||||
for (int i = 0; i < argc; i++) { |
||||
std::string arg(argv[i]); |
||||
size_t equal = arg.find_first_of('='); |
||||
if (equal != std::string::npos) { |
||||
std::string f = arg.substr(0, equal); |
||||
if (f == "--version") { |
||||
continue; |
||||
} |
||||
std::string v = arg.substr(equal + 1, std::string::npos); |
||||
argv_dict[f].push_back(v); |
||||
} |
||||
} |
||||
std::vector<char*> new_argv_strs; |
||||
// Keep the command itself.
|
||||
new_argv_strs.push_back(argv[0]); |
||||
for (const auto& kv : argv_dict) { |
||||
std::string values; |
||||
for (const auto& s : kv.second) { |
||||
if (!values.empty()) values += ","; |
||||
values += s; |
||||
} |
||||
// replace '-' to '_', excluding the leading "--".
|
||||
std::string f = kv.first; |
||||
std::replace(f.begin() + 2, f.end(), '-', '_'); |
||||
std::string k_vs = absl::StrCat(f, "=", values); |
||||
char* writable = new char[k_vs.size() + 1]; |
||||
std::copy(k_vs.begin(), k_vs.end(), writable); |
||||
writable[k_vs.size()] = '\0'; |
||||
new_argv_strs.push_back(writable); |
||||
} |
||||
int new_argc = new_argv_strs.size(); |
||||
char** new_argv = new_argv_strs.data(); |
||||
grpc::testing::TestEnvironment env(&new_argc, new_argv); |
||||
grpc::testing::InitTest(&new_argc, &new_argv, true); |
||||
// Turn gRPC ports from a string vector to an int vector.
|
||||
std::vector<int> grpc_ports; |
||||
for (const auto& p : absl::GetFlag(FLAGS_grpc)) { |
||||
int grpc_port = std::stoi(p); |
||||
grpc_ports.push_back(grpc_port); |
||||
} |
||||
grpc::testing::RunServer(grpc_ports); |
||||
return 0; |
||||
} |
@ -0,0 +1,184 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include "test/cpp/interop/istio_echo_server_lib.h" |
||||
|
||||
#include <thread> |
||||
|
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/strings/str_split.h" |
||||
#include "absl/synchronization/blocking_counter.h" |
||||
|
||||
#include <grpcpp/grpcpp.h> |
||||
|
||||
#include "src/core/lib/gprpp/host_port.h" |
||||
#include "src/proto/grpc/testing/istio_echo.pb.h" |
||||
|
||||
using proto::EchoRequest; |
||||
using proto::EchoResponse; |
||||
using proto::EchoTestService; |
||||
using proto::ForwardEchoRequest; |
||||
using proto::ForwardEchoResponse; |
||||
|
||||
namespace grpc { |
||||
namespace testing { |
||||
namespace { |
||||
|
||||
const absl::string_view kRequestIdField = "x-request-id"; |
||||
// const absl::string_view kServiceVersionField = "ServiceVersion";
|
||||
// const absl::string_view kServicePortField = "ServicePort";
|
||||
const absl::string_view kStatusCodeField = "StatusCode"; |
||||
// const absl::string_view kUrlField = "URL";
|
||||
const absl::string_view kHostField = "Host"; |
||||
const absl::string_view kHostnameField = "Hostname"; |
||||
// const absl::string_view kMethodField = "Method";
|
||||
const absl::string_view kRequestHeader = "RequestHeader"; |
||||
// const absl::string_view kResponseHeader = "ResponseHeader";
|
||||
// const absl::string_view kClusterField = "Cluster";
|
||||
// const absl::string_view kIstioVersionField = "IstioVersion";
|
||||
const absl::string_view kIpField = "IP"; // The Requester’s IP Address.
|
||||
|
||||
absl::string_view StringRefToStringView(const string_ref& r) { |
||||
return absl::string_view(r.data(), r.size()); |
||||
} |
||||
|
||||
struct EchoCall { |
||||
grpc::ClientContext context; |
||||
proto::EchoResponse response; |
||||
Status status; |
||||
}; |
||||
|
||||
} // namespace
|
||||
|
||||
Status EchoTestServiceImpl::Echo(ServerContext* context, |
||||
const EchoRequest* request, |
||||
EchoResponse* response) { |
||||
std::string s; |
||||
absl::StrAppend(&s, kHostField, "=", |
||||
StringRefToStringView(context->ExperimentalGetAuthority()), |
||||
"\n"); |
||||
const std::multimap<string_ref, string_ref> metadata = |
||||
context->client_metadata(); |
||||
for (const auto& kv : metadata) { |
||||
// Skip all binary headers.
|
||||
if (kv.first.ends_with("-bin")) { |
||||
continue; |
||||
} |
||||
absl::StrAppend(&s, kRequestHeader, "=", StringRefToStringView(kv.first), |
||||
":", StringRefToStringView(kv.second), "\n"); |
||||
} |
||||
absl::string_view host; |
||||
absl::string_view port; |
||||
std::string peer = context->peer(); |
||||
grpc_core::SplitHostPort(peer, &host, &port); |
||||
// This is not a complete list, but also not all fields are used. May
|
||||
// need to add/remove fields later, if required by tests. Only keep the
|
||||
// fields needed for now.
|
||||
//
|
||||
// absl::StrAppend(&s,kServiceVersionField,"=",this->version_,"\n");
|
||||
// absl::StrAppend(&s,kServicePortField,"=",this->port_,"\n");
|
||||
// absl::StrAppend(&s,kClusterField,"=",this->cluster_,"\n");
|
||||
// absl::StrAppend(&s,kIstioVersionField,"=",this->istio_version_,"\n");
|
||||
absl::StrAppend(&s, kIpField, "=", host, "\n"); |
||||
absl::StrAppend(&s, kStatusCodeField, "=", std::to_string(200), "\n"); |
||||
absl::StrAppend(&s, kHostnameField, "=", this->hostname_, "\n"); |
||||
absl::StrAppend(&s, "Echo=", request->message(), "\n"); |
||||
gpr_log(GPR_ERROR, "here"); |
||||
response->set_message(s); |
||||
gpr_log(GPR_ERROR, "here"); |
||||
gpr_log(GPR_INFO, "Echo response:\n%s", s.c_str()); |
||||
return Status::OK; |
||||
} |
||||
|
||||
Status EchoTestServiceImpl::ForwardEcho(ServerContext* /*context*/, |
||||
const ForwardEchoRequest* request, |
||||
ForwardEchoResponse* response) { |
||||
std::string raw_url = request->url(); |
||||
size_t colon = raw_url.find_first_of(':'); |
||||
if (colon != std::string::npos) { |
||||
std::string scheme = raw_url.substr(0, colon); |
||||
if (scheme != "grpc") { |
||||
gpr_log(GPR_ERROR, "Protocol %s not supported", scheme.c_str()); |
||||
return Status( |
||||
StatusCode::UNIMPLEMENTED, |
||||
absl::StrFormat("Protocol %s not supported", scheme.c_str())); |
||||
} |
||||
} |
||||
// May need to use xds security if urlScheme is "xds"
|
||||
absl::string_view address = absl::StripPrefix(raw_url, "grpc://"); |
||||
gpr_log(GPR_INFO, "Creating channel to %s", std::string(address).c_str()); |
||||
auto channel = |
||||
CreateChannel(std::string(address), InsecureChannelCredentials()); |
||||
auto stub = EchoTestService::NewStub(channel); |
||||
auto count = request->count() == 0 ? 1 : request->count(); |
||||
// Calculate the amount of time to sleep after each call.
|
||||
std::chrono::duration<double> duration_per_query = |
||||
std::chrono::nanoseconds::zero(); |
||||
if (request->qps() > 0) { |
||||
duration_per_query = |
||||
std::chrono::nanoseconds(std::chrono::seconds(1)) / request->qps(); |
||||
} |
||||
std::vector<EchoCall> calls(count); |
||||
EchoRequest echo_request; |
||||
echo_request.set_message(request->message()); |
||||
absl::BlockingCounter counter(count); |
||||
for (int i = 0; i < count; ++i) { |
||||
calls[i].context.AddMetadata(std::string(kRequestIdField), |
||||
std::to_string(i)); |
||||
for (const auto& header : request->headers()) { |
||||
if (header.key() != kHostField) { |
||||
calls[i].context.AddMetadata(header.key(), header.value()); |
||||
} |
||||
} |
||||
std::chrono::system_clock::time_point deadline = |
||||
std::chrono::system_clock::now() + |
||||
std::chrono::microseconds(request->timeout_micros()); |
||||
calls[i].context.set_deadline(deadline); |
||||
stub->async()->Echo(&calls[i].context, &echo_request, &calls[i].response, |
||||
[&, index = i](Status s) { |
||||
calls[index].status = s; |
||||
counter.DecrementCount(); |
||||
}); |
||||
std::this_thread::sleep_for(duration_per_query); |
||||
} |
||||
// Wait for all calls to be done.
|
||||
counter.Wait(); |
||||
for (int i = 0; i < count; ++i) { |
||||
if (calls[i].status.ok()) { |
||||
std::string body; |
||||
// The test infrastructure might expect the entire struct instead of
|
||||
// just the message.
|
||||
absl::StrAppend(&body, absl::StrFormat("[%d] grpcecho.Echo(%s)\n", i, |
||||
request->message())); |
||||
auto contents = |
||||
absl::StrSplit(calls[i].response.message(), '\n', absl::SkipEmpty()); |
||||
for (const auto& line : contents) { |
||||
absl::StrAppend(&body, absl::StrFormat("[%d body] %s\n", i, line)); |
||||
} |
||||
response->add_output(body); |
||||
gpr_log(GPR_INFO, "Forward Echo response:%d\n%s", i, body.c_str()); |
||||
} else { |
||||
gpr_log(GPR_ERROR, "RPC %d failed %d: %s", i, |
||||
calls[i].status.error_code(), |
||||
calls[i].status.error_message().c_str()); |
||||
} |
||||
} |
||||
return Status::OK; |
||||
} |
||||
|
||||
} // namespace testing
|
||||
} // namespace grpc
|
@ -0,0 +1,50 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_TEST_CPP_INTEROP_ISTIO_ECHO_SERVER_LIB_H |
||||
#define GRPC_TEST_CPP_INTEROP_ISTIO_ECHO_SERVER_LIB_H |
||||
|
||||
#include "src/proto/grpc/testing/istio_echo.grpc.pb.h" |
||||
|
||||
namespace grpc { |
||||
namespace testing { |
||||
|
||||
class EchoTestServiceImpl : public proto::EchoTestService::Service { |
||||
public: |
||||
explicit EchoTestServiceImpl(const std::string& hostname) |
||||
: hostname_(hostname) {} |
||||
|
||||
grpc::Status Echo(grpc::ServerContext* context, |
||||
const proto::EchoRequest* request, |
||||
proto::EchoResponse* response) override; |
||||
|
||||
grpc::Status ForwardEcho(grpc::ServerContext* /*context*/, |
||||
const proto::ForwardEchoRequest* request, |
||||
proto::ForwardEchoResponse* response) override; |
||||
|
||||
private: |
||||
std::string hostname_; |
||||
// The following fields are not set yet. But we may need them later.
|
||||
// int port_;
|
||||
// std::string version_;
|
||||
// std::string cluster_;
|
||||
// std::string istio_version_;
|
||||
}; |
||||
|
||||
} // namespace testing
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPC_TEST_CPP_INTEROP_ISTIO_ECHO_SERVER_LIB_H
|
@ -0,0 +1,115 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <memory> |
||||
|
||||
#include <gmock/gmock.h> |
||||
#include <gtest/gtest.h> |
||||
|
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/strings/str_format.h" |
||||
|
||||
#include <grpcpp/create_channel.h> |
||||
#include <grpcpp/security/credentials.h> |
||||
#include <grpcpp/server_builder.h> |
||||
|
||||
#include "src/core/lib/gprpp/host_port.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
#include "test/cpp/interop/istio_echo_server_lib.h" |
||||
|
||||
namespace grpc { |
||||
namespace testing { |
||||
namespace { |
||||
|
||||
using proto::EchoRequest; |
||||
using proto::EchoResponse; |
||||
using proto::EchoTestService; |
||||
using proto::ForwardEchoRequest; |
||||
using proto::ForwardEchoResponse; |
||||
|
||||
class EchoTest : public ::testing::Test { |
||||
protected: |
||||
EchoTest() : echo_test_service_impl_("hostname") { |
||||
ServerBuilder builder; |
||||
builder.RegisterService(&echo_test_service_impl_); |
||||
int port = grpc_pick_unused_port_or_die(); |
||||
server_address_ = grpc_core::JoinHostPort("localhost", port); |
||||
builder.AddListeningPort(grpc_core::JoinHostPort("localhost", port), |
||||
InsecureServerCredentials()); |
||||
server_ = builder.BuildAndStart(); |
||||
auto channel = CreateChannel(server_address_, InsecureChannelCredentials()); |
||||
stub_ = EchoTestService::NewStub(channel); |
||||
} |
||||
|
||||
EchoTestServiceImpl echo_test_service_impl_; |
||||
std::string server_address_; |
||||
std::unique_ptr<Server> server_; |
||||
std::unique_ptr<EchoTestService::Stub> stub_; |
||||
}; |
||||
|
||||
TEST_F(EchoTest, SimpleEchoTest) { |
||||
ClientContext context; |
||||
EchoRequest request; |
||||
EchoResponse response; |
||||
request.set_message("hello"); |
||||
auto status = stub_->Echo(&context, request, &response); |
||||
EXPECT_TRUE(status.ok()); |
||||
EXPECT_THAT(response.message(), |
||||
::testing::AllOf(::testing::HasSubstr("StatusCode=200\n"), |
||||
::testing::HasSubstr("Hostname=hostname\n"), |
||||
::testing::HasSubstr("Echo=hello\n"), |
||||
::testing::HasSubstr("Host="), |
||||
::testing::HasSubstr("IP="))); |
||||
} |
||||
|
||||
TEST_F(EchoTest, ForwardEchoTest) { |
||||
ClientContext context; |
||||
ForwardEchoRequest request; |
||||
ForwardEchoResponse response; |
||||
request.set_count(3); |
||||
request.set_qps(1); |
||||
request.set_timeout_micros(20 * 1000 * 1000); // 20 seconds
|
||||
request.set_url(absl::StrCat("grpc://", server_address_)); |
||||
request.set_message("hello"); |
||||
auto status = stub_->ForwardEcho(&context, request, &response); |
||||
EXPECT_TRUE(status.ok()); |
||||
for (int i = 0; i < 3; ++i) { |
||||
EXPECT_THAT( |
||||
response.output()[i], |
||||
::testing::AllOf( |
||||
::testing::HasSubstr( |
||||
absl::StrFormat("[%d body] StatusCode=200\n", i)), |
||||
::testing::HasSubstr( |
||||
absl::StrFormat("[%d body] Hostname=hostname\n", i)), |
||||
::testing::HasSubstr(absl::StrFormat("[%d body] Echo=hello\n", i)), |
||||
::testing::HasSubstr(absl::StrFormat("[%d body] Host=", i)), |
||||
::testing::HasSubstr(absl::StrFormat("[%d body] IP=", i)))); |
||||
} |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace testing
|
||||
} // namespace grpc
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
grpc_init(); |
||||
auto result = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return result; |
||||
} |
@ -1,338 +0,0 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <algorithm> |
||||
#include <atomic> |
||||
#include <chrono> |
||||
#include <condition_variable> |
||||
#include <deque> |
||||
#include <map> |
||||
#include <mutex> |
||||
#include <set> |
||||
#include <sstream> |
||||
#include <string> |
||||
#include <thread> |
||||
#include <vector> |
||||
|
||||
#include "absl/algorithm/container.h" |
||||
#include "absl/flags/flag.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/strings/str_join.h" |
||||
#include "absl/strings/str_split.h" |
||||
|
||||
#include <grpcpp/ext/admin_services.h> |
||||
#include <grpcpp/ext/proto_server_reflection_plugin.h> |
||||
#include <grpcpp/grpcpp.h> |
||||
#include <grpcpp/server.h> |
||||
#include <grpcpp/server_builder.h> |
||||
#include <grpcpp/server_context.h> |
||||
|
||||
#include "src/core/lib/channel/status_util.h" |
||||
#include "src/core/lib/gpr/env.h" |
||||
#include "src/core/lib/iomgr/gethostname.h" |
||||
#include "src/proto/grpc/testing/istio_echo.grpc.pb.h" |
||||
#include "src/proto/grpc/testing/istio_echo.pb.h" |
||||
#include "test/core/util/test_config.h" |
||||
#include "test/cpp/util/test_config.h" |
||||
|
||||
// A list of ports to listen on, for gRPC traffic.
|
||||
ABSL_FLAG(std::vector<std::string>, grpc, std::vector<std::string>({"7070"}), |
||||
"GRPC ports"); |
||||
|
||||
// The following flags must be defined, but are not used for now. Some may be
|
||||
// necessary for certain tests.
|
||||
ABSL_FLAG(std::vector<std::string>, port, std::vector<std::string>({"8080"}), |
||||
"HTTP/1.1 ports"); |
||||
ABSL_FLAG(std::vector<std::string>, tcp, std::vector<std::string>({"9090"}), |
||||
"TCP ports"); |
||||
ABSL_FLAG(std::vector<std::string>, tls, std::vector<std::string>({""}), |
||||
"Ports that are using TLS. These must be defined as http/grpc/tcp."); |
||||
ABSL_FLAG(std::vector<std::string>, bind_ip, std::vector<std::string>({""}), |
||||
"Ports that are bound to INSTANCE_IP rather than wildcard IP."); |
||||
ABSL_FLAG(std::vector<std::string>, bind_localhost, |
||||
std::vector<std::string>({""}), |
||||
"Ports that are bound to localhost rather than wildcard IP."); |
||||
ABSL_FLAG(std::vector<std::string>, server_first, |
||||
std::vector<std::string>({""}), |
||||
"Ports that are server first. These must be defined as tcp."); |
||||
ABSL_FLAG(std::vector<std::string>, xds_grpc_server, |
||||
std::vector<std::string>({""}), |
||||
"Ports that should rely on XDS configuration to serve"); |
||||
ABSL_FLAG(std::string, metrics, "", "Metrics port"); |
||||
ABSL_FLAG(std::string, uds, "", "HTTP server on unix domain socket"); |
||||
ABSL_FLAG(std::string, cluster, "", "Cluster where this server is deployed"); |
||||
ABSL_FLAG(std::string, crt, "", "gRPC TLS server-side certificate"); |
||||
ABSL_FLAG(std::string, key, "", "gRPC TLS server-side key"); |
||||
ABSL_FLAG(std::string, istio_version, "", "Istio sidecar version"); |
||||
ABSL_FLAG(std::string, disable_alpn, "", "disable ALPN negotiation"); |
||||
|
||||
using grpc::Channel; |
||||
using grpc::ClientAsyncResponseReader; |
||||
using grpc::ClientContext; |
||||
using grpc::CompletionQueue; |
||||
using grpc::Server; |
||||
using grpc::ServerBuilder; |
||||
using grpc::ServerContext; |
||||
using grpc::Status; |
||||
|
||||
using proto::EchoRequest; |
||||
using proto::EchoResponse; |
||||
using proto::EchoTestService; |
||||
using proto::ForwardEchoRequest; |
||||
using proto::ForwardEchoResponse; |
||||
|
||||
const std::string host_key = "Host"; |
||||
const std::string request_id_field = "X-Request-Id"; |
||||
const std::string service_version_field = "ServiceVersion"; |
||||
const std::string service_port_field = "ServicePort"; |
||||
const std::string status_code_field = "StatusCode"; |
||||
const std::string url_field = "URL"; |
||||
const std::string host_field = "Host"; |
||||
const std::string hostname_field = "Hostname"; |
||||
const std::string method_field = "Method"; |
||||
const std::string response_header = "ResponseHeader"; |
||||
const std::string cluster_field = "Cluster"; |
||||
const std::string istio_version_field = "IstioVersion"; |
||||
const std::string ip_field = "IP"; // The Requester’s IP Address.
|
||||
|
||||
class EchoTestServiceImpl : public EchoTestService::Service { |
||||
public: |
||||
explicit EchoTestServiceImpl(const std::string& hostname) |
||||
: hostname_(hostname) {} |
||||
|
||||
Status Echo(ServerContext* context, const EchoRequest* request, |
||||
EchoResponse* response) override { |
||||
std::string s = ""; |
||||
const std::multimap<grpc::string_ref, grpc::string_ref> metadata = |
||||
context->client_metadata(); |
||||
for (const auto& kv : metadata) { |
||||
// Skip all binary headers.
|
||||
size_t isbin = kv.first.find("-bin"); |
||||
if ((isbin != std::string::npos) && (isbin + 4 == kv.first.size())) { |
||||
continue; |
||||
} |
||||
if (kv.first == ":authority") { |
||||
absl::StrAppend(&s, host_key, "=", kv.second.data(), "\n"); |
||||
} else { |
||||
absl::StrAppend(&s, kv.first.data(), "=", kv.second.data(), "\n"); |
||||
} |
||||
} |
||||
std::string peer = context->peer(); |
||||
size_t colon = peer.find_first_of(':'); |
||||
std::string ip = peer.substr(0, colon); |
||||
|
||||
// This is not a complete list, but also not all fields are used. May
|
||||
// need to add/remove fields later, if required by tests. Only keep the
|
||||
// fields needed for now.
|
||||
//
|
||||
// absl::StrAppend(&s,service_version_field,"=",this->version_,"\n");
|
||||
// absl::StrAppend(&s,service_port_field,"=",this->port_,"\n");
|
||||
// absl::StrAppend(&s,cluster_field,"=",this->cluster_,"\n");
|
||||
// absl::StrAppend(&s,istio_version_field,"=",this->istio_version_,"\n");
|
||||
absl::StrAppend(&s, ip_field, "=", ip, "\n"); |
||||
absl::StrAppend(&s, status_code_field, "=", std::to_string(200), "\n"); |
||||
absl::StrAppend(&s, hostname_field, "=", this->hostname_, "\n"); |
||||
absl::StrAppend(&s, "Echo=", request->message(), "\n"); |
||||
response->set_message(s); |
||||
return Status::OK; |
||||
} |
||||
|
||||
Status ForwardEcho(ServerContext* /*context*/, |
||||
const ForwardEchoRequest* request, |
||||
ForwardEchoResponse* response) override { |
||||
std::string rawUrl = request->url(); |
||||
size_t colon = rawUrl.find_first_of(':'); |
||||
std::string urlScheme = rawUrl.substr(0, colon); |
||||
// May need to use xds security if urlScheme is "xds"
|
||||
std::string address = rawUrl; |
||||
if (urlScheme == "grpc") { |
||||
address = rawUrl.substr(strlen("grpc://"), std::string::npos); |
||||
} |
||||
std::shared_ptr<Channel> channel = |
||||
grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); |
||||
std::unique_ptr<EchoTestService::Stub> stub_ = |
||||
EchoTestService::NewStub(channel); |
||||
CompletionQueue cq_; |
||||
|
||||
auto count = request->count() == 0 ? 1 : request->count(); |
||||
std::vector<std::string> responses_(count); |
||||
std::thread thread_ = std::thread(&EchoTestServiceImpl::AsyncCompleteRpc, |
||||
this, &cq_, count, &responses_); |
||||
std::chrono::duration<double> elapsed; |
||||
std::chrono::duration<double> duration_per_query = |
||||
std::chrono::nanoseconds::zero(); |
||||
if (request->qps() > 0) { |
||||
duration_per_query = |
||||
std::chrono::nanoseconds(std::chrono::seconds(1)) / request->qps(); |
||||
} |
||||
std::chrono::time_point<std::chrono::system_clock> start = |
||||
std::chrono::system_clock::now(); |
||||
for (int i = 0; i < count;) { |
||||
elapsed = std::chrono::system_clock::now() - start; |
||||
if (elapsed > duration_per_query) { |
||||
start = std::chrono::system_clock::now(); |
||||
// Send the request.
|
||||
EchoCall* call = new EchoCall; |
||||
std::chrono::system_clock::time_point deadline = |
||||
std::chrono::system_clock::now() + |
||||
std::chrono::microseconds(request->timeout_micros()); |
||||
call->context.set_deadline(deadline); |
||||
for (const auto& data : request->headers()) { |
||||
if (data.key() != host_key) { |
||||
call->context.AddMetadata(data.key(), data.value()); |
||||
} |
||||
} |
||||
call->context.AddMetadata("x-request-id", std::to_string(i)); |
||||
EchoRequest echo_request; |
||||
echo_request.set_message(request->message()); |
||||
call->r_id = i; |
||||
call->request = echo_request; |
||||
call->response_reader = |
||||
stub_->PrepareAsyncEcho(&call->context, echo_request, &cq_); |
||||
call->response_reader->StartCall(); |
||||
call->response_reader->Finish(&call->reply, &call->status, |
||||
static_cast<void*>(call)); |
||||
++i; |
||||
} |
||||
} |
||||
thread_.join(); |
||||
for (const auto& r : responses_) { |
||||
response->add_output(r); |
||||
} |
||||
return Status::OK; |
||||
} |
||||
|
||||
void AsyncCompleteRpc(CompletionQueue* cq_, int count, |
||||
std::vector<std::string>* responses_) { |
||||
void* got_tag; |
||||
bool ok = false; |
||||
for (int i = 0; i < count && cq_->Next(&got_tag, &ok);) { |
||||
++i; |
||||
EchoCall* call = static_cast<EchoCall*>(got_tag); |
||||
GPR_ASSERT(ok); |
||||
std::string s; |
||||
if (call->status.ok()) { |
||||
absl::StrAppend(&s, "[", call->r_id, "] grpcecho.Echo(", |
||||
call->request.message(), ")\n"); |
||||
std::stringstream resp_ss(call->reply.message()); |
||||
std::string line; |
||||
while (std::getline(resp_ss, line, '\n')) { |
||||
absl::StrAppend(&s, "[", call->r_id, " body]\n"); |
||||
} |
||||
responses_->at(call->r_id) = s; |
||||
} else { |
||||
gpr_log(GPR_DEBUG, "RPC failed %d: %s", call->status.error_code(), |
||||
call->status.error_message().c_str()); |
||||
} |
||||
delete call; |
||||
} |
||||
} |
||||
|
||||
private: |
||||
struct EchoCall { |
||||
int r_id; // The index of this call (5 for the 5th request sent).
|
||||
EchoRequest request; |
||||
EchoResponse reply; |
||||
ClientContext context; |
||||
Status status; |
||||
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader; |
||||
}; |
||||
std::string hostname_; |
||||
// The following fields are not set yet. But we may need them later.
|
||||
// int port_;
|
||||
// std::string version_;
|
||||
// std::string cluster_;
|
||||
// std::string istio_version_;
|
||||
}; |
||||
|
||||
/*std::vector<std::unique_ptr<grpc::Server>>*/ |
||||
void RunServer(std::vector<int> ports) { |
||||
std::string hostname; |
||||
char* hostname_p = grpc_gethostname(); |
||||
if (hostname_p == nullptr) { |
||||
hostname = absl::StrFormat("generated-%d", rand() % 1000); |
||||
} else { |
||||
hostname.assign(hostname_p); |
||||
} |
||||
EchoTestServiceImpl echo_test_service(hostname); |
||||
ServerBuilder builder; |
||||
builder.RegisterService(&echo_test_service); |
||||
for (int port : ports) { |
||||
std::ostringstream server_address; |
||||
server_address << "0.0.0.0:" << port; |
||||
builder.AddListeningPort(server_address.str(), |
||||
grpc::InsecureServerCredentials()); |
||||
gpr_log(GPR_DEBUG, "Server listening on %s", server_address.str().c_str()); |
||||
} |
||||
|
||||
// 3333 is the magic port that the istio testing for k8s health checks. And
|
||||
// it only needs TCP. So also make the gRPC server to listen on 3333.
|
||||
std::ostringstream server_address_3333; |
||||
server_address_3333 << "0.0.0.0:" << 3333; |
||||
builder.AddListeningPort(server_address_3333.str(), |
||||
grpc::InsecureServerCredentials()); |
||||
std::unique_ptr<Server> server(builder.BuildAndStart()); |
||||
server->Wait(); |
||||
} |
||||
|
||||
int main(int argc, char** argv) { |
||||
// Preprocess argv, for two things:
|
||||
// 1. merge duplciate flags. So "--grpc=8080 --grpc=9090" becomes
|
||||
// "--grpc=8080,9090".
|
||||
// 2. replace '-' to '_'. So "--istio-veriosn=123" becomes
|
||||
// "--istio_version=123".
|
||||
std::map<std::string, std::vector<std::string>> argv_dict; |
||||
for (int i = 0; i < argc; i++) { |
||||
std::string arg(argv[i]); |
||||
size_t equal = arg.find_first_of('='); |
||||
if (equal != std::string::npos) { |
||||
std::string f = arg.substr(0, equal); |
||||
std::string v = arg.substr(equal + 1, std::string::npos); |
||||
argv_dict[f].push_back(v); |
||||
} |
||||
} |
||||
|
||||
std::vector<char*> new_argv_strs; |
||||
// Keep the command itself.
|
||||
new_argv_strs.push_back(argv[0]); |
||||
for (const auto& kv : argv_dict) { |
||||
std::string values; |
||||
for (const auto& s : kv.second) { |
||||
if (!values.empty()) values += ","; |
||||
values += s; |
||||
} |
||||
// replace '-' to '_', excluding the leading "--".
|
||||
std::string f = kv.first; |
||||
std::replace(f.begin() + 2, f.end(), '-', '_'); |
||||
std::string k_vs = absl::StrCat(f, "=", values); |
||||
char* writable = new char[k_vs.size() + 1]; |
||||
std::copy(k_vs.begin(), k_vs.end(), writable); |
||||
writable[k_vs.size()] = '\0'; |
||||
new_argv_strs.push_back(writable); |
||||
} |
||||
int new_argc = new_argv_strs.size(); |
||||
char** new_argv = new_argv_strs.data(); |
||||
grpc::testing::TestEnvironment env(&new_argc, new_argv); |
||||
grpc::testing::InitTest(&new_argc, &new_argv, true); |
||||
// Turn gRPC ports from a string vector to an int vector.
|
||||
std::vector<int> grpc_ports; |
||||
for (const auto& p : absl::GetFlag(FLAGS_grpc)) { |
||||
int grpc_port = std::stoi(p); |
||||
grpc_ports.push_back(grpc_port); |
||||
} |
||||
RunServer(grpc_ports); |
||||
return 0; |
||||
} |
Loading…
Reference in new issue