|
|
|
@ -16,11 +16,8 @@ |
|
|
|
|
* |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
#include <gtest/gtest.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/gprpp/time.h" |
|
|
|
|
#include "src/core/lib/iomgr/port.h" |
|
|
|
|
#include "test/core/util/test_config.h" |
|
|
|
|
|
|
|
|
|
// This test won't work except with posix sockets enabled
|
|
|
|
|
#ifdef GRPC_POSIX_SOCKET_TCP |
|
|
|
@ -45,6 +42,7 @@ |
|
|
|
|
#include "src/core/lib/iomgr/tcp_posix.h" |
|
|
|
|
#include "src/core/lib/slice/slice_internal.h" |
|
|
|
|
#include "test/core/iomgr/endpoint_tests.h" |
|
|
|
|
#include "test/core/util/test_config.h" |
|
|
|
|
|
|
|
|
|
static gpr_mu* g_mu; |
|
|
|
|
static grpc_pollset* g_pollset; |
|
|
|
@ -69,11 +67,11 @@ GPR_GLOBAL_CONFIG_DECLARE_BOOL(grpc_experimental_enable_tcp_frame_size_tuning); |
|
|
|
|
|
|
|
|
|
static void create_sockets(int sv[2]) { |
|
|
|
|
int flags; |
|
|
|
|
ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0); |
|
|
|
|
GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); |
|
|
|
|
flags = fcntl(sv[0], F_GETFL, 0); |
|
|
|
|
ASSERT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0); |
|
|
|
|
GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); |
|
|
|
|
flags = fcntl(sv[1], F_GETFL, 0); |
|
|
|
|
ASSERT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0); |
|
|
|
|
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void create_inet_sockets(int sv[2]) { |
|
|
|
@ -82,16 +80,16 @@ static void create_inet_sockets(int sv[2]) { |
|
|
|
|
memset(&addr, 0, sizeof(struct sockaddr_in)); |
|
|
|
|
addr.sin_family = AF_INET; |
|
|
|
|
int sock = socket(AF_INET, SOCK_STREAM, 0); |
|
|
|
|
ASSERT_TRUE(sock); |
|
|
|
|
ASSERT_EQ(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)), 0); |
|
|
|
|
GPR_ASSERT(sock); |
|
|
|
|
GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0); |
|
|
|
|
listen(sock, 1); |
|
|
|
|
|
|
|
|
|
/* Prepare client socket and connect to server */ |
|
|
|
|
socklen_t len = sizeof(sockaddr_in); |
|
|
|
|
ASSERT_EQ(getsockname(sock, (sockaddr*)&addr, &len), 0); |
|
|
|
|
GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0); |
|
|
|
|
|
|
|
|
|
int client = socket(AF_INET, SOCK_STREAM, 0); |
|
|
|
|
ASSERT_TRUE(client); |
|
|
|
|
GPR_ASSERT(client); |
|
|
|
|
int ret; |
|
|
|
|
do { |
|
|
|
|
ret = connect(client, reinterpret_cast<sockaddr*>(&addr), |
|
|
|
@ -104,14 +102,14 @@ static void create_inet_sockets(int sv[2]) { |
|
|
|
|
do { |
|
|
|
|
server = accept(sock, reinterpret_cast<sockaddr*>(&addr), &len); |
|
|
|
|
} while (server == -1 && errno == EINTR); |
|
|
|
|
ASSERT_NE(server, -1); |
|
|
|
|
GPR_ASSERT(server != -1); |
|
|
|
|
|
|
|
|
|
sv[0] = server; |
|
|
|
|
sv[1] = client; |
|
|
|
|
int flags = fcntl(sv[0], F_GETFL, 0); |
|
|
|
|
ASSERT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0); |
|
|
|
|
GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); |
|
|
|
|
flags = fcntl(sv[1], F_GETFL, 0); |
|
|
|
|
ASSERT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0); |
|
|
|
|
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static ssize_t fill_socket(int fd) { |
|
|
|
@ -128,7 +126,7 @@ static ssize_t fill_socket(int fd) { |
|
|
|
|
total_bytes += write_bytes; |
|
|
|
|
} |
|
|
|
|
} while (write_bytes >= 0 || errno == EINTR); |
|
|
|
|
EXPECT_EQ(errno, EAGAIN); |
|
|
|
|
GPR_ASSERT(errno == EAGAIN); |
|
|
|
|
return total_bytes; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -170,7 +168,7 @@ static size_t count_slices(grpc_slice* slices, size_t nslices, |
|
|
|
|
for (i = 0; i < nslices; ++i) { |
|
|
|
|
buf = GRPC_SLICE_START_PTR(slices[i]); |
|
|
|
|
for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) { |
|
|
|
|
EXPECT_EQ(buf[j], *current_data); |
|
|
|
|
GPR_ASSERT(buf[j] == *current_data); |
|
|
|
|
*current_data = (*current_data + 1) % 256; |
|
|
|
|
} |
|
|
|
|
num_bytes += GRPC_SLICE_LENGTH(slices[i]); |
|
|
|
@ -184,20 +182,20 @@ static void read_cb(void* user_data, grpc_error_handle error) { |
|
|
|
|
size_t read_bytes; |
|
|
|
|
int current_data; |
|
|
|
|
|
|
|
|
|
ASSERT_TRUE(GRPC_ERROR_IS_NONE(error)); |
|
|
|
|
GPR_ASSERT(GRPC_ERROR_IS_NONE(error)); |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
current_data = state->read_bytes % 256; |
|
|
|
|
// The number of bytes read each time this callback is invoked must be >=
|
|
|
|
|
// the min_progress_size.
|
|
|
|
|
ASSERT_LE(state->min_progress_size, state->incoming.length); |
|
|
|
|
GPR_ASSERT(state->min_progress_size <= state->incoming.length); |
|
|
|
|
read_bytes = count_slices(state->incoming.slices, state->incoming.count, |
|
|
|
|
¤t_data); |
|
|
|
|
state->read_bytes += read_bytes; |
|
|
|
|
gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes, |
|
|
|
|
state->target_read_bytes); |
|
|
|
|
if (state->read_bytes >= state->target_read_bytes) { |
|
|
|
|
ASSERT_TRUE( |
|
|
|
|
GPR_ASSERT( |
|
|
|
|
GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr))); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
} else { |
|
|
|
@ -254,13 +252,13 @@ static void read_test(size_t num_bytes, size_t slice_size, |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
while (state.read_bytes < state.target_read_bytes) { |
|
|
|
|
grpc_pollset_worker* worker = nullptr; |
|
|
|
|
ASSERT_TRUE(GRPC_LOG_IF_ERROR( |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
} |
|
|
|
|
ASSERT_EQ(state.read_bytes, state.target_read_bytes); |
|
|
|
|
GPR_ASSERT(state.read_bytes == state.target_read_bytes); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
|
|
|
|
|
grpc_slice_buffer_destroy_internal(&state.incoming); |
|
|
|
@ -314,13 +312,13 @@ static void large_read_test(size_t slice_size, int min_progress_size) { |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
while (state.read_bytes < state.target_read_bytes) { |
|
|
|
|
grpc_pollset_worker* worker = nullptr; |
|
|
|
|
ASSERT_TRUE(GRPC_LOG_IF_ERROR( |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
} |
|
|
|
|
ASSERT_EQ(state.read_bytes, state.target_read_bytes); |
|
|
|
|
GPR_ASSERT(state.read_bytes == state.target_read_bytes); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
|
|
|
|
|
grpc_slice_buffer_destroy_internal(&state.incoming); |
|
|
|
@ -354,18 +352,18 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size, |
|
|
|
|
(*current_data)++; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
EXPECT_EQ(num_bytes_left, 0); |
|
|
|
|
GPR_ASSERT(num_bytes_left == 0); |
|
|
|
|
return slices; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void write_done(void* user_data /* write_socket_state */, |
|
|
|
|
grpc_error_handle error) { |
|
|
|
|
ASSERT_TRUE(GRPC_ERROR_IS_NONE(error)); |
|
|
|
|
GPR_ASSERT(GRPC_ERROR_IS_NONE(error)); |
|
|
|
|
struct write_socket_state* state = |
|
|
|
|
static_cast<struct write_socket_state*>(user_data); |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
state->write_done = 1; |
|
|
|
|
ASSERT_TRUE( |
|
|
|
|
GPR_ASSERT( |
|
|
|
|
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
} |
|
|
|
@ -380,12 +378,12 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { |
|
|
|
|
grpc_core::ExecCtx exec_ctx; |
|
|
|
|
|
|
|
|
|
flags = fcntl(fd, F_GETFL, 0); |
|
|
|
|
ASSERT_EQ(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK), 0); |
|
|
|
|
GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0); |
|
|
|
|
|
|
|
|
|
for (;;) { |
|
|
|
|
grpc_pollset_worker* worker = nullptr; |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
ASSERT_TRUE(GRPC_LOG_IF_ERROR( |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", |
|
|
|
|
grpc_pollset_work(g_pollset, &worker, |
|
|
|
|
grpc_core::Timestamp::FromTimespecRoundUp( |
|
|
|
@ -396,16 +394,16 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { |
|
|
|
|
bytes_read = |
|
|
|
|
read(fd, buf, bytes_left > read_size ? read_size : bytes_left); |
|
|
|
|
} while (bytes_read < 0 && errno == EINTR); |
|
|
|
|
ASSERT_GE(bytes_read, 0); |
|
|
|
|
GPR_ASSERT(bytes_read >= 0); |
|
|
|
|
for (i = 0; i < bytes_read; ++i) { |
|
|
|
|
ASSERT_EQ(buf[i], current); |
|
|
|
|
GPR_ASSERT(buf[i] == current); |
|
|
|
|
current = (current + 1) % 256; |
|
|
|
|
} |
|
|
|
|
bytes_left -= static_cast<size_t>(bytes_read); |
|
|
|
|
if (bytes_left == 0) break; |
|
|
|
|
} |
|
|
|
|
flags = fcntl(fd, F_GETFL, 0); |
|
|
|
|
ASSERT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0); |
|
|
|
|
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); |
|
|
|
|
|
|
|
|
|
gpr_free(buf); |
|
|
|
|
} |
|
|
|
@ -413,11 +411,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { |
|
|
|
|
/* Verifier for timestamps callback for write_test */ |
|
|
|
|
void timestamps_verifier(void* arg, grpc_core::Timestamps* ts, |
|
|
|
|
grpc_error_handle error) { |
|
|
|
|
ASSERT_TRUE(GRPC_ERROR_IS_NONE(error)); |
|
|
|
|
ASSERT_NE(arg, nullptr); |
|
|
|
|
ASSERT_EQ(ts->sendmsg_time.time.clock_type, GPR_CLOCK_REALTIME); |
|
|
|
|
ASSERT_EQ(ts->scheduled_time.time.clock_type, GPR_CLOCK_REALTIME); |
|
|
|
|
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME); |
|
|
|
|
GPR_ASSERT(GRPC_ERROR_IS_NONE(error)); |
|
|
|
|
GPR_ASSERT(arg != nullptr); |
|
|
|
|
GPR_ASSERT(ts->sendmsg_time.time.clock_type == GPR_CLOCK_REALTIME); |
|
|
|
|
GPR_ASSERT(ts->scheduled_time.time.clock_type == GPR_CLOCK_REALTIME); |
|
|
|
|
GPR_ASSERT(ts->acked_time.time.clock_type == GPR_CLOCK_REALTIME); |
|
|
|
|
gpr_atm* done_timestamps = static_cast<gpr_atm*>(arg); |
|
|
|
|
gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1)); |
|
|
|
|
} |
|
|
|
@ -494,7 +492,7 @@ static void write_test(size_t num_bytes, size_t slice_size, |
|
|
|
|
gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
ASSERT_TRUE(GRPC_LOG_IF_ERROR( |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
exec_ctx.Flush(); |
|
|
|
@ -512,7 +510,7 @@ static void write_test(size_t num_bytes, size_t slice_size, |
|
|
|
|
void on_fd_released(void* arg, grpc_error_handle /*errors*/) { |
|
|
|
|
int* done = static_cast<int*>(arg); |
|
|
|
|
*done = 1; |
|
|
|
|
ASSERT_TRUE( |
|
|
|
|
GPR_ASSERT( |
|
|
|
|
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -549,8 +547,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { |
|
|
|
|
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; |
|
|
|
|
ep = |
|
|
|
|
grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test"); |
|
|
|
|
ASSERT_EQ(grpc_tcp_fd(ep), sv[1]); |
|
|
|
|
ASSERT_GE(sv[1], 0); |
|
|
|
|
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); |
|
|
|
|
grpc_endpoint_add_to_pollset(ep, g_pollset); |
|
|
|
|
|
|
|
|
|
written_bytes = fill_socket_partial(sv[0], num_bytes); |
|
|
|
@ -569,7 +566,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
while (state.read_bytes < state.target_read_bytes) { |
|
|
|
|
grpc_pollset_worker* worker = nullptr; |
|
|
|
|
ASSERT_TRUE(GRPC_LOG_IF_ERROR( |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR, |
|
|
|
|
state.read_bytes, state.target_read_bytes); |
|
|
|
@ -577,7 +574,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { |
|
|
|
|
grpc_core::ExecCtx::Get()->Flush(); |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
} |
|
|
|
|
ASSERT_EQ(state.read_bytes, state.target_read_bytes); |
|
|
|
|
GPR_ASSERT(state.read_bytes == state.target_read_bytes); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
|
|
|
|
|
grpc_slice_buffer_destroy_internal(&state.incoming); |
|
|
|
@ -586,13 +583,13 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
while (!fd_released_done) { |
|
|
|
|
grpc_pollset_worker* worker = nullptr; |
|
|
|
|
ASSERT_TRUE(GRPC_LOG_IF_ERROR( |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
ASSERT_EQ(fd_released_done, 1); |
|
|
|
|
ASSERT_EQ(fd, sv[1]); |
|
|
|
|
GPR_ASSERT(fd_released_done == 1); |
|
|
|
|
GPR_ASSERT(fd == sv[1]); |
|
|
|
|
|
|
|
|
|
written_bytes = fill_socket_partial(sv[0], num_bytes); |
|
|
|
|
drain_socket_blocking(fd, written_bytes, written_bytes); |
|
|
|
@ -671,8 +668,9 @@ static void destroy_pollset(void* p, grpc_error_handle /*error*/) { |
|
|
|
|
grpc_pollset_destroy(static_cast<grpc_pollset*>(p)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TEST(TcpPosixTest, MainTest) { |
|
|
|
|
int main(int argc, char** argv) { |
|
|
|
|
grpc_closure destroyed; |
|
|
|
|
grpc::testing::TestEnvironment env(&argc, argv); |
|
|
|
|
GPR_GLOBAL_CONFIG_SET(grpc_experimental_enable_tcp_frame_size_tuning, true); |
|
|
|
|
grpc_init(); |
|
|
|
|
grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier); |
|
|
|
@ -690,12 +688,12 @@ TEST(TcpPosixTest, MainTest) { |
|
|
|
|
} |
|
|
|
|
grpc_shutdown(); |
|
|
|
|
gpr_free(g_pollset); |
|
|
|
|
|
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#endif /* GRPC_POSIX_SOCKET_TCP */ |
|
|
|
|
#else /* GRPC_POSIX_SOCKET_TCP */ |
|
|
|
|
|
|
|
|
|
int main(int argc, char** argv) { |
|
|
|
|
grpc::testing::TestEnvironment env(&argc, argv); |
|
|
|
|
::testing::InitGoogleTest(&argc, argv); |
|
|
|
|
return RUN_ALL_TESTS(); |
|
|
|
|
} |
|
|
|
|
int main(int argc, char** argv) { return 1; } |
|
|
|
|
|
|
|
|
|
#endif /* GRPC_POSIX_SOCKET_TCP */ |
|
|
|
|