|
|
|
@ -334,9 +334,17 @@ class FakeTcpServer { |
|
|
|
|
CLOSE_SOCKET, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
FakeTcpServer( |
|
|
|
|
enum class AcceptMode { |
|
|
|
|
kWaitForClientToSendFirstBytes, // useful for emulating ALTS based
|
|
|
|
|
// grpc servers
|
|
|
|
|
kEagerlySendSettings, // useful for emulating insecure grpc servers (e.g.
|
|
|
|
|
// ALTS handshake servers)
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
explicit FakeTcpServer( |
|
|
|
|
AcceptMode accept_mode, |
|
|
|
|
const std::function<ProcessReadResult(int, int, int)>& process_read_cb) |
|
|
|
|
: process_read_cb_(process_read_cb) { |
|
|
|
|
: accept_mode_(accept_mode), process_read_cb_(process_read_cb) { |
|
|
|
|
port_ = grpc_pick_unused_port_or_die(); |
|
|
|
|
accept_socket_ = socket(AF_INET6, SOCK_STREAM, 0); |
|
|
|
|
address_ = absl::StrCat("[::]:", port_); |
|
|
|
@ -429,12 +437,51 @@ class FakeTcpServer { |
|
|
|
|
return CONTINUE_READING; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class FakeTcpServerPeer { |
|
|
|
|
public: |
|
|
|
|
explicit FakeTcpServerPeer(int fd) : fd_(fd) {} |
|
|
|
|
|
|
|
|
|
~FakeTcpServerPeer() { close(fd_); } |
|
|
|
|
|
|
|
|
|
void MaybeContinueSendingSettings() { |
|
|
|
|
// https://tools.ietf.org/html/rfc7540#section-4.1
|
|
|
|
|
const std::vector<uint8_t> kEmptyHttp2SettingsFrame = { |
|
|
|
|
0x00, 0x00, 0x00, // length
|
|
|
|
|
0x04, // settings type
|
|
|
|
|
0x00, // flags
|
|
|
|
|
0x00, 0x00, 0x00, 0x00 // stream identifier
|
|
|
|
|
}; |
|
|
|
|
if (total_bytes_sent_ < kEmptyHttp2SettingsFrame.size()) { |
|
|
|
|
int bytes_to_send = kEmptyHttp2SettingsFrame.size() - total_bytes_sent_; |
|
|
|
|
int bytes_sent = |
|
|
|
|
send(fd_, kEmptyHttp2SettingsFrame.data() + total_bytes_sent_, |
|
|
|
|
bytes_to_send, 0); |
|
|
|
|
if (bytes_sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"Fake TCP server encountered unexpected error:%d |%s| " |
|
|
|
|
"sending %d bytes on fd:%d", |
|
|
|
|
errno, strerror(errno), bytes_to_send, fd_); |
|
|
|
|
GPR_ASSERT(0); |
|
|
|
|
} else if (bytes_sent > 0) { |
|
|
|
|
total_bytes_sent_ += bytes_sent; |
|
|
|
|
GPR_ASSERT(total_bytes_sent_ <= kEmptyHttp2SettingsFrame.size()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int fd() { return fd_; } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
int fd_; |
|
|
|
|
int total_bytes_sent_ = 0; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Run a loop that periodically, every 10 ms:
|
|
|
|
|
// 1) Checks if there are any new TCP connections to accept.
|
|
|
|
|
// 2) Checks if any data has arrived yet on established connections,
|
|
|
|
|
// and reads from them if so, processing the sockets as configured.
|
|
|
|
|
static void RunServerLoop(FakeTcpServer* self) { |
|
|
|
|
std::set<int> peers; |
|
|
|
|
std::set<std::unique_ptr<FakeTcpServerPeer>> peers; |
|
|
|
|
while (!gpr_event_get(&self->stop_ev_)) { |
|
|
|
|
int p = accept(self->accept_socket_, nullptr, nullptr); |
|
|
|
|
if (p == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { |
|
|
|
@ -449,17 +496,19 @@ class FakeTcpServer { |
|
|
|
|
errno); |
|
|
|
|
abort(); |
|
|
|
|
} |
|
|
|
|
peers.insert(p); |
|
|
|
|
peers.insert(absl::make_unique<FakeTcpServerPeer>(p)); |
|
|
|
|
} |
|
|
|
|
auto it = peers.begin(); |
|
|
|
|
while (it != peers.end()) { |
|
|
|
|
int p = *it; |
|
|
|
|
FakeTcpServerPeer* peer = (*it).get(); |
|
|
|
|
if (self->accept_mode_ == AcceptMode::kEagerlySendSettings) { |
|
|
|
|
peer->MaybeContinueSendingSettings(); |
|
|
|
|
} |
|
|
|
|
char buf[100]; |
|
|
|
|
int bytes_received_size = recv(p, buf, 100, 0); |
|
|
|
|
int bytes_received_size = recv(peer->fd(), buf, 100, 0); |
|
|
|
|
ProcessReadResult r = |
|
|
|
|
self->process_read_cb_(bytes_received_size, errno, p); |
|
|
|
|
self->process_read_cb_(bytes_received_size, errno, peer->fd()); |
|
|
|
|
if (r == CLOSE_SOCKET) { |
|
|
|
|
close(p); |
|
|
|
|
it = peers.erase(it); |
|
|
|
|
} else { |
|
|
|
|
GPR_ASSERT(r == CONTINUE_READING); |
|
|
|
@ -469,9 +518,6 @@ class FakeTcpServer { |
|
|
|
|
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), |
|
|
|
|
gpr_time_from_millis(10, GPR_TIMESPAN))); |
|
|
|
|
} |
|
|
|
|
for (auto it = peers.begin(); it != peers.end(); it++) { |
|
|
|
|
close(*it); |
|
|
|
|
} |
|
|
|
|
close(self->accept_socket_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -481,6 +527,7 @@ class FakeTcpServer { |
|
|
|
|
gpr_event stop_ev_; |
|
|
|
|
std::string address_; |
|
|
|
|
std::unique_ptr<std::thread> run_server_loop_thd_; |
|
|
|
|
const AcceptMode accept_mode_; |
|
|
|
|
std::function<ProcessReadResult(int, int, int)> process_read_cb_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
@ -500,7 +547,10 @@ TEST(AltsConcurrentConnectivityTest, |
|
|
|
|
// RPCs at the fake handshake server would be inherently racey.
|
|
|
|
|
FakeHandshakeServer fake_handshake_server( |
|
|
|
|
false /* check num concurrent rpcs */); |
|
|
|
|
FakeTcpServer fake_tcp_server( |
|
|
|
|
// The fake_backend_server emulates a secure (ALTS based) gRPC backend. So
|
|
|
|
|
// it waits for the client to send the first bytes.
|
|
|
|
|
FakeTcpServer fake_backend_server( |
|
|
|
|
FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes, |
|
|
|
|
FakeTcpServer::CloseSocketUponReceivingBytesFromPeer); |
|
|
|
|
{ |
|
|
|
|
gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20); |
|
|
|
@ -510,7 +560,7 @@ TEST(AltsConcurrentConnectivityTest, |
|
|
|
|
for (size_t i = 0; i < num_concurrent_connects; i++) { |
|
|
|
|
connect_loop_runners.push_back( |
|
|
|
|
std::unique_ptr<ConnectLoopRunner>(new ConnectLoopRunner( |
|
|
|
|
fake_tcp_server.address(), fake_handshake_server.address(), |
|
|
|
|
fake_backend_server.address(), fake_handshake_server.address(), |
|
|
|
|
10 /* per connect deadline seconds */, 3 /* loops */, |
|
|
|
|
GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */, |
|
|
|
|
0 /* reconnect_backoff_ms unset */))); |
|
|
|
@ -530,9 +580,16 @@ TEST(AltsConcurrentConnectivityTest, |
|
|
|
|
* fail fast when the ALTS handshake server fails incoming handshakes fast. */ |
|
|
|
|
TEST(AltsConcurrentConnectivityTest, |
|
|
|
|
TestHandshakeFailsFastWhenHandshakeServerClosesConnectionAfterAccepting) { |
|
|
|
|
// The fake_handshake_server emulates a broken ALTS handshaker, which
|
|
|
|
|
// is an insecure server. So send settings to the client eagerly.
|
|
|
|
|
FakeTcpServer fake_handshake_server( |
|
|
|
|
FakeTcpServer::AcceptMode::kEagerlySendSettings, |
|
|
|
|
FakeTcpServer::CloseSocketUponReceivingBytesFromPeer); |
|
|
|
|
FakeTcpServer fake_tcp_server(FakeTcpServer::CloseSocketUponCloseFromPeer); |
|
|
|
|
// The fake_backend_server emulates a secure (ALTS based) server, so wait
|
|
|
|
|
// for the client to send the first bytes.
|
|
|
|
|
FakeTcpServer fake_backend_server( |
|
|
|
|
FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes, |
|
|
|
|
FakeTcpServer::CloseSocketUponCloseFromPeer); |
|
|
|
|
{ |
|
|
|
|
gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20); |
|
|
|
|
std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners; |
|
|
|
@ -541,7 +598,7 @@ TEST(AltsConcurrentConnectivityTest, |
|
|
|
|
for (size_t i = 0; i < num_concurrent_connects; i++) { |
|
|
|
|
connect_loop_runners.push_back( |
|
|
|
|
std::unique_ptr<ConnectLoopRunner>(new ConnectLoopRunner( |
|
|
|
|
fake_tcp_server.address(), fake_handshake_server.address(), |
|
|
|
|
fake_backend_server.address(), fake_handshake_server.address(), |
|
|
|
|
10 /* per connect deadline seconds */, 2 /* loops */, |
|
|
|
|
GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */, |
|
|
|
|
0 /* reconnect_backoff_ms unset */))); |
|
|
|
@ -562,9 +619,16 @@ TEST(AltsConcurrentConnectivityTest, |
|
|
|
|
* the overall connection deadline kicks in. */ |
|
|
|
|
TEST(AltsConcurrentConnectivityTest, |
|
|
|
|
TestHandshakeFailsFastWhenHandshakeServerHangsAfterAccepting) { |
|
|
|
|
// fake_handshake_server emulates an insecure server, so send settings first.
|
|
|
|
|
// It will be unresponsive for the rest of the connection, though.
|
|
|
|
|
FakeTcpServer fake_handshake_server( |
|
|
|
|
FakeTcpServer::AcceptMode::kEagerlySendSettings, |
|
|
|
|
FakeTcpServer::CloseSocketUponCloseFromPeer); |
|
|
|
|
// fake_backend_server emulates an ALTS based server, so wait for the client
|
|
|
|
|
// to send the first bytes.
|
|
|
|
|
FakeTcpServer fake_backend_server( |
|
|
|
|
FakeTcpServer::AcceptMode::kWaitForClientToSendFirstBytes, |
|
|
|
|
FakeTcpServer::CloseSocketUponCloseFromPeer); |
|
|
|
|
FakeTcpServer fake_tcp_server(FakeTcpServer::CloseSocketUponCloseFromPeer); |
|
|
|
|
{ |
|
|
|
|
gpr_timespec test_deadline = grpc_timeout_seconds_to_deadline(20); |
|
|
|
|
std::vector<std::unique_ptr<ConnectLoopRunner>> connect_loop_runners; |
|
|
|
@ -573,7 +637,7 @@ TEST(AltsConcurrentConnectivityTest, |
|
|
|
|
for (size_t i = 0; i < num_concurrent_connects; i++) { |
|
|
|
|
connect_loop_runners.push_back( |
|
|
|
|
std::unique_ptr<ConnectLoopRunner>(new ConnectLoopRunner( |
|
|
|
|
fake_tcp_server.address(), fake_handshake_server.address(), |
|
|
|
|
fake_backend_server.address(), fake_handshake_server.address(), |
|
|
|
|
10 /* per connect deadline seconds */, 2 /* loops */, |
|
|
|
|
GRPC_CHANNEL_TRANSIENT_FAILURE /* expected connectivity states */, |
|
|
|
|
100 /* reconnect_backoff_ms */))); |
|
|
|
|