testing: add a fake TCP and UDP server to test/core/util (extracted from #27883) (#28332)

* add a fake TCP and UDP server to test/core/util
pull/28351/head
apolcyn 3 years ago committed by GitHub
parent 32770542b2
commit 2a4b7f25e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      CMakeLists.txt
  2. 6
      build_autogenerated.yaml
  3. 1
      test/core/tsi/alts/handshaker/BUILD
  4. 240
      test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc
  5. 16
      test/core/util/BUILD
  6. 269
      test/core/util/fake_udp_and_tcp_server.cc
  7. 137
      test/core/util/fake_udp_and_tcp_server.h
  8. 14
      test/cpp/naming/BUILD
  9. 18
      test/cpp/naming/cancel_ares_query_test.cc
  10. 101
      test/cpp/naming/dns_test_util.cc
  11. 38
      test/cpp/naming/dns_test_util.h
  12. 2
      test/cpp/naming/generate_resolver_component_tests.bzl
  13. 16
      test/cpp/naming/resolver_component_test.cc

3
CMakeLists.txt generated

@ -7829,6 +7829,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
test/core/end2end/cq_verifier.cc
test/core/tsi/alts/fake_handshaker/fake_handshaker_server.cc
test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc
test/core/util/fake_udp_and_tcp_server.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
@ -8620,8 +8621,8 @@ if(gRPC_BUILD_TESTS)
add_executable(cancel_ares_query_test
test/core/end2end/cq_verifier.cc
test/core/util/fake_udp_and_tcp_server.cc
test/cpp/naming/cancel_ares_query_test.cc
test/cpp/naming/dns_test_util.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)

@ -4531,12 +4531,14 @@ targets:
headers:
- test/core/end2end/cq_verifier.h
- test/core/tsi/alts/fake_handshaker/fake_handshaker_server.h
- test/core/util/fake_udp_and_tcp_server.h
src:
- test/core/tsi/alts/fake_handshaker/handshaker.proto
- test/core/tsi/alts/fake_handshaker/transport_security_common.proto
- test/core/end2end/cq_verifier.cc
- test/core/tsi/alts/fake_handshaker/fake_handshaker_server.cc
- test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc
- test/core/util/fake_udp_and_tcp_server.cc
deps:
- grpc++
- grpc_test_util
@ -4907,11 +4909,11 @@ targets:
language: c++
headers:
- test/core/end2end/cq_verifier.h
- test/cpp/naming/dns_test_util.h
- test/core/util/fake_udp_and_tcp_server.h
src:
- test/core/end2end/cq_verifier.cc
- test/core/util/fake_udp_and_tcp_server.cc
- test/cpp/naming/cancel_ares_query_test.cc
- test/cpp/naming/dns_test_util.cc
deps:
- grpc++_test_config
- grpc++_test_util

@ -96,6 +96,7 @@ grpc_cc_test(
"//:grpc",
"//test/core/end2end:cq_verifier",
"//test/core/tsi/alts/fake_handshaker:fake_handshaker_lib",
"//test/core/util:fake_udp_and_tcp_server",
"//test/core/util:grpc_test_util",
],
)

