Event Engine related minor cleanup (#31188)

* Minor event engine related cleanup

* regenerate_projects

* fix test

* iwyu
pull/31187/head^2
Vignesh Babu 3 years ago committed by GitHub
parent 92d380735d
commit e7ad460c7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc
  2. 20
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
  3. 6
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
  4. 30
      test/core/event_engine/posix/event_poller_posix_test.cc

@ -37,17 +37,21 @@ bool PollStrategyMatches(absl::string_view strategy, absl::string_view want) {
} // namespace
PosixEventPoller* GetDefaultPoller(Scheduler* scheduler) {
grpc_core::UniquePtr<char> poll_strategy =
GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
static const char* poll_strategy =
GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy).release();
PosixEventPoller* poller = nullptr;
auto strings = absl::StrSplit(poll_strategy.get(), ',');
auto strings = absl::StrSplit(poll_strategy, ',');
for (auto it = strings.begin(); it != strings.end() && poller == nullptr;
it++) {
if (PollStrategyMatches(*it, "epoll1")) {
poller = GetEpoll1Poller(scheduler);
} else if (PollStrategyMatches(*it, "poll")) {
}
if (poller == nullptr && PollStrategyMatches(*it, "poll")) {
// If epoll1 fails and if poll strategy matches "poll", use Poll poller
poller = GetPollPoller(scheduler, /*use_phony_poll=*/false);
} else if (PollStrategyMatches(*it, "none")) {
} else if (poller == nullptr && PollStrategyMatches(*it, "none")) {
// If epoll1 fails and if poll strategy matches "none", use phony poll
// poller.
poller = GetPollPoller(scheduler, /*use_phony_poll=*/true);
}
}

@ -16,7 +16,6 @@
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include <arpa/inet.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
@ -33,6 +32,7 @@
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#include <arpa/inet.h> // IWYU pragma: keep
#ifdef GRPC_LINUX_TCP_H
#include <linux/tcp.h>
#else
@ -336,7 +336,8 @@ absl::StatusOr<std::string> SockaddrToString(
if (ip != nullptr &&
inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != nullptr) {
if (sin6_scope_id != 0) {
// Enclose sin6_scope_id with the format defined in RFC 6874 section 2.
// Enclose sin6_scope_id with the format defined in RFC 6874
// section 2.
std::string host_with_scope =
absl::StrFormat("%s%%%" PRIu32, ntop_buf, sin6_scope_id);
out = grpc_core::JoinHostPort(host_with_scope, port);
@ -347,7 +348,8 @@ absl::StatusOr<std::string> SockaddrToString(
return absl::InvalidArgumentError(
absl::StrCat("Unknown sockaddr family: ", addr->sa_family));
}
// This is probably redundant, but we wouldn't want to log the wrong error.
// This is probably redundant, but we wouldn't want to log the wrong
// error.
errno = save_errno;
return out;
}
@ -550,8 +552,8 @@ bool PosixSocketWrapper::IsSocketReusePortSupported() {
int s = socket(AF_INET, SOCK_STREAM, 0);
if (s < 0) {
// This might be an ipv6-only environment in which case
// 'socket(AF_INET,..)' call would fail. Try creating IPv6 socket in that
// case
// 'socket(AF_INET,..)' call would fail. Try creating IPv6 socket in
// that case
s = socket(AF_INET6, SOCK_STREAM, 0);
}
if (s >= 0) {
@ -930,16 +932,15 @@ bool PosixSocketWrapper::SetSocketDualStack() {
GPR_ASSERT(false && "unimplemented");
}
static bool PosixSocketWrapper::IsSocketReusePortSupported() {
bool PosixSocketWrapper::IsSocketReusePortSupported() {
GPR_ASSERT(false && "unimplemented");
}
static bool PosixSocketWrapper::IsIpv6LoopbackAvailable() {
bool PosixSocketWrapper::IsIpv6LoopbackAvailable() {
GPR_ASSERT(false && "unimplemented");
}
static absl::StatusOr<PosixSocketWrapper>
PosixSocketWrapper::CreateDualStackSocket(
absl::StatusOr<PosixSocketWrapper> PosixSocketWrapper::CreateDualStackSocket(
std::function<int(int /*domain*/, int /*type*/, int /*protocol*/)>
/* socket_factory */,
const experimental::EventEngine::ResolvedAddress& /*addr*/, int /*type*/,
@ -953,7 +954,6 @@ PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
const EventEngine::ResolvedAddress& /*target_addr*/) {
GPR_ASSERT(false && "unimplemented");
}
}
#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */

@ -17,8 +17,6 @@
#include <grpc/support/port_platform.h>
#include <sys/socket.h>
#include <functional>
#include <string>
#include <utility>
@ -36,6 +34,10 @@
#include "src/core/lib/iomgr/socket_mutator.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#include <sys/socket.h>
#endif
#ifdef GRPC_LINUX_ERRQUEUE
#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60

@ -25,6 +25,7 @@
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "gtest/gtest.h"
@ -375,12 +376,7 @@ void WaitAndShutdown(server* sv, client* cl) {
gpr_mu_unlock(&g_mu);
}
std::string TestScenarioName(
const ::testing::TestParamInfo<std::string>& info) {
return info.param;
}
class EventPollerTest : public ::testing::TestWithParam<std::string> {
class EventPollerTest : public ::testing::Test {
void SetUp() override {
engine_ =
std::make_unique<grpc_event_engine::experimental::PosixEventEngine>();
@ -389,10 +385,9 @@ class EventPollerTest : public ::testing::TestWithParam<std::string> {
std::make_unique<grpc_event_engine::posix_engine::TestScheduler>(
engine_.get());
EXPECT_NE(scheduler_, nullptr);
GPR_GLOBAL_CONFIG_SET(grpc_poll_strategy, GetParam().c_str());
g_event_poller = GetDefaultPoller(scheduler_.get());
if (g_event_poller != nullptr) {
EXPECT_EQ(g_event_poller->Name(), GetParam());
gpr_log(GPR_INFO, "Using poller: %s", g_event_poller->Name().c_str());
}
}
@ -413,7 +408,7 @@ class EventPollerTest : public ::testing::TestWithParam<std::string> {
// Test grpc_fd. Start an upload server and client, upload a stream of bytes
// from the client to the server, and verify that the total number of sent
// bytes is equal to the total number of received bytes.
TEST_P(EventPollerTest, TestEventPollerHandle) {
TEST_F(EventPollerTest, TestEventPollerHandle) {
server sv;
client cl;
int port;
@ -455,7 +450,7 @@ void SecondReadCallback(FdChangeData* fdc, absl::Status /*status*/) {
// Note that we have two different but almost identical callbacks above -- the
// point is to have two different function pointers and two different data
// pointers and make sure that changing both really works.
TEST_P(EventPollerTest, TestEventPollerHandleChange) {
TEST_F(EventPollerTest, TestEventPollerHandleChange) {
EventHandle* em_fd;
FdChangeData a, b;
int flags;
@ -692,7 +687,7 @@ class Worker : public grpc_core::DualRefCounted<Worker> {
// immediately and schedule the wait for the next read event. A new read event
// is also generated for each fd in parallel after the previous one is
// processed.
TEST_P(EventPollerTest, TestMultipleHandles) {
TEST_F(EventPollerTest, TestMultipleHandles) {
static constexpr int kNumHandles = 100;
static constexpr int kNumWakeupsPerHandle = 100;
if (g_event_poller == nullptr) {
@ -704,11 +699,6 @@ TEST_P(EventPollerTest, TestMultipleHandles) {
worker->Wait();
}
INSTANTIATE_TEST_SUITE_P(PosixEventPoller, EventPollerTest,
::testing::ValuesIn({std::string("epoll1"),
std::string("poll")}),
&TestScenarioName);
} // namespace
} // namespace posix_engine
} // namespace grpc_event_engine
@ -716,6 +706,14 @@ INSTANTIATE_TEST_SUITE_P(PosixEventPoller, EventPollerTest,
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
gpr_mu_init(&g_mu);
grpc_core::UniquePtr<char> poll_strategy =
GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
auto strings = absl::StrSplit(poll_strategy.get(), ',');
if (std::find(strings.begin(), strings.end(), "none") != strings.end()) {
// Skip the test entirely if poll strategy is none.
return 0;
}
return RUN_ALL_TESTS();
}

Loading…
Cancel
Save