[Fork] Add ObjectGroupForkHandler fork-handling system (#33733)

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->

---------

Co-authored-by: Bradley Hess <bdhess@google.com>
Co-authored-by: AJ Heller <hork@google.com>
pull/34400/head^2
Yijie Ma 1 year ago committed by GitHub
parent f87ce8b413
commit d3828ebfbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 60
      CMakeLists.txt
  2. 21
      build_autogenerated.yaml
  3. 10
      src/core/BUILD
  4. 99
      src/core/lib/event_engine/forkable.cc
  5. 60
      src/core/lib/event_engine/forkable.h
  6. 13
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
  7. 5
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
  8. 26
      src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
  9. 15
      src/core/lib/event_engine/posix_engine/ev_poll_posix.h
  10. 4
      src/core/lib/event_engine/posix_engine/event_poller.h
  11. 25
      src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc
  12. 4
      src/core/lib/event_engine/posix_engine/event_poller_posix_default.h
  13. 42
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  14. 34
      src/core/lib/event_engine/posix_engine/posix_engine.h
  15. 20
      src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
  16. 2
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
  17. 2
      src/core/lib/surface/init.cc
  18. 1
      test/core/event_engine/BUILD
  19. 51
      test/core/event_engine/forkable_test.cc
  20. 6
      test/core/event_engine/posix/event_poller_posix_test.cc
  21. 4
      test/core/event_engine/posix/posix_endpoint_test.cc
  22. 24
      test/cpp/end2end/BUILD
  23. 172
      test/cpp/end2end/client_fork_test.cc
  24. 41
      tools/internal_ci/linux/pull_request/grpc_fork_asan.cfg
  25. 22
      tools/run_tests/generated/tests.json

60
CMakeLists.txt generated

@ -946,6 +946,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx client_channel_service_config_test)
add_dependencies(buildtests_cxx client_channel_test)
add_dependencies(buildtests_cxx client_context_test_peer_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx client_fork_test)
endif()
add_dependencies(buildtests_cxx client_interceptors_end2end_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx client_lb_end2end_test)
@ -9153,6 +9156,63 @@ target_link_libraries(client_context_test_peer_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(client_fork_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h
test/cpp/end2end/client_fork_test.cc
)
target_compile_features(client_fork_test PUBLIC cxx_std_14)
target_include_directories(client_fork_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(client_fork_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc++
grpc_test_util
grpc++_test_config
)
endif()
endif()
if(gRPC_BUILD_TESTS)

@ -6976,6 +6976,27 @@ targets:
deps:
- grpc++_test
- grpc++_test_util
- name: client_fork_test
gtest: true
build: test
language: c++
headers: []
src:
- src/proto/grpc/testing/duplicate/echo_duplicate.proto
- src/proto/grpc/testing/echo.proto
- src/proto/grpc/testing/echo_messages.proto
- src/proto/grpc/testing/simple_messages.proto
- src/proto/grpc/testing/xds/v3/orca_load_report.proto
- test/cpp/end2end/client_fork_test.cc
deps:
- gtest
- grpc++
- grpc_test_util
- grpc++_test_config
platforms:
- linux
- posix
- mac
- name: client_interceptors_end2end_test
gtest: true
build: test

@ -1456,9 +1456,7 @@ grpc_cc_library(
hdrs = [
"lib/event_engine/forkable.h",
],
external_deps = ["absl/base:core_headers"],
deps = [
"no_destruct",
"//:config_vars",
"//:gpr",
"//:gpr_platform",
@ -1601,6 +1599,7 @@ grpc_cc_library(
"event_engine_trace",
"event_engine_work_queue",
"forkable",
"no_destruct",
"notification",
"time",
"useful",
@ -1664,6 +1663,7 @@ grpc_cc_library(
],
deps = [
"event_engine_poller",
"forkable",
"posix_event_engine_closure",
"//:event_engine_base_hdrs",
"//:gpr_platform",
@ -1796,7 +1796,6 @@ grpc_cc_library(
deps = [
"event_engine_poller",
"event_engine_time_util",
"forkable",
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
@ -1834,7 +1833,6 @@ grpc_cc_library(
"common_event_engine_closures",
"event_engine_poller",
"event_engine_time_util",
"forkable",
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
@ -1859,7 +1857,9 @@ grpc_cc_library(
],
external_deps = ["absl/strings"],
deps = [
"forkable",
"iomgr_port",
"no_destruct",
"posix_event_engine_event_poller",
"posix_event_engine_poller_posix_epoll1",
"posix_event_engine_poller_posix_poll",
@ -2073,8 +2073,10 @@ grpc_cc_library(
"event_engine_thread_pool",
"event_engine_trace",
"event_engine_utils",
"forkable",
"init_internally",
"iomgr_port",
"no_destruct",
"posix_event_engine_base_hdrs",
"posix_event_engine_closure",
"posix_event_engine_endpoint",

@ -26,11 +26,7 @@
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
namespace experimental {
@ -38,86 +34,71 @@ namespace experimental {
grpc_core::TraceFlag grpc_trace_fork(false, "fork");
namespace {
grpc_core::NoDestruct<grpc_core::Mutex> g_mu;
bool g_registered ABSL_GUARDED_BY(g_mu){false};
// This must be ordered because there are ordering dependencies between
// certain fork handlers.
grpc_core::NoDestruct<std::vector<Forkable*>> g_forkables ABSL_GUARDED_BY(g_mu);
bool IsForkEnabled() {
static bool enabled = grpc_core::ConfigVars::Get().EnableForkSupport();
return enabled;
}
} // namespace
Forkable::Forkable() { ManageForkable(this); }
Forkable::~Forkable() { StopManagingForkable(this); }
void RegisterForkHandlers() {
if (IsForkEnabled()) {
grpc_core::MutexLock lock(g_mu.get());
if (!std::exchange(g_registered, true)) {
void ObjectGroupForkHandler::RegisterForkable(
std::shared_ptr<Forkable> forkable, GRPC_UNUSED void (*prepare)(void),
GRPC_UNUSED void (*parent)(void), GRPC_UNUSED void (*child)(void)) {
GPR_ASSERT(!is_forking_);
forkables_.emplace_back(forkable);
#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
GRPC_FORK_TRACE_LOG_STRING("RegisterForkHandlers");
pthread_atfork(PrepareFork, PostforkParent, PostforkChild);
#endif
}
if (!std::exchange(registered_, true)) {
pthread_atfork(prepare, parent, child);
}
#endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
}
void PrepareFork() {
void ObjectGroupForkHandler::Prefork() {
if (IsForkEnabled()) {
GPR_ASSERT(!std::exchange(is_forking_, true));
GRPC_FORK_TRACE_LOG_STRING("PrepareFork");
grpc_core::MutexLock lock(g_mu.get());
for (auto forkable_iter = g_forkables->rbegin();
forkable_iter != g_forkables->rend(); ++forkable_iter) {
(*forkable_iter)->PrepareFork();
for (auto it = forkables_.begin(); it != forkables_.end();) {
auto shared = it->lock();
if (shared) {
shared->PrepareFork();
++it;
} else {
it = forkables_.erase(it);
}
}
GRPC_FORK_TRACE_LOG_STRING("PrepareFork finished");
}
}
void PostforkParent() {
void ObjectGroupForkHandler::PostforkParent() {
if (IsForkEnabled()) {
GPR_ASSERT(is_forking_);
GRPC_FORK_TRACE_LOG_STRING("PostforkParent");
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
GRPC_FORK_TRACE_LOG("Calling PostforkParent for forkable::%p", forkable);
forkable->PostforkParent();
for (auto it = forkables_.begin(); it != forkables_.end();) {
auto shared = it->lock();
if (shared) {
shared->PostforkParent();
++it;
} else {
it = forkables_.erase(it);
}
}
GRPC_FORK_TRACE_LOG_STRING("PostforkParent finished");
is_forking_ = false;
}
}
void PostforkChild() {
void ObjectGroupForkHandler::PostforkChild() {
if (IsForkEnabled()) {
GPR_ASSERT(is_forking_);
GRPC_FORK_TRACE_LOG_STRING("PostforkChild");
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
GRPC_FORK_TRACE_LOG("Calling PostforkChild for forkable::%p", forkable);
forkable->PostforkChild();
for (auto it = forkables_.begin(); it != forkables_.end();) {
auto shared = it->lock();
if (shared) {
shared->PostforkChild();
++it;
} else {
it = forkables_.erase(it);
}
}
GRPC_FORK_TRACE_LOG_STRING("PostforkChild finished");
}
}
void ManageForkable(Forkable* forkable) {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG("Manage forkable::%p", forkable);
grpc_core::MutexLock lock(g_mu.get());
g_forkables->push_back(forkable);
}
}
void StopManagingForkable(Forkable* forkable) {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG("Stop managing forkable::%p", forkable);
grpc_core::MutexLock lock(g_mu.get());
auto iter = std::find(g_forkables->begin(), g_forkables->end(), forkable);
GPR_ASSERT(iter != g_forkables->end());
g_forkables->erase(iter);
is_forking_ = false;
}
}

@ -16,6 +16,9 @@
#include <grpc/support/port_platform.h>
#include <memory>
#include <vector>
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
@ -34,41 +37,44 @@ extern grpc_core::TraceFlag grpc_trace_fork;
#define GRPC_FORK_TRACE_LOG_STRING(format) GRPC_FORK_TRACE_LOG("%s", format)
// Register fork handlers with the system, enabling fork support.
//
// This provides pthread-based support for fork events. Any objects that
// implement Forkable can register themselves with this system using
// ManageForkable, and their respective methods will be called upon fork.
//
// This should be called once upon grpc_initialization.
void RegisterForkHandlers();
// Global callback for pthread_atfork's *prepare argument
void PrepareFork();
// Global callback for pthread_atfork's *parent argument
void PostforkParent();
// Global callback for pthread_atfork's *child argument
void PostforkChild();
// An interface to be implemented by EventEngines that wish to have managed fork
// support.
// support. The child class must guarantee that those methods are thread-safe.
class Forkable {
public:
Forkable();
virtual ~Forkable();
virtual ~Forkable() = default;
virtual void PrepareFork() = 0;
virtual void PostforkParent() = 0;
virtual void PostforkChild() = 0;
};
// Add Forkables from the set of objects that are supported.
// Upon fork, each forkable will have its respective fork hooks called on
// the thread that invoked the fork.
//
// Relative ordering of fork callback operations is not guaranteed.
void ManageForkable(Forkable* forkable);
// Remove a forkable from the managed set.
void StopManagingForkable(Forkable* forkable);
// ObjectGroupForkHandler is meant to be used as a static object in each
// translation unit where Forkables are created and registered with the
// ObjectGroupForkHandler. It essentially provides storage for Forkables'
// instances (as a vector of weak pointers) and helper methods that are meant to
// be invoked inside the fork handlers (see pthread_atfork(3)). The idea is to
// have different Forkables (e.g. PosixEventPoller) to store their instances
// (e.g. a PosixEventPoller object) in a single place separated from other
// Forkables (a sharded approach). Forkables need to register their pthread fork
// handlers and manage the relative ordering themselves. This object is
// thread-unsafe.
class ObjectGroupForkHandler {
public:
// Registers a Forkable with this ObjectGroupForkHandler, the Forkable must be
// created as a shared pointer.
void RegisterForkable(std::shared_ptr<Forkable> forkable,
GRPC_UNUSED void (*prepare)(void),
GRPC_UNUSED void (*parent)(void),
GRPC_UNUSED void (*child)(void));
void Prefork();
void PostforkParent();
void PostforkChild();
private:
GRPC_UNUSED bool registered_ = false;
bool is_forking_ = false;
std::vector<std::weak_ptr<Forkable> > forkables_;
};
} // namespace experimental
} // namespace grpc_event_engine

@ -367,10 +367,7 @@ Epoll1Poller::Epoll1Poller(Scheduler* scheduler)
ForkPollerListAddPoller(this);
}
void Epoll1Poller::Shutdown() {
ForkPollerListRemovePoller(this);
delete this;
}
void Epoll1Poller::Shutdown() { ForkPollerListRemovePoller(this); }
void Epoll1Poller::Close() {
grpc_core::MutexLock lock(&mu_);
@ -564,10 +561,10 @@ void Epoll1Poller::Kick() {
GPR_ASSERT(wakeup_fd_->Wakeup().ok());
}
Epoll1Poller* MakeEpoll1Poller(Scheduler* scheduler) {
std::shared_ptr<Epoll1Poller> MakeEpoll1Poller(Scheduler* scheduler) {
static bool kEpoll1PollerSupported = InitEpoll1PollerLinux();
if (kEpoll1PollerSupported) {
return new Epoll1Poller(scheduler);
return std::make_shared<Epoll1Poller>(scheduler);
}
return nullptr;
}
@ -624,7 +621,9 @@ void Epoll1Poller::Kick() { grpc_core::Crash("unimplemented"); }
// If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
// nullptr.
Epoll1Poller* MakeEpoll1Poller(Scheduler* /*scheduler*/) { return nullptr; }
std::shared_ptr<Epoll1Poller> MakeEpoll1Poller(Scheduler* /*scheduler*/) {
return nullptr;
}
void Epoll1Poller::PrepareFork() {}

@ -27,7 +27,6 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
@ -47,7 +46,7 @@ namespace experimental {
class Epoll1EventHandle;
// Definition of epoll1 based poller.
class Epoll1Poller : public PosixEventPoller, public Forkable {
class Epoll1Poller : public PosixEventPoller {
public:
explicit Epoll1Poller(Scheduler* scheduler);
EventHandle* CreateHandle(int fd, absl::string_view name,
@ -131,7 +130,7 @@ class Epoll1Poller : public PosixEventPoller, public Forkable {
// Return an instance of a epoll1 based poller tied to the specified event
// engine.
Epoll1Poller* MakeEpoll1Poller(Scheduler* scheduler);
std::shared_ptr<Epoll1Poller> MakeEpoll1Poller(Scheduler* scheduler);
} // namespace experimental
} // namespace grpc_event_engine

@ -73,13 +73,13 @@ using Events = absl::InlinedVector<PollEventHandle*, 5>;
class PollEventHandle : public EventHandle {
public:
PollEventHandle(int fd, PollPoller* poller)
PollEventHandle(int fd, std::shared_ptr<PollPoller> poller)
: fd_(fd),
pending_actions_(0),
fork_fd_list_(this),
poller_handles_list_(this),
poller_(poller),
scheduler_(poller->GetScheduler()),
poller_(std::move(poller)),
is_orphaned_(false),
is_shutdown_(false),
closed_(false),
@ -92,11 +92,10 @@ class PollEventHandle : public EventHandle {
read_closure_(reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)),
write_closure_(
reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
poller_->Ref();
grpc_core::MutexLock lock(&poller_->mu_);
poller_->PollerHandlesListAddHandle(this);
}
PollPoller* Poller() override { return poller_; }
PollPoller* Poller() override { return poller_.get(); }
bool SetPendingActions(bool pending_read, bool pending_write) {
pending_actions_ |= pending_read;
if (pending_write) {
@ -184,7 +183,6 @@ class PollEventHandle : public EventHandle {
if (on_done_ != nullptr) {
scheduler_->Run(on_done_);
}
poller_->Unref();
delete this;
}
}
@ -210,8 +208,8 @@ class PollEventHandle : public EventHandle {
int pending_actions_;
PollPoller::HandlesList fork_fd_list_;
PollPoller::HandlesList poller_handles_list_;
PollPoller* poller_;
Scheduler* scheduler_;
std::shared_ptr<PollPoller> poller_;
bool is_orphaned_;
bool is_shutdown_;
bool closed_;
@ -343,7 +341,7 @@ EventHandle* PollPoller::CreateHandle(int fd, absl::string_view /*name*/,
// Avoid unused-parameter warning for debug-only parameter
(void)track_err;
GPR_DEBUG_ASSERT(track_err == false);
PollEventHandle* handle = new PollEventHandle(fd, this);
PollEventHandle* handle = new PollEventHandle(fd, shared_from_this());
ForkFdListAddHandle(handle);
// We need to send a kick to the thread executing Work(..) so that it can
// add this new Fd into the list of Fds to poll.
@ -828,10 +826,7 @@ Poller::WorkResult PollPoller::Work(
return was_kicked_ext ? Poller::WorkResult::kKicked : Poller::WorkResult::kOk;
}
void PollPoller::Shutdown() {
ForkPollerListRemovePoller(this);
Unref();
}
void PollPoller::Shutdown() { ForkPollerListRemovePoller(this); }
void PollPoller::PrepareFork() { Kick(); }
// TODO(vigneshbabu): implement
@ -844,10 +839,11 @@ void PollPoller::Close() {
closed_ = true;
}
PollPoller* MakePollPoller(Scheduler* scheduler, bool use_phony_poll) {
std::shared_ptr<PollPoller> MakePollPoller(Scheduler* scheduler,
bool use_phony_poll) {
static bool kPollPollerSupported = InitPollPollerPosix();
if (kPollPollerSupported) {
return new PollPoller(scheduler, use_phony_poll);
return std::make_shared<PollPoller>(scheduler, use_phony_poll);
}
return nullptr;
}
@ -885,8 +881,8 @@ void PollPoller::Kick() { grpc_core::Crash("unimplemented"); }
// If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
// nullptr.
PollPoller* MakePollPoller(Scheduler* /*scheduler*/,
bool /* use_phony_poll */) {
std::shared_ptr<PollPoller> MakePollPoller(Scheduler* /*scheduler*/,
bool /* use_phony_poll */) {
return nullptr;
}

@ -17,7 +17,6 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <memory>
#include <string>
@ -27,7 +26,6 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
@ -39,7 +37,8 @@ namespace experimental {
class PollEventHandle;
// Definition of poll based poller.
class PollPoller : public PosixEventPoller, public Forkable {
class PollPoller : public PosixEventPoller,
public std::enable_shared_from_this<PollPoller> {
public:
explicit PollPoller(Scheduler* scheduler);
PollPoller(Scheduler* scheduler, bool use_phony_poll);
@ -63,12 +62,6 @@ class PollPoller : public PosixEventPoller, public Forkable {
void Close();
private:
void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); }
void Unref() {
if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
delete this;
}
}
void KickExternal(bool ext);
void PollerHandlesListAddHandle(PollEventHandle* handle)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
@ -84,7 +77,6 @@ class PollPoller : public PosixEventPoller, public Forkable {
};
grpc_core::Mutex mu_;
Scheduler* scheduler_;
std::atomic<int> ref_count_{1};
bool use_phony_poll_;
bool was_kicked_ ABSL_GUARDED_BY(mu_);
bool was_kicked_ext_ ABSL_GUARDED_BY(mu_);
@ -98,7 +90,8 @@ class PollPoller : public PosixEventPoller, public Forkable {
// It use_phony_poll is true, it implies that the poller is declared
// non-polling and any attempt to schedule a blocking poll will result in a
// crash failure.
PollPoller* MakePollPoller(Scheduler* scheduler, bool use_phony_poll);
std::shared_ptr<PollPoller> MakePollPoller(Scheduler* scheduler,
bool use_phony_poll);
} // namespace experimental
} // namespace grpc_event_engine

@ -24,6 +24,7 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
@ -86,7 +87,8 @@ class EventHandle {
virtual ~EventHandle() = default;
};
class PosixEventPoller : public grpc_event_engine::experimental::Poller {
class PosixEventPoller : public grpc_event_engine::experimental::Poller,
public Forkable {
public:
// Return an opaque handle to perform actions on the provided file descriptor.
virtual EventHandle* CreateHandle(int fd, absl::string_view name,

@ -14,15 +14,18 @@
#include <grpc/support/port_platform.h>
#include <memory>
#include <string>
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h"
#include "src/core/lib/event_engine/posix_engine/ev_poll_posix.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/iomgr/port.h"
namespace grpc_event_engine {
@ -30,15 +33,25 @@ namespace experimental {
#ifdef GRPC_POSIX_SOCKET_TCP
namespace {
// TODO(yijiem): this object is thread-unsafe, if we are creating pollers in
// multiple threads (e.g. multiple event engines) or if we are creating pollers
// while we are forking then we will run into issues.
grpc_core::NoDestruct<ObjectGroupForkHandler> g_poller_fork_manager;
class PollerForkCallbackMethods {
public:
static void Prefork() { g_poller_fork_manager->Prefork(); }
static void PostforkParent() { g_poller_fork_manager->PostforkParent(); }
static void PostforkChild() { g_poller_fork_manager->PostforkChild(); }
};
bool PollStrategyMatches(absl::string_view strategy, absl::string_view want) {
return strategy == "all" || strategy == want;
}
} // namespace
PosixEventPoller* MakeDefaultPoller(Scheduler* scheduler) {
PosixEventPoller* poller = nullptr;
std::shared_ptr<PosixEventPoller> MakeDefaultPoller(Scheduler* scheduler) {
std::shared_ptr<PosixEventPoller> poller;
auto strings =
absl::StrSplit(grpc_core::ConfigVars::Get().PollStrategy(), ',');
for (auto it = strings.begin(); it != strings.end() && poller == nullptr;
@ -55,12 +68,16 @@ PosixEventPoller* MakeDefaultPoller(Scheduler* scheduler) {
poller = MakePollPoller(scheduler, /*use_phony_poll=*/true);
}
}
g_poller_fork_manager->RegisterForkable(
poller, PollerForkCallbackMethods::Prefork,
PollerForkCallbackMethods::PostforkParent,
PollerForkCallbackMethods::PostforkChild);
return poller;
}
#else // GRPC_POSIX_SOCKET_TCP
PosixEventPoller* MakeDefaultPoller(Scheduler* /*scheduler*/) {
std::shared_ptr<PosixEventPoller> MakeDefaultPoller(Scheduler* /*scheduler*/) {
return nullptr;
}

@ -17,6 +17,8 @@
#include <grpc/support/port_platform.h>
#include <memory>
namespace grpc_event_engine {
namespace experimental {
@ -25,7 +27,7 @@ class Scheduler;
// Return an instance of an event poller which is tied to the specified
// scheduler.
PosixEventPoller* MakeDefaultPoller(Scheduler* scheduler);
std::shared_ptr<PosixEventPoller> MakeDefaultPoller(Scheduler* scheduler);
} // namespace experimental
} // namespace grpc_event_engine

@ -37,6 +37,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/grpc_polled_fd.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix.h"
@ -48,6 +49,7 @@
#include "src/core/lib/event_engine/utils.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/sync.h"
#ifdef GRPC_POSIX_SOCKET_TCP
@ -77,6 +79,19 @@ using namespace std::chrono_literals;
namespace grpc_event_engine {
namespace experimental {
namespace {
grpc_core::NoDestruct<ObjectGroupForkHandler> g_timer_fork_manager;
class TimerForkCallbackMethods {
public:
static void Prefork() { g_timer_fork_manager->Prefork(); }
static void PostforkParent() { g_timer_fork_manager->PostforkParent(); }
static void PostforkChild() { g_timer_fork_manager->PostforkChild(); }
};
} // namespace
#ifdef GRPC_POSIX_SOCKET_TCP
void AsyncConnect::Start(EventEngine::Duration timeout) {
@ -303,8 +318,9 @@ PosixEnginePollerManager::PosixEnginePollerManager(
executor_(std::move(executor)),
trigger_shutdown_called_(false) {}
PosixEnginePollerManager::PosixEnginePollerManager(PosixEventPoller* poller)
: poller_(poller),
PosixEnginePollerManager::PosixEnginePollerManager(
std::shared_ptr<PosixEventPoller> poller)
: poller_(std::move(poller)),
poller_state_(PollerState::kExternal),
executor_(nullptr),
trigger_shutdown_called_(false) {
@ -343,10 +359,14 @@ PosixEnginePollerManager::~PosixEnginePollerManager() {
}
}
PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)
PosixEventEngine::PosixEventEngine(std::shared_ptr<PosixEventPoller> poller)
: connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
executor_(MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u))),
timer_manager_(executor_) {
timer_manager_(std::make_shared<TimerManager>(executor_)) {
g_timer_fork_manager->RegisterForkable(
timer_manager_, TimerForkCallbackMethods::Prefork,
TimerForkCallbackMethods::PostforkParent,
TimerForkCallbackMethods::PostforkChild);
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
poller_manager_ = std::make_shared<PosixEnginePollerManager>(poller);
#endif
@ -355,7 +375,11 @@ PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)
PosixEventEngine::PosixEventEngine()
: connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
executor_(MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u))),
timer_manager_(executor_) {
timer_manager_(std::make_shared<TimerManager>(executor_)) {
g_timer_fork_manager->RegisterForkable(
timer_manager_, TimerForkCallbackMethods::Prefork,
TimerForkCallbackMethods::PostforkParent,
TimerForkCallbackMethods::PostforkChild);
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
// The threadpool must be instantiated after the poller otherwise, the
@ -435,7 +459,7 @@ PosixEventEngine::~PosixEventEngine() {
}
GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
}
timer_manager_.Shutdown();
timer_manager_->Shutdown();
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
if (poller_manager_ != nullptr) {
poller_manager_->TriggerShutdown();
@ -448,7 +472,7 @@ bool PosixEventEngine::Cancel(EventEngine::TaskHandle handle) {
grpc_core::MutexLock lock(&mu_);
if (!known_handles_.contains(handle)) return false;
auto* cd = reinterpret_cast<ClosureData*>(handle.keys[0]);
bool r = timer_manager_.TimerCancel(&cd->timer);
bool r = timer_manager_->TimerCancel(&cd->timer);
known_handles_.erase(handle);
if (r) delete cd;
return r;
@ -478,7 +502,7 @@ EventEngine::TaskHandle PosixEventEngine::RunAfterInternal(
Run(std::move(cb));
return TaskHandle::kInvalid;
}
auto when_ts = ToTimestamp(timer_manager_.Now(), when);
auto when_ts = ToTimestamp(timer_manager_->Now(), when);
auto* cd = new ClosureData;
cd->cb = std::move(cb);
cd->engine = this;
@ -489,7 +513,7 @@ EventEngine::TaskHandle PosixEventEngine::RunAfterInternal(
cd->handle = handle;
GRPC_EVENT_ENGINE_TRACE("PosixEventEngine:%p scheduling callback:%s", this,
HandleToString(handle).c_str());
timer_manager_.TimerInit(&cd->timer, when_ts, cd);
timer_manager_->TimerInit(&cd->timer, when_ts, cd);
return handle;
}

@ -104,9 +104,10 @@ class PosixEnginePollerManager
public:
explicit PosixEnginePollerManager(std::shared_ptr<ThreadPool> executor);
explicit PosixEnginePollerManager(
grpc_event_engine::experimental::PosixEventPoller* poller);
std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
poller);
grpc_event_engine::experimental::PosixEventPoller* Poller() {
return poller_;
return poller_.get();
}
ThreadPool* Executor() { return executor_.get(); }
@ -124,7 +125,7 @@ class PosixEnginePollerManager
private:
enum class PollerState { kExternal, kOk, kShuttingDown };
grpc_event_engine::experimental::PosixEventPoller* poller_ = nullptr;
std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller> poller_;
std::atomic<PollerState> poller_state_{PollerState::kOk};
std::shared_ptr<ThreadPool> executor_;
bool trigger_shutdown_called_;
@ -160,11 +161,13 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
};
#ifdef GRPC_POSIX_SOCKET_TCP
// Constructs an EventEngine which does not own the poller. Do not call this
// constructor directly. Instead use the MakeTestOnlyPosixEventEngine static
// method. Its expected to be used only in tests.
// Constructs an EventEngine which has a shared ownership of the poller. Do
// not call this constructor directly. Instead use the
// MakeTestOnlyPosixEventEngine static method. Its expected to be used only in
// tests.
explicit PosixEventEngine(
grpc_event_engine::experimental::PosixEventPoller* poller);
std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
poller);
PosixEventEngine();
#else // GRPC_POSIX_SOCKET_TCP
PosixEventEngine();
@ -210,14 +213,15 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
bool Cancel(TaskHandle handle) override;
#ifdef GRPC_POSIX_SOCKET_TCP
// The posix EventEngine returned by this method would not own the poller
// and would not be in-charge of driving the poller by calling its Work(..)
// method. Instead its upto the test to drive the poller. The returned posix
// EventEngine will also not attempt to shutdown the poller since it does not
// own it.
// The posix EventEngine returned by this method would have a shared ownership
// of the poller and would not be in-charge of driving the poller by calling
// its Work(..) method. Instead its upto the test to drive the poller. The
// returned posix EventEngine will also not attempt to shutdown the poller
// since it does not own it.
static std::shared_ptr<PosixEventEngine> MakeTestOnlyPosixEventEngine(
grpc_event_engine::experimental::PosixEventPoller* test_only_poller) {
return std::make_shared<PosixEventEngine>(test_only_poller);
std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
test_only_poller) {
return std::make_shared<PosixEventEngine>(std::move(test_only_poller));
}
#endif // GRPC_POSIX_SOCKET_TCP
@ -255,7 +259,7 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_);
std::atomic<intptr_t> aba_token_{0};
std::shared_ptr<ThreadPool> executor_;
TimerManager timer_manager_;
std::shared_ptr<TimerManager> timer_manager_;
#ifdef GRPC_POSIX_SOCKET_TCP
std::shared_ptr<PosixEnginePollerManager> poller_manager_;
#endif // GRPC_POSIX_SOCKET_TCP

@ -19,16 +19,34 @@
#include <grpc/support/cpu.h>
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/no_destruct.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
grpc_core::NoDestruct<ObjectGroupForkHandler> g_thread_pool_fork_manager;
class ThreadPoolForkCallbackMethods {
public:
static void Prefork() { g_thread_pool_fork_manager->Prefork(); }
static void PostforkParent() { g_thread_pool_fork_manager->PostforkParent(); }
static void PostforkChild() { g_thread_pool_fork_manager->PostforkChild(); }
};
} // namespace
std::shared_ptr<ThreadPool> MakeThreadPool(size_t /* reserve_threads */) {
return std::make_shared<WorkStealingThreadPool>(
auto thread_pool = std::make_shared<WorkStealingThreadPool>(
grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u));
g_thread_pool_fork_manager->RegisterForkable(
thread_pool, ThreadPoolForkCallbackMethods::Prefork,
ThreadPoolForkCallbackMethods::PostforkParent,
ThreadPoolForkCallbackMethods::PostforkChild);
return thread_pool;
}
} // namespace experimental

@ -213,6 +213,7 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread() {
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
gpr_log(GPR_INFO, "WorkStealingThreadPoolImpl::Quiesce");
SetShutdown(true);
// Wait until all threads have exited.
// Note that if this is a threadpool thread then we won't exit this thread
@ -258,6 +259,7 @@ bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsQuiesced() {
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() {
gpr_log(GPR_INFO, "WorkStealingThreadPoolImpl::PrepareFork");
SetForking(true);
work_signal_.SignalAll();
living_thread_count_.BlockUntilThreadCount(0, "forking");

@ -32,7 +32,6 @@
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/experiments/config.h"
#include "src/core/lib/gprpp/fork.h"
@ -96,7 +95,6 @@ static void do_basic_init(void) {
gpr_time_init();
grpc_core::PrintExperimentsList();
grpc_core::Fork::GlobalInit();
grpc_event_engine::experimental::RegisterForkHandlers();
grpc_fork_handlers_auto_register();
grpc_tracer_init();
grpc_client_channel_global_init_backup_polling();

@ -47,6 +47,7 @@ grpc_cc_test(
"//:gpr",
"//:gpr_platform",
"//src/core:forkable",
"//src/core:no_destruct",
],
)

@ -31,10 +31,20 @@
#include <grpc/support/log.h>
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/gprpp/no_destruct.h"
namespace {
using ::grpc_event_engine::experimental::Forkable;
using ::grpc_event_engine::experimental::RegisterForkHandlers;
using ::grpc_event_engine::experimental::ObjectGroupForkHandler;
grpc_core::NoDestruct<ObjectGroupForkHandler> g_forkable_manager;
class ForkCallbackMethods {
public:
static void Prefork() { g_forkable_manager->Prefork(); }
static void PostforkParent() { g_forkable_manager->PostforkParent(); }
static void PostforkChild() { g_forkable_manager->PostforkChild(); }
};
} // namespace
class ForkableTest : public testing::Test {};
@ -77,16 +87,19 @@ TEST_F(ForkableTest, BasicPthreadAtForkOperations) {
bool child_called_ = false;
};
SomeForkable forkable;
auto forkable = std::make_shared<SomeForkable>();
g_forkable_manager->RegisterForkable(forkable, ForkCallbackMethods::Prefork,
ForkCallbackMethods::PostforkParent,
ForkCallbackMethods::PostforkChild);
int child_pid = fork();
ASSERT_NE(child_pid, -1);
if (child_pid == 0) {
gpr_log(GPR_DEBUG, "I am child pid: %d", getpid());
forkable.CheckChild();
forkable->CheckChild();
exit(testing::Test::HasFailure());
} else {
gpr_log(GPR_DEBUG, "I am parent pid: %d", getpid());
forkable.CheckParent();
forkable->CheckParent();
int status;
gpr_log(GPR_DEBUG, "Waiting for child pid: %d", child_pid);
do {
@ -108,7 +121,7 @@ TEST_F(ForkableTest, NonPthreadManualForkOperations) {
// Manually simulates a fork event for non-pthread-enabled environments
#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
// This platform does not need to exercise fork support manually.
GTEST_SKIP("Unnecessary test, this platform supports pthreads.");
GTEST_SKIP() << "Unnecessary test, this platform supports pthreads.";
#endif
class SomeForkable : public Forkable {
@ -129,14 +142,25 @@ TEST_F(ForkableTest, NonPthreadManualForkOperations) {
bool child_called_ = false;
};
SomeForkable forkable;
forkable.AssertStates(/*prepare=*/false, /*parent=*/false, /*child=*/false);
grpc_event_engine::experimental::PrepareFork();
forkable.AssertStates(/*prepare=*/true, /*parent=*/false, /*child=*/false);
grpc_event_engine::experimental::PostforkParent();
forkable.AssertStates(/*prepare=*/true, /*parent=*/true, /*child=*/false);
grpc_event_engine::experimental::PostforkChild();
forkable.AssertStates(/*prepare=*/true, /*parent=*/true, /*child=*/true);
ObjectGroupForkHandler forkable_manager;
class NoopForkCallbackMethods {
public:
static void Prefork() {}
static void PostforkParent() {}
static void PostforkChild() {}
};
auto forkable = std::make_shared<SomeForkable>();
forkable_manager.RegisterForkable(forkable, NoopForkCallbackMethods::Prefork,
NoopForkCallbackMethods::PostforkParent,
NoopForkCallbackMethods::PostforkChild);
forkable->AssertStates(/*prepare=*/false, /*parent=*/false, /*child=*/false);
forkable_manager.Prefork();
forkable->AssertStates(/*prepare=*/true, /*parent=*/false, /*child=*/false);
forkable_manager.PostforkParent();
forkable->AssertStates(/*prepare=*/true, /*parent=*/true, /*child=*/false);
forkable_manager.Prefork();
forkable_manager.PostforkChild();
forkable->AssertStates(/*prepare=*/true, /*parent=*/true, /*child=*/true);
}
int main(int argc, char** argv) {
@ -145,7 +169,6 @@ int main(int argc, char** argv) {
grpc_core::ConfigVars::Overrides config_overrides;
config_overrides.enable_fork_support = true;
grpc_core::ConfigVars::SetOverrides(config_overrides);
RegisterForkHandlers();
auto result = RUN_ALL_TESTS();
return result;
}

@ -71,8 +71,8 @@
#include "test/core/util/port.h"
static gpr_mu g_mu;
static grpc_event_engine::experimental::PosixEventPoller* g_event_poller =
nullptr;
static std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
g_event_poller;
// buffer size used to send and receive data.
// 1024 is the minimal value to set TCP send and receive buffer.
@ -695,7 +695,7 @@ TEST_F(EventPollerTest, TestMultipleHandles) {
if (g_event_poller == nullptr) {
return;
}
Worker* worker = new Worker(Scheduler(), g_event_poller, kNumHandles,
Worker* worker = new Worker(Scheduler(), g_event_poller.get(), kNumHandles,
kNumWakeupsPerHandle);
worker->Start();
worker->Wait();

@ -221,10 +221,10 @@ class PosixEndpointTest : public ::testing::TestWithParam<bool> {
std::shared_ptr<EventEngine> GetOracleEE() { return oracle_ee_; }
PosixEventPoller* PosixPoller() { return poller_; }
PosixEventPoller* PosixPoller() { return poller_.get(); }
private:
PosixEventPoller* poller_;
std::shared_ptr<PosixEventPoller> poller_;
std::unique_ptr<TestScheduler> scheduler_;
std::shared_ptr<EventEngine> posix_ee_;
std::shared_ptr<EventEngine> oracle_ee_;

@ -195,6 +195,30 @@ grpc_cc_binary(
],
)
grpc_cc_test(
name = "client_fork_test",
srcs = ["client_fork_test.cc"],
external_deps = [
"absl/strings",
"gtest",
],
tags = [
"fork_test",
"no_test_ios",
"no_windows",
],
deps = [
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_config",
],
)
grpc_cc_test(
name = "client_callback_end2end_test",
srcs = ["client_callback_end2end_test.cc"],

@ -0,0 +1,172 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#ifndef GRPC_ENABLE_FORK_SUPPORT
// No-op for builds without fork support.
int main(int /* argc */, char** /* argv */) { return 0; }
#else // GRPC_ENABLE_FORK_SUPPORT
#include <signal.h>
#include <gtest/gtest.h>
#include "absl/strings/str_cat.h"
#include <grpc/fork.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include "src/core/lib/gprpp/fork.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/test_config.h"
namespace grpc {
namespace testing {
namespace {
class ServiceImpl final : public EchoTestService::Service {
Status BidiStream(
ServerContext* /*context*/,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
EchoRequest request;
EchoResponse response;
while (stream->Read(&request)) {
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
stream->Write(response);
gpr_log(GPR_INFO, "wrote msg %s", response.message().c_str());
}
return Status::OK;
}
};
std::unique_ptr<EchoTestService::Stub> MakeStub(const std::string& addr) {
return EchoTestService::NewStub(
grpc::CreateChannel(addr, InsecureChannelCredentials()));
}
TEST(ClientForkTest, ClientCallsBeforeAndAfterForkSucceed) {
grpc_core::Fork::Enable(true);
int port = grpc_pick_unused_port_or_die();
std::string addr = absl::StrCat("localhost:", port);
pid_t server_pid = fork();
switch (server_pid) {
case -1: // fork failed
GTEST_FAIL() << "failure forking";
case 0: // post-fork child
{
ServiceImpl impl;
grpc::ServerBuilder builder;
builder.AddListeningPort(addr, grpc::InsecureServerCredentials());
builder.RegisterService(&impl);
std::unique_ptr<Server> server(builder.BuildAndStart());
server->Wait();
return;
}
default: // post-fork parent
break;
}
// Do a round trip before we fork.
// NOTE: without this scope, test running with the epoll1 poller will fail.
{
std::unique_ptr<EchoTestService::Stub> stub = MakeStub(addr);
EchoRequest request;
EchoResponse response;
ClientContext context;
context.set_wait_for_ready(true);
auto stream = stub->BidiStream(&context);
request.set_message("Hello");
ASSERT_TRUE(stream->Write(request));
ASSERT_TRUE(stream->Read(&response));
ASSERT_EQ(response.message(), request.message());
}
// Fork and do round trips in the post-fork parent and child.
pid_t child_client_pid = fork();
switch (child_client_pid) {
case -1: // fork failed
GTEST_FAIL() << "fork failed";
case 0: // post-fork child
{
gpr_log(GPR_DEBUG, "In post-fork child");
EchoRequest request;
EchoResponse response;
ClientContext context;
context.set_wait_for_ready(true);
std::unique_ptr<EchoTestService::Stub> stub = MakeStub(addr);
auto stream = stub->BidiStream(&context);
request.set_message("Hello again from child");
ASSERT_TRUE(stream->Write(request));
ASSERT_TRUE(stream->Read(&response));
ASSERT_EQ(response.message(), request.message());
exit(0);
}
default: // post-fork parent
{
gpr_log(GPR_DEBUG, "In post-fork parent");
EchoRequest request;
EchoResponse response;
ClientContext context;
context.set_wait_for_ready(true);
std::unique_ptr<EchoTestService::Stub> stub = MakeStub(addr);
auto stream = stub->BidiStream(&context);
request.set_message("Hello again from parent");
EXPECT_TRUE(stream->Write(request));
EXPECT_TRUE(stream->Read(&response));
EXPECT_EQ(response.message(), request.message());
// Wait for the post-fork child to exit; ensure it exited cleanly.
int child_status;
ASSERT_EQ(waitpid(child_client_pid, &child_status, 0), child_client_pid)
<< "failed to get status of child client";
ASSERT_EQ(WEXITSTATUS(child_status), 0) << "child did not exit cleanly";
}
}
kill(server_pid, SIGINT);
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
grpc::testing::InitTest(&argc, &argv, true);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
int res = RUN_ALL_TESTS();
grpc_shutdown();
return res;
}
#endif // GRPC_ENABLE_FORK_SUPPORT

@ -0,0 +1,41 @@
# Copyright 2022 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/linux/grpc_bazel_rbe.sh"
timeout_mins: 90
action {
define_artifacts {
regex: "**/*sponge_log.*"
regex: "github/grpc/reports/**"
}
}
gfile_resources: "/bigstore/grpc-testing-secrets/gcp_credentials/resultstore_api_key"
bazel_setting {
# In order for Kokoro to recognize this as a bazel build and publish the bazel resultstore link,
# the bazel_setting section needs to be present and "upsalite_frontend_address" needs to be
# set. The rest of configuration from bazel_setting is unused (we configure everything when bazel
# command is invoked).
upsalite_frontend_address: "https://source.cloud.google.com"
}
env_vars {
# flags will be passed to bazel invocation
key: "BAZEL_FLAGS"
value: "--config=asan --test_tag_filter=fork_test --config=fork_support"
}

@ -2113,6 +2113,28 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "client_fork_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save