@ -56,6 +56,7 @@
#include "src/core/lib/slice/slice_string_helpers.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/tsi/alts/fake_handshaker/fake_handshaker_server.h"
#include "test/core/util/fake_udp_and_tcp_server.h"
#include "test/core/util/memory_counters.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
@ -325,210 +326,6 @@ TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) {
}
}
class FakeTcpServer {
public:
enum ProcessReadResult {
CONTINUE_READING,
CLOSE_SOCKET,
};
enum class AcceptMode {
kWaitForClientToSendFirstBytes, // useful for emulating ALTS based
// grpc servers
kEagerlySendSettings, // useful for emulating insecure grpc servers (e.g.
// ALTS handshake servers)
};
explicit FakeTcpServer(
AcceptMode accept_mode,
const std::function<ProcessReadResult(int, int, int)>& process_read_cb)
: accept_mode_(accept_mode), process_read_cb_(process_read_cb) {
port_ = grpc_pick_unused_port_or_die();
accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
address_ = absl::StrCat("[::]:", port_);
GPR_ASSERT(accept_socket_ != -1);
if (accept_socket_ == -1) {
gpr_log(GPR_ERROR, "Failed to create socket: %d", errno);
abort();
}
int val = 1;
if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val,
sizeof(val)) != 0) {
gpr_log(GPR_ERROR,
"Failed to set SO_REUSEADDR on socket bound to [::1]:%d : %d",
port_, errno);
abort();
}
if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) {
gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", errno);
abort();
}
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(port_);
(reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
if (bind(accept_socket_, reinterpret_cast<const sockaddr*>(&addr),
sizeof(addr)) != 0) {
gpr_log(GPR_ERROR, "Failed to bind socket to [::1]:%d : %d", port_,
errno);
abort();
}
if (listen(accept_socket_, 100)) {
gpr_log(GPR_ERROR, "Failed to listen on socket bound to [::1]:%d : %d",
port_, errno);
abort();
}
gpr_event_init(&stop_ev_);
run_server_loop_thd_ = absl::make_unique<std::thread>(RunServerLoop, this);
}
~FakeTcpServer() {
gpr_log(GPR_DEBUG,
"FakeTcpServer stop and "
"join server thread");
gpr_event_set(&stop_ev_, reinterpret_cast<void*>(1));
run_server_loop_thd_->join();
gpr_log(GPR_DEBUG,
"FakeTcpServer join server "
"thread complete");
}
const char* address() { return address_.c_str(); }
static ProcessReadResult CloseSocketUponReceivingBytesFromPeer(
int bytes_received_size, int read_error, int s) {
if (bytes_received_size < 0 && read_error != EAGAIN &&
read_error != EWOULDBLOCK) {
gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
errno);
abort();
}
if (bytes_received_size >= 0) {
gpr_log(GPR_DEBUG,
"Fake TCP server received %d bytes from peer socket: %d. Close "
"the "
"connection.",
bytes_received_size, s);
return CLOSE_SOCKET;
}
return CONTINUE_READING;
}
static ProcessReadResult CloseSocketUponCloseFromPeer(int bytes_received_size,
int read_error, int s) {
if (bytes_received_size < 0 && read_error != EAGAIN &&
read_error != EWOULDBLOCK) {
gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
errno);
abort();
}
if (bytes_received_size == 0) {
// The peer has shut down the connection.
gpr_log(GPR_DEBUG,
"Fake TCP server received 0 bytes from peer socket: %d. Close "
"the "
"connection.",
s);
return CLOSE_SOCKET;
}
return CONTINUE_READING;
}
class FakeTcpServerPeer {
public:
explicit FakeTcpServerPeer(int fd) : fd_(fd) {}
~FakeTcpServerPeer() { close(fd_); }
void MaybeContinueSendingSettings() {
// https://tools.ietf.org/html/rfc7540#section-4.1
const std::vector<uint8_t> kEmptyHttp2SettingsFrame = {
0x00, 0x00, 0x00, // length
0x04, // settings type
0x00, // flags
0x00, 0x00, 0x00, 0x00 // stream identifier
};
if (total_bytes_sent_ < int(kEmptyHttp2SettingsFrame.size())) {
int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_;
int bytes_sent =
send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_,
bytes_to_send, 0);
if (bytes_sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
gpr_log(GPR_ERROR,
"Fake TCP server encountered unexpected error:%d |%s| "
"sending %d bytes on fd:%d",
errno, strerror(errno), bytes_to_send, fd_);
GPR_ASSERT(0);
} else if (bytes_sent > 0) {
total_bytes_sent_ += bytes_sent;
GPR_ASSERT(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size()));
}
}
}
int fd() { return fd_; }
private:
int fd_;
int total_bytes_sent_ = 0;
};
// Run a loop that periodically, every 10 ms:
// 1) Checks if there are any new TCP connections to accept.
// 2) Checks if any data has arrived yet on established connections,
// and reads from them if so, processing the sockets as configured.
static void RunServerLoop(FakeTcpServer* self) {
std::set<std::unique_ptr<FakeTcpServerPeer>> peers;
while (!gpr_event_get(&self->stop_ev_)) {
int p = accept(self->accept_socket_, nullptr, nullptr);
if (p == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
gpr_log(GPR_ERROR, "Failed to accept connection: %d", errno);
abort();
}
if (p != -1) {
gpr_log(GPR_DEBUG, "accepted peer socket: %d", p);
if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) {
gpr_log(GPR_ERROR,
"Failed to set O_NONBLOCK on peer socket:%d errno:%d", p,
errno);
abort();
}
peers.insert(absl::make_unique<FakeTcpServerPeer>(p));
}
auto it = peers.begin();
while (it != peers.end()) {
FakeTcpServerPeer* peer = (*it).get();
if (self->accept_mode_ == AcceptMode::kEagerlySendSettings) {
peer->MaybeContinueSendingSettings();
}
char buf[100];
int bytes_received_size = recv(peer->fd(), buf, 100, 0);
ProcessReadResult r =
self->process_read_cb_(bytes_received_size, errno, peer->fd());
if (r == CLOSE_SOCKET) {
it = peers.erase(it);
} else {
GPR_ASSERT(r == CONTINUE_READING);
it++;
}
}
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(10, GPR_TIMESPAN)));
}
close(self->accept_socket_);
}
private:
int accept_socket_;
int port_;
gpr_event stop_ev_;
std::string address_;
std::unique_ptr<std::thread> run_server_loop_thd_;
const AcceptMode accept_mode_;
std::function<ProcessReadResult(int, int, int)> process_read_cb_;
};
/* This test is intended to make sure that ALTS handshakes we correctly
* fail fast when the security handshaker gets an error while reading
* from the remote peer, after having earlier sent the first bytes of the
@ -547,9 +344,11 @@ TEST(AltsConcurrentConnectivityTest,
false /* check num concurrent rpcs */);
// The fake_backend_server emulates a secure (ALTS based) gRPC backend. So
// it waits for the client to send the first bytes.
FakeTcpServer fake_backend_server(
FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
FakeTcpServer::CloseSocketUponReceivingBytesFromPeer);
grpc_core::testing::FakeUdpAndTcpServer fake_backend_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::
CloseSocketUponReceivingBytesFromPeer);
{
gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
@ -579,14 +378,16 @@ TEST(AltsConcurrentConnectivityTest,
TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting) {
// The fake_handshake_server emulates a broken ALTS handshaker, which
// is an insecure server. So send settings to the client eagerly.
FakeTcpServer fake_handshake_server(
FakeTcpServer::AcceptMode::kEagerlySendSettings,
FakeTcpServer::CloseSocketUponReceivingBytesFromPeer);
grpc_core::testing::FakeUdpAndTcpServer fake_handshake_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::kEagerlySendSettings,
grpc_core::testing::FakeUdpAndTcpServer::
CloseSocketUponReceivingBytesFromPeer);
// The fake_backend_server emulates a secure (ALTS based) server, so wait
// for the client to send the first bytes.
FakeTcpServer fake_backend_server(
FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
FakeTcpServer::CloseSocketUponCloseFromPeer);
grpc_core::testing::FakeUdpAndTcpServer fake_backend_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
{
gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;
@ -617,14 +418,15 @@ TEST(AltsConcurrentConnectivityTest,
TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting) {
// fake_handshake_server emulates an insecure server, so send settings first.
// It will be unresponsive for the rest of the connection, though.
FakeTcpServer fake_handshake_server(
FakeTcpServer::AcceptMode::kEagerlySendSettings,
FakeTcpServer::CloseSocketUponCloseFromPeer);
grpc_core::testing::FakeUdpAndTcpServer fake_handshake_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::kEagerlySendSettings,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
// fake_backend_server emulates an ALTS based server, so wait for the client
// to send the first bytes.
FakeTcpServer fake_backend_server(
FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes,
FakeTcpServer::CloseSocketUponCloseFromPeer);
grpc_core::testing::FakeUdpAndTcpServer fake_backend_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
{
gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20);
std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners;

@ -208,6 +208,22 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "fake_udp_and_tcp_server",
srcs = ["fake_udp_and_tcp_server.cc"],
hdrs = ["fake_udp_and_tcp_server.h"],
external_deps = [
"absl/strings",
"absl/strings:str_format",
],
language = "C++",
deps = [
":grpc_test_util",
"//:gpr",
"//:grpc",
],
)
grpc_cc_library(
name = "build",
srcs = ["build.cc"],

@ -0,0 +1,269 @@
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "fake_udp_and_tcp_server.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "src/core/lib/event_engine/sockaddr.h"
#include "test/core/util/port.h"
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/sockaddr_windows.h"
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_windows.h"
#define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
#define CLOSE_SOCKET closesocket
#else
#include <fcntl.h>
#include "src/core/lib/iomgr/sockaddr_posix.h"
#define BAD_SOCKET_RETURN_VAL (-1)
#define CLOSE_SOCKET close
#endif
namespace grpc_core {
namespace testing {
FakeUdpAndTcpServer::FakeUdpAndTcpServer(
AcceptMode accept_mode,
std::function<FakeUdpAndTcpServer::ProcessReadResult(int, int, int)>
process_read_cb)
: accept_mode_(accept_mode), process_read_cb_(std::move(process_read_cb)) {
port_ = grpc_pick_unused_port_or_die();
udp_socket_ = socket(AF_INET6, SOCK_DGRAM, 0);
if (udp_socket_ == BAD_SOCKET_RETURN_VAL) {
gpr_log(GPR_DEBUG, "Failed to create UDP ipv6 socket: %d", errno);
GPR_ASSERT(0);
}
accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
address_ = absl::StrCat("[::]:", port_);
if (accept_socket_ == BAD_SOCKET_RETURN_VAL) {
gpr_log(GPR_ERROR, "Failed to create TCP IPv6 socket: %d", errno);
GPR_ASSERT(0);
}
#ifdef GPR_WINDOWS
char val = 1;
if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) ==
SOCKET_ERROR) {
gpr_log(GPR_DEBUG,
"Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:%d, "
"errno: %d",
port_, errno);
GPR_ASSERT(0);
}
grpc_error_handle set_non_block_error;
set_non_block_error = grpc_tcp_set_non_block(udp_socket_);
if (set_non_block_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
grpc_error_std_string(set_non_block_error).c_str());
GPR_ASSERT(0);
}
set_non_block_error = grpc_tcp_set_non_block(accept_socket_);
if (set_non_block_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
grpc_error_std_string(set_non_block_error).c_str());
GPR_ASSERT(0);
}
#else
int val = 1;
if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
0) {
gpr_log(GPR_DEBUG, "Failed to set SO_REUSEADDR on socket [::1]:%d", port_);
GPR_ASSERT(0);
}
if (fcntl(udp_socket_, F_SETFL, O_NONBLOCK) != 0) {
gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", errno);
GPR_ASSERT(0);
}
if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) {
gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", errno);
GPR_ASSERT(0);
}
#endif
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(port_);
(reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
if (bind(udp_socket_, reinterpret_cast<const sockaddr*>(&addr),
sizeof(addr)) != 0) {
gpr_log(GPR_DEBUG, "Failed to bind UDP socket to [::1]:%d", port_);
GPR_ASSERT(0);
}
if (bind(accept_socket_, reinterpret_cast<const sockaddr*>(&addr),
sizeof(addr)) != 0) {
gpr_log(GPR_ERROR, "Failed to bind TCP socket to [::1]:%d : %d", port_,
errno);
GPR_ASSERT(0);
}
if (listen(accept_socket_, 100)) {
gpr_log(GPR_ERROR, "Failed to listen on socket bound to [::1]:%d : %d",
port_, errno);
GPR_ASSERT(0);
}
gpr_event_init(&stop_ev_);
run_server_loop_thd_ = absl::make_unique<std::thread>(
std::bind(&FakeUdpAndTcpServer::RunServerLoop, this));
}
FakeUdpAndTcpServer::~FakeUdpAndTcpServer() {
gpr_log(GPR_DEBUG,
"FakeUdpAndTcpServer stop and "
"join server thread");
gpr_event_set(&stop_ev_, reinterpret_cast<void*>(1));
run_server_loop_thd_->join();
gpr_log(GPR_DEBUG,
"FakeUdpAndTcpServer join server "
"thread complete");
CLOSE_SOCKET(accept_socket_);
CLOSE_SOCKET(udp_socket_);
}
FakeUdpAndTcpServer::ProcessReadResult
FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer(
int bytes_received_size, int read_error, int s) {
if (bytes_received_size < 0 && read_error != EAGAIN &&
read_error != EWOULDBLOCK) {
gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
errno);
GPR_ASSERT(0);
}
if (bytes_received_size >= 0) {
gpr_log(GPR_DEBUG,
"Fake TCP server received %d bytes from peer socket: %d. Close "
"the "
"connection.",
bytes_received_size, s);
return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
}
return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
}
FakeUdpAndTcpServer::ProcessReadResult
FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer(int bytes_received_size,
int read_error, int s) {
if (bytes_received_size < 0 && read_error != EAGAIN &&
read_error != EWOULDBLOCK) {
gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s,
errno);
GPR_ASSERT(0);
}
if (bytes_received_size == 0) {
// The peer has shut down the connection.
gpr_log(GPR_DEBUG,
"Fake TCP server received 0 bytes from peer socket: %d. Close "
"the "
"connection.",
s);
return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket;
}
return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading;
}
FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::FakeUdpAndTcpServerPeer(int fd)
: fd_(fd) {}
FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::~FakeUdpAndTcpServerPeer() {
CLOSE_SOCKET(fd_);
}
void FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::
MaybeContinueSendingSettings() {
// https://tools.ietf.org/html/rfc7540#section-4.1
const std::vector<char> kEmptyHttp2SettingsFrame = {
0x00, 0x00, 0x00, // length
0x04, // settings type
0x00, // flags
0x00, 0x00, 0x00, 0x00 // stream identifier
};
if (total_bytes_sent_ < int(kEmptyHttp2SettingsFrame.size())) {
int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_;
int bytes_sent =
send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_,
bytes_to_send, 0);
if (bytes_sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
gpr_log(GPR_ERROR,
"Fake TCP server encountered unexpected error:%d |%s| "
"sending %d bytes on fd:%d",
errno, strerror(errno), bytes_to_send, fd_);
GPR_ASSERT(0);
} else if (bytes_sent > 0) {
total_bytes_sent_ += bytes_sent;
GPR_ASSERT(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size()));
}
}
}
void FakeUdpAndTcpServer::ReadFromUdpSocket() {
char buf[100];
recvfrom(udp_socket_, buf, sizeof(buf), 0, nullptr, nullptr);
}
void FakeUdpAndTcpServer::RunServerLoop() {
std::set<std::unique_ptr<FakeUdpAndTcpServerPeer>> peers;
while (!gpr_event_get(&stop_ev_)) {
// handle TCP connections
int p = accept(accept_socket_, nullptr, nullptr);
if (p != BAD_SOCKET_RETURN_VAL) {
gpr_log(GPR_DEBUG, "accepted peer socket: %d", p);
#ifdef GPR_WINDOWS
grpc_error_handle set_non_block_error;
set_non_block_error = grpc_tcp_set_non_block(p);
if (set_non_block_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s",
grpc_error_std_string(set_non_block_error).c_str());
GPR_ASSERT(0);
}
#else
if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) {
gpr_log(GPR_ERROR, "Failed to configure non-blocking socket, errno: %d",
errno);
GPR_ASSERT(0);
}
#endif
peers.insert(absl::make_unique<FakeUdpAndTcpServerPeer>(p));
}
auto it = peers.begin();
while (it != peers.end()) {
FakeUdpAndTcpServerPeer* peer = (*it).get();
if (accept_mode_ == AcceptMode::kEagerlySendSettings) {
peer->MaybeContinueSendingSettings();
}
char buf[100];
int bytes_received_size = recv(peer->fd(), buf, 100, 0);
FakeUdpAndTcpServer::ProcessReadResult r =
process_read_cb_(bytes_received_size, errno, peer->fd());
if (r == FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket) {
it = peers.erase(it);
} else {
GPR_ASSERT(r ==
FakeUdpAndTcpServer::ProcessReadResult::kContinueReading);
it++;
}
}
// read from the UDP socket
ReadFromUdpSocket();
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(10, GPR_TIMESPAN)));
}
}
} // namespace testing
} // namespace grpc_core

