diff --git a/CMakeLists.txt b/CMakeLists.txt index 2121a8e5aa4..60370ff03e2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7829,6 +7829,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) test/core/end2end/cq_verifier.cc test/core/tsi/alts/fake_handshaker/fake_handshaker_server.cc test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc + test/core/util/fake_udp_and_tcp_server.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc ) @@ -8620,8 +8621,8 @@ if(gRPC_BUILD_TESTS) add_executable(cancel_ares_query_test test/core/end2end/cq_verifier.cc + test/core/util/fake_udp_and_tcp_server.cc test/cpp/naming/cancel_ares_query_test.cc - test/cpp/naming/dns_test_util.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index e4ce93b76b7..a29a4d40f3f 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -4531,12 +4531,14 @@ targets: headers: - test/core/end2end/cq_verifier.h - test/core/tsi/alts/fake_handshaker/fake_handshaker_server.h + - test/core/util/fake_udp_and_tcp_server.h src: - test/core/tsi/alts/fake_handshaker/handshaker.proto - test/core/tsi/alts/fake_handshaker/transport_security_common.proto - test/core/end2end/cq_verifier.cc - test/core/tsi/alts/fake_handshaker/fake_handshaker_server.cc - test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc + - test/core/util/fake_udp_and_tcp_server.cc deps: - grpc++ - grpc_test_util @@ -4907,11 +4909,11 @@ targets: language: c++ headers: - test/core/end2end/cq_verifier.h - - test/cpp/naming/dns_test_util.h + - test/core/util/fake_udp_and_tcp_server.h src: - test/core/end2end/cq_verifier.cc + - test/core/util/fake_udp_and_tcp_server.cc - test/cpp/naming/cancel_ares_query_test.cc - - test/cpp/naming/dns_test_util.cc deps: - grpc++_test_config - grpc++_test_util diff --git a/test/core/tsi/alts/handshaker/BUILD b/test/core/tsi/alts/handshaker/BUILD index 29a366aa7a6..c5c7ff5c0e2 100644 --- a/test/core/tsi/alts/handshaker/BUILD +++ b/test/core/tsi/alts/handshaker/BUILD @@ -96,6 +96,7 @@ grpc_cc_test( "//:grpc", "//test/core/end2end:cq_verifier", "//test/core/tsi/alts/fake_handshaker:fake_handshaker_lib", + "//test/core/util:fake_udp_and_tcp_server", "//test/core/util:grpc_test_util", ], ) diff --git a/test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc b/test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc index a5a12698db0..6fe291b09aa 100644 --- a/test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc +++ b/test/core/tsi/alts/handshaker/alts_concurrent_connectivity_test.cc @@ -56,6 +56,7 @@ #include "src/core/lib/slice/slice_string_helpers.h" #include "test/core/end2end/cq_verifier.h" #include "test/core/tsi/alts/fake_handshaker/fake_handshaker_server.h" +#include "test/core/util/fake_udp_and_tcp_server.h" #include "test/core/util/memory_counters.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -325,210 +326,6 @@ TEST(AltsConcurrentConnectivityTest, TestConcurrentClientServerHandshakes) { } } -class FakeTcpServer { - public: - enum ProcessReadResult { - CONTINUE_READING, - CLOSE_SOCKET, - }; - - enum class AcceptMode { - kWaitForClientToSendFirstBytes, // useful for emulating ALTS based - // grpc servers - kEagerlySendSettings, // useful for emulating insecure grpc servers (e.g. - // ALTS handshake servers) - }; - - explicit FakeTcpServer( - AcceptMode accept_mode, - const std::function& process_read_cb) - : accept_mode_(accept_mode), process_read_cb_(process_read_cb) { - port_ = grpc_pick_unused_port_or_die(); - accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0); - address_ = absl::StrCat("[::]:", port_); - GPR_ASSERT(accept_socket_ != -1); - if (accept_socket_ == -1) { - gpr_log(GPR_ERROR, "Failed to create socket: %d", errno); - abort(); - } - int val = 1; - if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, - sizeof(val)) != 0) { - gpr_log(GPR_ERROR, - "Failed to set SO_REUSEADDR on socket bound to [::1]:%d : %d", - port_, errno); - abort(); - } - if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) { - gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", errno); - abort(); - } - sockaddr_in6 addr; - memset(&addr, 0, sizeof(addr)); - addr.sin6_family = AF_INET6; - addr.sin6_port = htons(port_); - (reinterpret_cast(&addr.sin6_addr))[15] = 1; - if (bind(accept_socket_, reinterpret_cast(&addr), - sizeof(addr)) != 0) { - gpr_log(GPR_ERROR, "Failed to bind socket to [::1]:%d : %d", port_, - errno); - abort(); - } - if (listen(accept_socket_, 100)) { - gpr_log(GPR_ERROR, "Failed to listen on socket bound to [::1]:%d : %d", - port_, errno); - abort(); - } - gpr_event_init(&stop_ev_); - run_server_loop_thd_ = absl::make_unique(RunServerLoop, this); - } - - ~FakeTcpServer() { - gpr_log(GPR_DEBUG, - "FakeTcpServer stop and " - "join server thread"); - gpr_event_set(&stop_ev_, reinterpret_cast(1)); - run_server_loop_thd_->join(); - gpr_log(GPR_DEBUG, - "FakeTcpServer join server " - "thread complete"); - } - - const char* address() { return address_.c_str(); } - - static ProcessReadResult CloseSocketUponReceivingBytesFromPeer( - int bytes_received_size, int read_error, int s) { - if (bytes_received_size < 0 && read_error != EAGAIN && - read_error != EWOULDBLOCK) { - gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s, - errno); - abort(); - } - if (bytes_received_size >= 0) { - gpr_log(GPR_DEBUG, - "Fake TCP server received %d bytes from peer socket: %d. Close " - "the " - "connection.", - bytes_received_size, s); - return CLOSE_SOCKET; - } - return CONTINUE_READING; - } - - static ProcessReadResult CloseSocketUponCloseFromPeer(int bytes_received_size, - int read_error, int s) { - if (bytes_received_size < 0 && read_error != EAGAIN && - read_error != EWOULDBLOCK) { - gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s, - errno); - abort(); - } - if (bytes_received_size == 0) { - // The peer has shut down the connection. - gpr_log(GPR_DEBUG, - "Fake TCP server received 0 bytes from peer socket: %d. Close " - "the " - "connection.", - s); - return CLOSE_SOCKET; - } - return CONTINUE_READING; - } - - class FakeTcpServerPeer { - public: - explicit FakeTcpServerPeer(int fd) : fd_(fd) {} - - ~FakeTcpServerPeer() { close(fd_); } - - void MaybeContinueSendingSettings() { - // https://tools.ietf.org/html/rfc7540#section-4.1 - const std::vector kEmptyHttp2SettingsFrame = { - 0x00, 0x00, 0x00, // length - 0x04, // settings type - 0x00, // flags - 0x00, 0x00, 0x00, 0x00 // stream identifier - }; - if (total_bytes_sent_ < int(kEmptyHttp2SettingsFrame.size())) { - int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_; - int bytes_sent = - send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_, - bytes_to_send, 0); - if (bytes_sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { - gpr_log(GPR_ERROR, - "Fake TCP server encountered unexpected error:%d |%s| " - "sending %d bytes on fd:%d", - errno, strerror(errno), bytes_to_send, fd_); - GPR_ASSERT(0); - } else if (bytes_sent > 0) { - total_bytes_sent_ += bytes_sent; - GPR_ASSERT(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size())); - } - } - } - - int fd() { return fd_; } - - private: - int fd_; - int total_bytes_sent_ = 0; - }; - - // Run a loop that periodically, every 10 ms: - // 1) Checks if there are any new TCP connections to accept. - // 2) Checks if any data has arrived yet on established connections, - // and reads from them if so, processing the sockets as configured. - static void RunServerLoop(FakeTcpServer* self) { - std::set> peers; - while (!gpr_event_get(&self->stop_ev_)) { - int p = accept(self->accept_socket_, nullptr, nullptr); - if (p == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { - gpr_log(GPR_ERROR, "Failed to accept connection: %d", errno); - abort(); - } - if (p != -1) { - gpr_log(GPR_DEBUG, "accepted peer socket: %d", p); - if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) { - gpr_log(GPR_ERROR, - "Failed to set O_NONBLOCK on peer socket:%d errno:%d", p, - errno); - abort(); - } - peers.insert(absl::make_unique(p)); - } - auto it = peers.begin(); - while (it != peers.end()) { - FakeTcpServerPeer* peer = (*it).get(); - if (self->accept_mode_ == AcceptMode::kEagerlySendSettings) { - peer->MaybeContinueSendingSettings(); - } - char buf[100]; - int bytes_received_size = recv(peer->fd(), buf, 100, 0); - ProcessReadResult r = - self->process_read_cb_(bytes_received_size, errno, peer->fd()); - if (r == CLOSE_SOCKET) { - it = peers.erase(it); - } else { - GPR_ASSERT(r == CONTINUE_READING); - it++; - } - } - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(10, GPR_TIMESPAN))); - } - close(self->accept_socket_); - } - - private: - int accept_socket_; - int port_; - gpr_event stop_ev_; - std::string address_; - std::unique_ptr run_server_loop_thd_; - const AcceptMode accept_mode_; - std::function process_read_cb_; -}; - /* This test is intended to make sure that ALTS handshakes we correctly * fail fast when the security handshaker gets an error while reading * from the remote peer, after having earlier sent the first bytes of the @@ -547,9 +344,11 @@ TEST(AltsConcurrentConnectivityTest, false /* check num concurrent rpcs */); // The fake_backend_server emulates a secure (ALTS based) gRPC backend. So // it waits for the client to send the first bytes. - FakeTcpServer fake_backend_server( - FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes, - FakeTcpServer::CloseSocketUponReceivingBytesFromPeer); + grpc_core::testing::FakeUdpAndTcpServer fake_backend_server( + grpc_core::testing::FakeUdpAndTcpServer::AcceptMode:: + kWaitForClientToSendFirstBytes, + grpc_core::testing::FakeUdpAndTcpServer:: + CloseSocketUponReceivingBytesFromPeer); { gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20); std::vector> connect_loop_runners; @@ -579,14 +378,16 @@ TEST(AltsConcurrentConnectivityTest, TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting) { // The fake_handshake_server emulates a broken ALTS handshaker, which // is an insecure server. So send settings to the client eagerly. - FakeTcpServer fake_handshake_server( - FakeTcpServer::AcceptMode::kEagerlySendSettings, - FakeTcpServer::CloseSocketUponReceivingBytesFromPeer); + grpc_core::testing::FakeUdpAndTcpServer fake_handshake_server( + grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::kEagerlySendSettings, + grpc_core::testing::FakeUdpAndTcpServer:: + CloseSocketUponReceivingBytesFromPeer); // The fake_backend_server emulates a secure (ALTS based) server, so wait // for the client to send the first bytes. - FakeTcpServer fake_backend_server( - FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes, - FakeTcpServer::CloseSocketUponCloseFromPeer); + grpc_core::testing::FakeUdpAndTcpServer fake_backend_server( + grpc_core::testing::FakeUdpAndTcpServer::AcceptMode:: + kWaitForClientToSendFirstBytes, + grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer); { gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20); std::vector> connect_loop_runners; @@ -617,14 +418,15 @@ TEST(AltsConcurrentConnectivityTest, TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting) { // fake_handshake_server emulates an insecure server, so send settings first. // It will be unresponsive for the rest of the connection, though. - FakeTcpServer fake_handshake_server( - FakeTcpServer::AcceptMode::kEagerlySendSettings, - FakeTcpServer::CloseSocketUponCloseFromPeer); + grpc_core::testing::FakeUdpAndTcpServer fake_handshake_server( + grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::kEagerlySendSettings, + grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer); // fake_backend_server emulates an ALTS based server, so wait for the client // to send the first bytes. - FakeTcpServer fake_backend_server( - FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes, - FakeTcpServer::CloseSocketUponCloseFromPeer); + grpc_core::testing::FakeUdpAndTcpServer fake_backend_server( + grpc_core::testing::FakeUdpAndTcpServer::AcceptMode:: + kWaitForClientToSendFirstBytes, + grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer); { gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20); std::vector> connect_loop_runners; diff --git a/test/core/util/BUILD b/test/core/util/BUILD index 7202058f439..9d56daaf9ac 100644 --- a/test/core/util/BUILD +++ b/test/core/util/BUILD @@ -208,6 +208,22 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "fake_udp_and_tcp_server", + srcs = ["fake_udp_and_tcp_server.cc"], + hdrs = ["fake_udp_and_tcp_server.h"], + external_deps = [ + "absl/strings", + "absl/strings:str_format", + ], + language = "C++", + deps = [ + ":grpc_test_util", + "//:gpr", + "//:grpc", + ], +) + grpc_cc_library( name = "build", srcs = ["build.cc"], diff --git a/test/core/util/fake_udp_and_tcp_server.cc b/test/core/util/fake_udp_and_tcp_server.cc new file mode 100644 index 00000000000..f7af63d7c9a --- /dev/null +++ b/test/core/util/fake_udp_and_tcp_server.cc @@ -0,0 +1,269 @@ +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include "fake_udp_and_tcp_server.h" + +#include "absl/strings/match.h" +#include "absl/strings/str_cat.h" + +#include "src/core/lib/event_engine/sockaddr.h" +#include "test/core/util/port.h" + +#ifdef GPR_WINDOWS +#include "src/core/lib/iomgr/sockaddr_windows.h" +#include "src/core/lib/iomgr/socket_windows.h" +#include "src/core/lib/iomgr/tcp_windows.h" +#define BAD_SOCKET_RETURN_VAL INVALID_SOCKET +#define CLOSE_SOCKET closesocket +#else +#include + +#include "src/core/lib/iomgr/sockaddr_posix.h" +#define BAD_SOCKET_RETURN_VAL (-1) +#define CLOSE_SOCKET close +#endif + +namespace grpc_core { +namespace testing { + +FakeUdpAndTcpServer::FakeUdpAndTcpServer( + AcceptMode accept_mode, + std::function + process_read_cb) + : accept_mode_(accept_mode), process_read_cb_(std::move(process_read_cb)) { + port_ = grpc_pick_unused_port_or_die(); + udp_socket_ = socket(AF_INET6, SOCK_DGRAM, 0); + if (udp_socket_ == BAD_SOCKET_RETURN_VAL) { + gpr_log(GPR_DEBUG, "Failed to create UDP ipv6 socket: %d", errno); + GPR_ASSERT(0); + } + accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0); + address_ = absl::StrCat("[::]:", port_); + if (accept_socket_ == BAD_SOCKET_RETURN_VAL) { + gpr_log(GPR_ERROR, "Failed to create TCP IPv6 socket: %d", errno); + GPR_ASSERT(0); + } +#ifdef GPR_WINDOWS + char val = 1; + if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == + SOCKET_ERROR) { + gpr_log(GPR_DEBUG, + "Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:%d, " + "errno: %d", + port_, errno); + GPR_ASSERT(0); + } + grpc_error_handle set_non_block_error; + set_non_block_error = grpc_tcp_set_non_block(udp_socket_); + if (set_non_block_error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s", + grpc_error_std_string(set_non_block_error).c_str()); + GPR_ASSERT(0); + } + set_non_block_error = grpc_tcp_set_non_block(accept_socket_); + if (set_non_block_error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s", + grpc_error_std_string(set_non_block_error).c_str()); + GPR_ASSERT(0); + } +#else + int val = 1; + if (setsockopt(accept_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) != + 0) { + gpr_log(GPR_DEBUG, "Failed to set SO_REUSEADDR on socket [::1]:%d", port_); + GPR_ASSERT(0); + } + if (fcntl(udp_socket_, F_SETFL, O_NONBLOCK) != 0) { + gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", errno); + GPR_ASSERT(0); + } + if (fcntl(accept_socket_, F_SETFL, O_NONBLOCK) != 0) { + gpr_log(GPR_ERROR, "Failed to set O_NONBLOCK on socket: %d", errno); + GPR_ASSERT(0); + } +#endif + sockaddr_in6 addr; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_port = htons(port_); + (reinterpret_cast(&addr.sin6_addr))[15] = 1; + if (bind(udp_socket_, reinterpret_cast(&addr), + sizeof(addr)) != 0) { + gpr_log(GPR_DEBUG, "Failed to bind UDP socket to [::1]:%d", port_); + GPR_ASSERT(0); + } + if (bind(accept_socket_, reinterpret_cast(&addr), + sizeof(addr)) != 0) { + gpr_log(GPR_ERROR, "Failed to bind TCP socket to [::1]:%d : %d", port_, + errno); + GPR_ASSERT(0); + } + if (listen(accept_socket_, 100)) { + gpr_log(GPR_ERROR, "Failed to listen on socket bound to [::1]:%d : %d", + port_, errno); + GPR_ASSERT(0); + } + gpr_event_init(&stop_ev_); + run_server_loop_thd_ = absl::make_unique( + std::bind(&FakeUdpAndTcpServer::RunServerLoop, this)); +} + +FakeUdpAndTcpServer::~FakeUdpAndTcpServer() { + gpr_log(GPR_DEBUG, + "FakeUdpAndTcpServer stop and " + "join server thread"); + gpr_event_set(&stop_ev_, reinterpret_cast(1)); + run_server_loop_thd_->join(); + gpr_log(GPR_DEBUG, + "FakeUdpAndTcpServer join server " + "thread complete"); + CLOSE_SOCKET(accept_socket_); + CLOSE_SOCKET(udp_socket_); +} + +FakeUdpAndTcpServer::ProcessReadResult +FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer( + int bytes_received_size, int read_error, int s) { + if (bytes_received_size < 0 && read_error != EAGAIN && + read_error != EWOULDBLOCK) { + gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s, + errno); + GPR_ASSERT(0); + } + if (bytes_received_size >= 0) { + gpr_log(GPR_DEBUG, + "Fake TCP server received %d bytes from peer socket: %d. Close " + "the " + "connection.", + bytes_received_size, s); + return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket; + } + return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading; +} + +FakeUdpAndTcpServer::ProcessReadResult +FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer(int bytes_received_size, + int read_error, int s) { + if (bytes_received_size < 0 && read_error != EAGAIN && + read_error != EWOULDBLOCK) { + gpr_log(GPR_ERROR, "Failed to receive from peer socket: %d. errno: %d", s, + errno); + GPR_ASSERT(0); + } + if (bytes_received_size == 0) { + // The peer has shut down the connection. + gpr_log(GPR_DEBUG, + "Fake TCP server received 0 bytes from peer socket: %d. Close " + "the " + "connection.", + s); + return FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket; + } + return FakeUdpAndTcpServer::ProcessReadResult::kContinueReading; +} + +FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::FakeUdpAndTcpServerPeer(int fd) + : fd_(fd) {} + +FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer::~FakeUdpAndTcpServerPeer() { + CLOSE_SOCKET(fd_); +} + +void FakeUdpAndTcpServer::FakeUdpAndTcpServerPeer:: + MaybeContinueSendingSettings() { + // https://tools.ietf.org/html/rfc7540#section-4.1 + const std::vector kEmptyHttp2SettingsFrame = { + 0x00, 0x00, 0x00, // length + 0x04, // settings type + 0x00, // flags + 0x00, 0x00, 0x00, 0x00 // stream identifier + }; + if (total_bytes_sent_ < int(kEmptyHttp2SettingsFrame.size())) { + int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_; + int bytes_sent = + send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_, + bytes_to_send, 0); + if (bytes_sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + gpr_log(GPR_ERROR, + "Fake TCP server encountered unexpected error:%d |%s| " + "sending %d bytes on fd:%d", + errno, strerror(errno), bytes_to_send, fd_); + GPR_ASSERT(0); + } else if (bytes_sent > 0) { + total_bytes_sent_ += bytes_sent; + GPR_ASSERT(total_bytes_sent_ <= int(kEmptyHttp2SettingsFrame.size())); + } + } +} + +void FakeUdpAndTcpServer::ReadFromUdpSocket() { + char buf[100]; + recvfrom(udp_socket_, buf, sizeof(buf), 0, nullptr, nullptr); +} + +void FakeUdpAndTcpServer::RunServerLoop() { + std::set> peers; + while (!gpr_event_get(&stop_ev_)) { + // handle TCP connections + int p = accept(accept_socket_, nullptr, nullptr); + if (p != BAD_SOCKET_RETURN_VAL) { + gpr_log(GPR_DEBUG, "accepted peer socket: %d", p); +#ifdef GPR_WINDOWS + grpc_error_handle set_non_block_error; + set_non_block_error = grpc_tcp_set_non_block(p); + if (set_non_block_error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Failed to configure non-blocking socket: %s", + grpc_error_std_string(set_non_block_error).c_str()); + GPR_ASSERT(0); + } +#else + if (fcntl(p, F_SETFL, O_NONBLOCK) != 0) { + gpr_log(GPR_ERROR, "Failed to configure non-blocking socket, errno: %d", + errno); + GPR_ASSERT(0); + } +#endif + peers.insert(absl::make_unique(p)); + } + auto it = peers.begin(); + while (it != peers.end()) { + FakeUdpAndTcpServerPeer* peer = (*it).get(); + if (accept_mode_ == AcceptMode::kEagerlySendSettings) { + peer->MaybeContinueSendingSettings(); + } + char buf[100]; + int bytes_received_size = recv(peer->fd(), buf, 100, 0); + FakeUdpAndTcpServer::ProcessReadResult r = + process_read_cb_(bytes_received_size, errno, peer->fd()); + if (r == FakeUdpAndTcpServer::ProcessReadResult::kCloseSocket) { + it = peers.erase(it); + } else { + GPR_ASSERT(r == + FakeUdpAndTcpServer::ProcessReadResult::kContinueReading); + it++; + } + } + // read from the UDP socket + ReadFromUdpSocket(); + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(10, GPR_TIMESPAN))); + } +} + +} // namespace testing +} // namespace grpc_core diff --git a/test/core/util/fake_udp_and_tcp_server.h b/test/core/util/fake_udp_and_tcp_server.h new file mode 100644 index 00000000000..acb23d20ab5 --- /dev/null +++ b/test/core/util/fake_udp_and_tcp_server.h @@ -0,0 +1,137 @@ +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include +#include +#include +#include + +#include "absl/memory/memory.h" + +#include +#include +#include +#include +#include + +namespace grpc_core { +namespace testing { + +// This class is used to simulate a variety of network conditions in +// unit tests. +// +// Note that resulting server only listens on the IPv6 loopback +// address, "[::1]". This is expected to be OK as all known gRPC unit test +// environments have this address available. +// +// As examples, this can be used to (but is not limited to) exercise +// the following cases: +// +// 1) DNS resolver's UDP requests experience packet loss: +// +// testing::FakeUdpAndTcpServer fake_dns_server( +// testing::FakeUdpAndTcpServer::AcceptMode:: +// kWaitForClientToSendFirstBytes, +// testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer); +// auto server_uri = absl::StrFormat("dns:///[::]:%d/localhost:1234", +// fake_dns_server.port()); +// +// 2) Server gets stuck while setting up a security handshake and client's +// security handshake times out (requires using secure channels): +// +// testing::FakeUdpAndTcpServer fake_server( +// testing::FakeUdpAndTcpServer::AcceptMode:: +// kWaitForClientToSendFirstBytes, +// testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer); +// auto server_uri = absl::StrFormat("[::1]:%d", fake_server.port()); +// +// 3) Client connections are immediately closed after sending the first bytes +// to an insecure server: +// +// testing::FakeUdpAndTcpServer fake_server( +// testing::FakeUdpAndTcpServer::AcceptMode:: +// kEagerlySendSettings, +// testing::FakeUdpAndTcpServer::CloseSocketUponReceivingBytesFromPeer); +// auto server_uri = absl::StrFormat("[::1]:%d", fake_server.port()); +// +class FakeUdpAndTcpServer { + public: + enum class ProcessReadResult { + kContinueReading = 0, + kCloseSocket, + }; + + enum class AcceptMode { + kWaitForClientToSendFirstBytes, // useful for emulating ALTS based + // grpc servers + kEagerlySendSettings, // useful for emulating insecure grpc servers (e.g. + // ALTS handshake servers) + }; + + explicit FakeUdpAndTcpServer( + AcceptMode accept_mode, + std::function process_read_cb); + + ~FakeUdpAndTcpServer(); + + const char* address() { return address_.c_str(); } + + int port() { return port_; }; + + static ProcessReadResult CloseSocketUponReceivingBytesFromPeer( + int bytes_received_size, int read_error, int s); + + static ProcessReadResult CloseSocketUponCloseFromPeer(int bytes_received_size, + int read_error, int s); + + void ReadFromUdpSocket(); + + // Run a loop that periodically, every 10 ms: + // 1) Checks if there are any new TCP connections to accept. + // 2) Checks if any data has arrived yet on established connections, + // and reads from them if so, processing the sockets as configured. + void RunServerLoop(); + + private: + class FakeUdpAndTcpServerPeer { + public: + explicit FakeUdpAndTcpServerPeer(int fd); + + ~FakeUdpAndTcpServerPeer(); + + void MaybeContinueSendingSettings(); + + int fd() { return fd_; } + + private: + int fd_; + int total_bytes_sent_ = 0; + }; + + int accept_socket_; + int udp_socket_; + int port_; + gpr_event stop_ev_; + std::string address_; + std::unique_ptr run_server_loop_thd_; + const AcceptMode accept_mode_; + std::function process_read_cb_; +}; + +} // namespace testing +} // namespace grpc_core diff --git a/test/cpp/naming/BUILD b/test/cpp/naming/BUILD index f2653c2cf99..24f27b5a94f 100644 --- a/test/cpp/naming/BUILD +++ b/test/cpp/naming/BUILD @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_py_binary") +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_py_binary") load(":generate_resolver_component_tests.bzl", "generate_resolver_component_tests") package( @@ -39,25 +39,15 @@ grpc_cc_test( srcs = ["cancel_ares_query_test.cc"], external_deps = ["gtest"], deps = [ - ":dns_test_util", "//:gpr", "//:grpc", "//:grpc++", "//test/core/end2end:cq_verifier", + "//test/core/util:fake_udp_and_tcp_server", "//test/core/util:grpc_test_util", "//test/cpp/util:test_config", "//test/cpp/util:test_util", ], ) -grpc_cc_library( - name = "dns_test_util", - srcs = ["dns_test_util.cc"], - hdrs = ["dns_test_util.h"], - deps = [ - "//:gpr", - "//:grpc", - ], -) - generate_resolver_component_tests() diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index 8bd38efc88d..c393a92c59c 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -45,9 +45,9 @@ #include "src/core/lib/iomgr/work_serializer.h" #include "test/core/end2end/cq_verifier.h" #include "test/core/util/cmdline.h" +#include "test/core/util/fake_udp_and_tcp_server.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" -#include "test/cpp/naming/dns_test_util.h" #ifdef GPR_WINDOWS #include "src/core/lib/iomgr/sockaddr_windows.h" @@ -154,11 +154,13 @@ class AssertFailureResultHandler : public grpc_core::Resolver::ResultHandler { }; void TestCancelActiveDNSQuery(ArgsStruct* args) { - int fake_dns_port = grpc_pick_unused_port_or_die(); - grpc::testing::FakeNonResponsiveDNSServer fake_dns_server(fake_dns_port); + grpc_core::testing::FakeUdpAndTcpServer fake_dns_server( + grpc_core::testing::FakeUdpAndTcpServer::AcceptMode:: + kWaitForClientToSendFirstBytes, + grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer); std::string client_target = absl::StrFormat( "dns://[::1]:%d/dont-care-since-wont-be-resolved.test.com:1234", - fake_dns_port); + fake_dns_server.port()); // create resolver and resolve grpc_core::OrphanablePtr resolver = grpc_core::ResolverRegistry::CreateResolver( @@ -274,12 +276,14 @@ typedef enum { void TestCancelDuringActiveQuery( cancellation_test_query_timeout_setting query_timeout_setting) { // Start up fake non responsive DNS server - int fake_dns_port = grpc_pick_unused_port_or_die(); - grpc::testing::FakeNonResponsiveDNSServer fake_dns_server(fake_dns_port); + grpc_core::testing::FakeUdpAndTcpServer fake_dns_server( + grpc_core::testing::FakeUdpAndTcpServer::AcceptMode:: + kWaitForClientToSendFirstBytes, + grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer); // Create a call that will try to use the fake DNS server std::string name = "dont-care-since-wont-be-resolved.test.com:1234"; std::string client_target = - absl::StrFormat("dns://[::1]:%d/%s", fake_dns_port, name); + absl::StrFormat("dns://[::1]:%d/%s", fake_dns_server.port(), name); gpr_log(GPR_DEBUG, "TestCancelActiveDNSQuery. query timeout setting: %d", query_timeout_setting); grpc_channel_args* client_args = nullptr; diff --git a/test/cpp/naming/dns_test_util.cc b/test/cpp/naming/dns_test_util.cc deleted file mode 100644 index e0ea96fddb7..00000000000 --- a/test/cpp/naming/dns_test_util.cc +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "test/cpp/naming/dns_test_util.h" - -#include -#include - -#include -#include - -#include "src/core/lib/event_engine/sockaddr.h" - -#ifdef GPR_WINDOWS -#include "src/core/lib/iomgr/sockaddr_windows.h" -#include "src/core/lib/iomgr/socket_windows.h" -#define BAD_SOCKET_RETURN_VAL INVALID_SOCKET -#else -#include "src/core/lib/iomgr/sockaddr_posix.h" -#define BAD_SOCKET_RETURN_VAL (-1) -#endif - -namespace grpc { -namespace testing { - -FakeNonResponsiveDNSServer::FakeNonResponsiveDNSServer(int port) { - udp_socket_ = socket(AF_INET6, SOCK_DGRAM, 0); - tcp_socket_ = socket(AF_INET6, SOCK_STREAM, 0); - if (udp_socket_ == BAD_SOCKET_RETURN_VAL) { - gpr_log(GPR_DEBUG, "Failed to create UDP ipv6 socket"); - abort(); - } - if (tcp_socket_ == BAD_SOCKET_RETURN_VAL) { - gpr_log(GPR_DEBUG, "Failed to create TCP ipv6 socket"); - abort(); - } - sockaddr_in6 addr; - memset(&addr, 0, sizeof(addr)); - addr.sin6_family = AF_INET6; - addr.sin6_port = htons(port); - (reinterpret_cast(&addr.sin6_addr))[15] = 1; - if (bind(udp_socket_, reinterpret_cast(&addr), - sizeof(addr)) != 0) { - gpr_log(GPR_DEBUG, "Failed to bind UDP ipv6 socket to [::1]:%d", port); - abort(); - } -#ifdef GPR_WINDOWS - char val = 1; - if (setsockopt(tcp_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == - SOCKET_ERROR) { - gpr_log(GPR_DEBUG, - "Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:%d", port); - abort(); - } -#else - int val = 1; - if (setsockopt(tcp_socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) != - 0) { - gpr_log(GPR_DEBUG, - "Failed to set SO_REUSEADDR on TCP ipv6 socket to [::1]:%d", port); - abort(); - } -#endif - if (bind(tcp_socket_, reinterpret_cast(&addr), - sizeof(addr)) != 0) { - gpr_log(GPR_DEBUG, "Failed to bind TCP ipv6 socket to [::1]:%d", port); - abort(); - } - if (listen(tcp_socket_, 100)) { - gpr_log(GPR_DEBUG, "Failed to listen on TCP ipv6 socket to [::1]:%d", port); - abort(); - } -} - -FakeNonResponsiveDNSServer::~FakeNonResponsiveDNSServer() { -#ifdef GPR_WINDOWS - closesocket(udp_socket_); - closesocket(tcp_socket_); -#else - close(udp_socket_); - close(tcp_socket_); -#endif -} - -} // namespace testing -} // namespace grpc diff --git a/test/cpp/naming/dns_test_util.h b/test/cpp/naming/dns_test_util.h deleted file mode 100644 index d5e50992672..00000000000 --- a/test/cpp/naming/dns_test_util.h +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_DNS_TEST_UTIL_H -#define GRPC_DNS_TEST_UTIL_H - -namespace grpc { -namespace testing { - -class FakeNonResponsiveDNSServer { - public: - explicit FakeNonResponsiveDNSServer(int port); - virtual ~FakeNonResponsiveDNSServer(); - - private: - int udp_socket_; - int tcp_socket_; -}; - -} // namespace testing -} // namespace grpc - -#endif /* GRPC_DNS_TEST_UTIL_H */ diff --git a/test/cpp/naming/generate_resolver_component_tests.bzl b/test/cpp/naming/generate_resolver_component_tests.bzl index 19dc6b9b48d..c66b5d95dfb 100755 --- a/test/cpp/naming/generate_resolver_component_tests.bzl +++ b/test/cpp/naming/generate_resolver_component_tests.bzl @@ -52,9 +52,9 @@ def generate_resolver_component_tests(): "gtest", ], deps = [ - ":dns_test_util", "//test/cpp/util:test_util%s" % unsecure_build_config_suffix, "//test/core/util:grpc_test_util%s" % unsecure_build_config_suffix, + "//test/core/util:fake_udp_and_tcp_server", "//:grpc++%s" % unsecure_build_config_suffix, "//:grpc%s" % unsecure_build_config_suffix, "//:gpr", diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index 8a453bca62e..dd18ee074fc 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -57,9 +57,9 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/iomgr/work_serializer.h" +#include "test/core/util/fake_udp_and_tcp_server.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" -#include "test/cpp/naming/dns_test_util.h" #include "test/cpp/util/subprocess.h" #include "test/cpp/util/test_config.h" @@ -580,14 +580,16 @@ void RunResolvesRelevantRecordsTest( gpr_log(GPR_DEBUG, "resolver_component_test: --inject_broken_nameserver_list: %s", absl::GetFlag(FLAGS_inject_broken_nameserver_list).c_str()); - std::unique_ptr + std::unique_ptr fake_non_responsive_dns_server; if (absl::GetFlag(FLAGS_inject_broken_nameserver_list) == "True") { - g_fake_non_responsive_dns_server_port = grpc_pick_unused_port_or_die(); - fake_non_responsive_dns_server = - absl::make_unique( - - g_fake_non_responsive_dns_server_port); + fake_non_responsive_dns_server = absl::make_unique< + grpc_core::testing::FakeUdpAndTcpServer>( + grpc_core::testing::FakeUdpAndTcpServer::AcceptMode:: + kWaitForClientToSendFirstBytes, + grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer); + g_fake_non_responsive_dns_server_port = + fake_non_responsive_dns_server->port(); grpc_ares_test_only_inject_config = InjectBrokenNameServerList; whole_uri = absl::StrCat("dns:///", absl::GetFlag(FLAGS_target_name)); } else if (absl::GetFlag(FLAGS_inject_broken_nameserver_list) == "False") {