|
|
|
@ -36,6 +36,8 @@ |
|
|
|
|
|
|
|
|
|
#include <gtest/gtest.h> |
|
|
|
|
|
|
|
|
|
#include "absl/log/check.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/grpc.h> |
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
@ -80,11 +82,11 @@ static constexpr int64_t kDeadlineMillis = 20000; |
|
|
|
|
|
|
|
|
|
static void create_sockets(int sv[2]) { |
|
|
|
|
int flags; |
|
|
|
|
GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); |
|
|
|
|
CHECK_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0); |
|
|
|
|
flags = fcntl(sv[0], F_GETFL, 0); |
|
|
|
|
GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); |
|
|
|
|
CHECK_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0); |
|
|
|
|
flags = fcntl(sv[1], F_GETFL, 0); |
|
|
|
|
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); |
|
|
|
|
CHECK_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static ssize_t fill_socket(int fd) { |
|
|
|
@ -101,7 +103,7 @@ static ssize_t fill_socket(int fd) { |
|
|
|
|
total_bytes += write_bytes; |
|
|
|
|
} |
|
|
|
|
} while (write_bytes >= 0 || errno == EINTR); |
|
|
|
|
GPR_ASSERT(errno == EAGAIN); |
|
|
|
|
CHECK(errno == EAGAIN); |
|
|
|
|
return total_bytes; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -143,7 +145,7 @@ static size_t count_slices(grpc_slice* slices, size_t nslices, |
|
|
|
|
for (i = 0; i < nslices; ++i) { |
|
|
|
|
buf = GRPC_SLICE_START_PTR(slices[i]); |
|
|
|
|
for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) { |
|
|
|
|
GPR_ASSERT(buf[j] == *current_data); |
|
|
|
|
CHECK(buf[j] == *current_data); |
|
|
|
|
*current_data = (*current_data + 1) % 256; |
|
|
|
|
} |
|
|
|
|
num_bytes += GRPC_SLICE_LENGTH(slices[i]); |
|
|
|
@ -157,14 +159,14 @@ static void read_cb(void* user_data, grpc_error_handle error) { |
|
|
|
|
size_t read_bytes; |
|
|
|
|
int current_data; |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(error.ok()); |
|
|
|
|
CHECK_OK(error); |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
current_data = state->read_bytes % 256; |
|
|
|
|
// The number of bytes read each time this callback is invoked must be >=
|
|
|
|
|
// the min_progress_size.
|
|
|
|
|
if (grpc_core::IsTcpFrameSizeTuningEnabled()) { |
|
|
|
|
GPR_ASSERT(state->min_progress_size <= state->incoming.length); |
|
|
|
|
CHECK(state->min_progress_size <= state->incoming.length); |
|
|
|
|
} |
|
|
|
|
read_bytes = count_slices(state->incoming.slices, state->incoming.count, |
|
|
|
|
¤t_data); |
|
|
|
@ -172,8 +174,7 @@ static void read_cb(void* user_data, grpc_error_handle error) { |
|
|
|
|
gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes, |
|
|
|
|
state->target_read_bytes); |
|
|
|
|
if (state->read_bytes >= state->target_read_bytes) { |
|
|
|
|
GPR_ASSERT( |
|
|
|
|
GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr))); |
|
|
|
|
CHECK(GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr))); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
} else { |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
@ -233,13 +234,13 @@ static void read_test(size_t num_bytes, size_t slice_size, |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
while (state.read_bytes < state.target_read_bytes) { |
|
|
|
|
grpc_pollset_worker* worker = nullptr; |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
CHECK(GRPC_LOG_IF_ERROR("pollset_work", |
|
|
|
|
grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
grpc_core::ExecCtx::Get()->Flush(); |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(state.read_bytes == state.target_read_bytes); |
|
|
|
|
CHECK(state.read_bytes == state.target_read_bytes); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
|
|
|
|
|
grpc_slice_buffer_destroy(&state.incoming); |
|
|
|
@ -297,13 +298,13 @@ static void large_read_test(size_t slice_size, int min_progress_size) { |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
while (state.read_bytes < state.target_read_bytes) { |
|
|
|
|
grpc_pollset_worker* worker = nullptr; |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
CHECK(GRPC_LOG_IF_ERROR("pollset_work", |
|
|
|
|
grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
grpc_core::ExecCtx::Get()->Flush(); |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(state.read_bytes == state.target_read_bytes); |
|
|
|
|
CHECK(state.read_bytes == state.target_read_bytes); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
|
|
|
|
|
grpc_slice_buffer_destroy(&state.incoming); |
|
|
|
@ -337,18 +338,18 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size, |
|
|
|
|
(*current_data)++; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(num_bytes_left == 0); |
|
|
|
|
CHECK_EQ(num_bytes_left, 0); |
|
|
|
|
return slices; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void write_done(void* user_data /* write_socket_state */, |
|
|
|
|
grpc_error_handle error) { |
|
|
|
|
GPR_ASSERT(error.ok()); |
|
|
|
|
CHECK_OK(error); |
|
|
|
|
struct write_socket_state* state = |
|
|
|
|
static_cast<struct write_socket_state*>(user_data); |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
state->write_done = 1; |
|
|
|
|
GPR_ASSERT( |
|
|
|
|
CHECK( |
|
|
|
|
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
} |
|
|
|
@ -363,12 +364,12 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { |
|
|
|
|
grpc_core::ExecCtx exec_ctx; |
|
|
|
|
|
|
|
|
|
flags = fcntl(fd, F_GETFL, 0); |
|
|
|
|
GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0); |
|
|
|
|
CHECK_EQ(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK), 0); |
|
|
|
|
|
|
|
|
|
for (;;) { |
|
|
|
|
grpc_pollset_worker* worker = nullptr; |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
CHECK(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", |
|
|
|
|
grpc_pollset_work(g_pollset, &worker, |
|
|
|
|
grpc_core::Timestamp::FromTimespecRoundUp( |
|
|
|
@ -379,16 +380,16 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { |
|
|
|
|
bytes_read = |
|
|
|
|
read(fd, buf, bytes_left > read_size ? read_size : bytes_left); |
|
|
|
|
} while (bytes_read < 0 && errno == EINTR); |
|
|
|
|
GPR_ASSERT(bytes_read >= 0); |
|
|
|
|
CHECK_GE(bytes_read, 0); |
|
|
|
|
for (i = 0; i < bytes_read; ++i) { |
|
|
|
|
GPR_ASSERT(buf[i] == current); |
|
|
|
|
CHECK(buf[i] == current); |
|
|
|
|
current = (current + 1) % 256; |
|
|
|
|
} |
|
|
|
|
bytes_left -= static_cast<size_t>(bytes_read); |
|
|
|
|
if (bytes_left == 0) break; |
|
|
|
|
} |
|
|
|
|
flags = fcntl(fd, F_GETFL, 0); |
|
|
|
|
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); |
|
|
|
|
CHECK_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0); |
|
|
|
|
|
|
|
|
|
gpr_free(buf); |
|
|
|
|
} |
|
|
|
@ -453,8 +454,8 @@ static void write_test(size_t num_bytes, size_t slice_size) { |
|
|
|
|
if (state.write_done) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
CHECK(GRPC_LOG_IF_ERROR("pollset_work", |
|
|
|
|
grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
exec_ctx.Flush(); |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
@ -531,7 +532,8 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { |
|
|
|
|
grpc_event_engine::experimental::ChannelArgsEndpointConfig( |
|
|
|
|
grpc_core::ChannelArgs::FromC(&args))), |
|
|
|
|
"test"); |
|
|
|
|
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); |
|
|
|
|
CHECK(grpc_tcp_fd(ep) == sv[1]); |
|
|
|
|
CHECK_GE(sv[1], 0); |
|
|
|
|
} |
|
|
|
|
grpc_endpoint_add_to_pollset(ep, g_pollset); |
|
|
|
|
|
|
|
|
@ -551,23 +553,23 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
while (state.read_bytes < state.target_read_bytes) { |
|
|
|
|
grpc_pollset_worker* worker = nullptr; |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
CHECK(GRPC_LOG_IF_ERROR("pollset_work", |
|
|
|
|
grpc_pollset_work(g_pollset, &worker, deadline))); |
|
|
|
|
gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR, |
|
|
|
|
state.read_bytes, state.target_read_bytes); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
grpc_core::ExecCtx::Get()->Flush(); |
|
|
|
|
gpr_mu_lock(g_mu); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(state.read_bytes == state.target_read_bytes); |
|
|
|
|
CHECK(state.read_bytes == state.target_read_bytes); |
|
|
|
|
gpr_mu_unlock(g_mu); |
|
|
|
|
|
|
|
|
|
grpc_slice_buffer_destroy(&state.incoming); |
|
|
|
|
grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb); |
|
|
|
|
grpc_core::ExecCtx::Get()->Flush(); |
|
|
|
|
rel_fd.notify.WaitForNotificationWithTimeout(absl::Seconds(20)); |
|
|
|
|
GPR_ASSERT(rel_fd.fd_released_done == 1); |
|
|
|
|
GPR_ASSERT(fd == sv[1]); |
|
|
|
|
CHECK_EQ(rel_fd.fd_released_done, 1); |
|
|
|
|
CHECK(fd == sv[1]); |
|
|
|
|
written_bytes = fill_socket_partial(sv[0], num_bytes); |
|
|
|
|
drain_socket_blocking(fd, written_bytes, written_bytes); |
|
|
|
|
written_bytes = fill_socket_partial(fd, num_bytes); |
|
|
|
|