@ -0,0 +1,137 @@
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <functional>
#include <set>
#include <string>
#include <thread>
#include "absl/memory/memory.h"
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
namespace grpc_core {
namespace testing {
// This class is used to simulate a variety of network conditions in
// unit tests.
//
// Note that resulting server only listens on the IPv6 loopback
// address, "[::1]". This is expected to be OK as all known gRPC unit test
// environments have this address available.
//
// As examples, this can be used to (but is not limited to) exercise
// the following cases:
//
// 1) DNS resolver's UDP requests experience packet loss:
//
// testing::FakeUdpAndTcpServer fake_dns_server(
// testing::FakeUdpAndTcpServer::AcceptMode::
// kWaitForClientToSendFirstBytes,
// testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
// auto server_uri = absl::StrFormat("dns:///[::]:%d/localhost:1234",
// fake_dns_server.port());
//
// 2) Server gets stuck while setting up a security handshake and client's
// security handshake times out (requires using secure channels):
//
// testing::FakeUdpAndTcpServer fake_server(
// testing::FakeUdpAndTcpServer::AcceptMode::
// kWaitForClientToSendFirstBytes,
// testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
// auto server_uri = absl::StrFormat("[::1]:%d", fake_server.port());
//
// 3) Client connections are immediately closed after sending the first bytes
// to an insecure server:
//
// testing::FakeUdpAndTcpServer fake_server(
// testing::FakeUdpAndTcpServer::AcceptMode::
// kEagerlySendSettings,
// testing::FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer);
// auto server_uri = absl::StrFormat("[::1]:%d", fake_server.port());
//
class FakeUdpAndTcpServer {
public:
enum class ProcessReadResult {
kContinueReading = 0,
kCloseSocket,
};
enum class AcceptMode {
kWaitForClientToSendFirstBytes, // useful for emulating ALTS based
// grpc servers
kEagerlySendSettings, // useful for emulating insecure grpc servers (e.g.
// ALTS handshake servers)
};
explicit FakeUdpAndTcpServer(
AcceptMode accept_mode,
std::function<ProcessReadResult(int, int, int)> process_read_cb);
~FakeUdpAndTcpServer();
const char* address() { return address_.c_str(); }
int port() { return port_; };
static ProcessReadResult CloseSocketUponReceivingBytesFromPeer(
int bytes_received_size, int read_error, int s);
static ProcessReadResult CloseSocketUponCloseFromPeer(int bytes_received_size,
int read_error, int s);
void ReadFromUdpSocket();
// Run a loop that periodically, every 10 ms:
// 1) Checks if there are any new TCP connections to accept.
// 2) Checks if any data has arrived yet on established connections,
// and reads from them if so, processing the sockets as configured.
void RunServerLoop();
private:
class FakeUdpAndTcpServerPeer {
public:
explicit FakeUdpAndTcpServerPeer(int fd);
~FakeUdpAndTcpServerPeer();
void MaybeContinueSendingSettings();
int fd() { return fd_; }
private:
int fd_;
int total_bytes_sent_ = 0;
};
int accept_socket_;
int udp_socket_;
int port_;
gpr_event stop_ev_;
std::string address_;
std::unique_ptr<std::thread> run_server_loop_thd_;
const AcceptMode accept_mode_;
std::function<ProcessReadResult(int, int, int)> process_read_cb_;
};
} // namespace testing
} // namespace grpc_core

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_py_binary")
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_py_binary")
load(":generate_resolver_component_tests.bzl", "generate_resolver_component_tests")
package(
@ -39,25 +39,15 @@ grpc_cc_test(
srcs = ["cancel_ares_query_test.cc"],
external_deps = ["gtest"],
deps = [
":dns_test_util",
"//:gpr",
"//:grpc",
"//:grpc++",
"//test/core/end2end:cq_verifier",
"//test/core/util:fake_udp_and_tcp_server",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],
)
grpc_cc_library(
name = "dns_test_util",
srcs = ["dns_test_util.cc"],
hdrs = ["dns_test_util.h"],
deps = [
"//:gpr",
"//:grpc",
],
)
generate_resolver_component_tests()

@ -45,9 +45,9 @@
#include "src/core/lib/iomgr/work_serializer.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/util/cmdline.h"
#include "test/core/util/fake_udp_and_tcp_server.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/naming/dns_test_util.h"
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/sockaddr_windows.h"
@ -154,11 +154,13 @@ class AssertFailureResultHandler : public grpc_core::Resolver::ResultHandler {
};
void TestCancelActiveDNSQuery(ArgsStruct* args) {
int fake_dns_port = grpc_pick_unused_port_or_die();
grpc::testing::FakeNonResponsiveDNSServer fake_dns_server(fake_dns_port);
grpc_core::testing::FakeUdpAndTcpServer fake_dns_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
std::string client_target = absl::StrFormat(
"dns://[::1]:%d/dont-care-since-wont-be-resolved.test.com:1234",
fake_dns_port);
fake_dns_server.port());
// create resolver and resolve
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
grpc_core::ResolverRegistry::CreateResolver(
@ -274,12 +276,14 @@ typedef enum {
void TestCancelDuringActiveQuery(
cancellation_test_query_timeout_setting query_timeout_setting) {
// Start up fake non responsive DNS server
int fake_dns_port = grpc_pick_unused_port_or_die();
grpc::testing::FakeNonResponsiveDNSServer fake_dns_server(fake_dns_port);
grpc_core::testing::FakeUdpAndTcpServer fake_dns_server(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
// Create a call that will try to use the fake DNS server
std::string name = "dont-care-since-wont-be-resolved.test.com:1234";
std::string client_target =
absl::StrFormat("dns://[::1]:%d/%s", fake_dns_port, name);
absl::StrFormat("dns://[::1]:%d/%s", fake_dns_server.port(), name);
gpr_log(GPR_DEBUG, "TestCancelActiveDNSQuery. query timeout setting: %d",
query_timeout_setting);
grpc_channel_args* client_args = nullptr;

@ -1,101 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test/cpp/naming/dns_test_util.h"
#include <stdio.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/sockaddr.h"
#ifdef GPR_WINDOWS
#include "src/core/lib/iomgr/sockaddr_windows.h"
#include "src/core/lib/iomgr/socket_windows.h"
#define BAD_SOCKET_RETURN_VAL INVALID_SOCKET
#else
#include "src/core/lib/iomgr/sockaddr_posix.h"
#define BAD_SOCKET_RETURN_VAL (-1)
#endif
namespace grpc {
namespace testing {
FakeNonResponsiveDNSServer::FakeNonResponsiveDNSServer(int port) {
udp_socket_ = socket(AF_INET6, SOCK_DGRAM, 0);
tcp_socket_ = socket(AF_INET6, SOCK_STREAM, 0);
if (udp_socket_ == BAD_SOCKET_RETURN_VAL) {
gpr_log(GPR_DEBUG, "Failed to create UDP ipv6 socket");
abort();
}
if (tcp_socket_ == BAD_SOCKET_RETURN_VAL) {
gpr_log(GPR_DEBUG, "Failed to create TCP ipv6 socket");
abort();
}
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(port);
(reinterpret_cast<char*>(&addr.sin6_addr))[15] = 1;
if (bind(udp_socket_, reinterpret_cast<const sockaddr*>(&addr),
sizeof(addr)) != 0) {
gpr_log(GPR_DEBUG, "Failed to bind UDP ipv6 socket to [::1]:%d", port);
abort();
}
#ifdef GPR_WINDOWS
char val = 1;
if (setsockopt(tcp_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) ==
SOCKET_ERROR) {
gpr_log(GPR_DEBUG,
"Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:%d", port);
abort();
}
#else
int val = 1;
if (setsockopt(tcp_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) !=
0) {
gpr_log(GPR_DEBUG,
"Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:%d", port);
abort();
}
#endif
if (bind(tcp_socket_, reinterpret_cast<const sockaddr*>(&addr),
sizeof(addr)) != 0) {
gpr_log(GPR_DEBUG, "Failed to bind TCP ipv6 socket to [::1]:%d", port);
abort();
}
if (listen(tcp_socket_, 100)) {
gpr_log(GPR_DEBUG, "Failed to listen on TCP ipv6 socket to [::1]:%d", port);
abort();
}
}
FakeNonResponsiveDNSServer::~FakeNonResponsiveDNSServer() {
#ifdef GPR_WINDOWS
closesocket(udp_socket_);
closesocket(tcp_socket_);
#else
close(udp_socket_);
close(tcp_socket_);
#endif
}
} // namespace testing
} // namespace grpc

@ -1,38 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_DNS_TEST_UTIL_H
#define GRPC_DNS_TEST_UTIL_H
namespace grpc {
namespace testing {
class FakeNonResponsiveDNSServer {
public:
explicit FakeNonResponsiveDNSServer(int port);
virtual ~FakeNonResponsiveDNSServer();
private:
int udp_socket_;
int tcp_socket_;
};
} // namespace testing
} // namespace grpc
#endif /* GRPC_DNS_TEST_UTIL_H */

@ -52,9 +52,9 @@ def generate_resolver_component_tests():
"gtest",
],
deps = [
":dns_test_util",
"//test/cpp/util:test_util%s" % unsecure_build_config_suffix,
"//test/core/util:grpc_test_util%s" % unsecure_build_config_suffix,
"//test/core/util:fake_udp_and_tcp_server",
"//:grpc++%s" % unsecure_build_config_suffix,
"//:grpc%s" % unsecure_build_config_suffix,
"//:gpr",

@ -57,9 +57,9 @@
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "test/core/util/fake_udp_and_tcp_server.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/naming/dns_test_util.h"
#include "test/cpp/util/subprocess.h"
#include "test/cpp/util/test_config.h"
@ -580,14 +580,16 @@ void RunResolvesRelevantRecordsTest(
gpr_log(GPR_DEBUG,
"resolver_component_test: --inject_broken_nameserver_list: %s",
absl::GetFlag(FLAGS_inject_broken_nameserver_list).c_str());
std::unique_ptr<grpc::testing::FakeNonResponsiveDNSServer>
std::unique_ptr<grpc_core::testing::FakeUdpAndTcpServer>
fake_non_responsive_dns_server;
if (absl::GetFlag(FLAGS_inject_broken_nameserver_list) == "True") {
g_fake_non_responsive_dns_server_port = grpc_pick_unused_port_or_die();
fake_non_responsive_dns_server =
absl::make_unique<grpc::testing::FakeNonResponsiveDNSServer>(
g_fake_non_responsive_dns_server_port);
fake_non_responsive_dns_server = absl::make_unique<
grpc_core::testing::FakeUdpAndTcpServer>(
grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
kWaitForClientToSendFirstBytes,
grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
g_fake_non_responsive_dns_server_port =
fake_non_responsive_dns_server->port();
grpc_ares_test_only_inject_config = InjectBrokenNameServerList;
whole_uri = absl::StrCat("dns:///", absl::GetFlag(FLAGS_target_name));
} else if (absl::GetFlag(FLAGS_inject_broken_nameserver_list) == "False") {

Loading…
Cancel
Save