diff --git a/BUILD b/BUILD index 81de8203a03..98622fd7ecc 100644 --- a/BUILD +++ b/BUILD @@ -724,6 +724,7 @@ grpc_cc_library( "src/core/lib/iomgr/endpoint_pair_windows.cc", "src/core/lib/iomgr/error.cc", "src/core/lib/iomgr/error_cfstream.cc", + "src/core/lib/iomgr/ev_apple.cc", "src/core/lib/iomgr/ev_epoll1_linux.cc", "src/core/lib/iomgr/ev_epollex_linux.cc", "src/core/lib/iomgr/ev_poll_posix.cc", @@ -885,6 +886,7 @@ grpc_cc_library( "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/error_cfstream.h", "src/core/lib/iomgr/error_internal.h", + "src/core/lib/iomgr/ev_apple.h", "src/core/lib/iomgr/ev_epoll1_linux.h", "src/core/lib/iomgr/ev_epollex_linux.h", "src/core/lib/iomgr/ev_poll_posix.h", @@ -989,7 +991,6 @@ grpc_cc_library( ], language = "c++", public_hdrs = GRPC_PUBLIC_HDRS, - use_cfstream = True, deps = [ "eventmanager_libuv", "gpr_base", diff --git a/BUILD.gn b/BUILD.gn index 129ad2b2639..bc56c6e571f 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -604,6 +604,8 @@ config("grpc_config") { "src/core/lib/iomgr/error_cfstream.cc", "src/core/lib/iomgr/error_cfstream.h", "src/core/lib/iomgr/error_internal.h", + "src/core/lib/iomgr/ev_apple.cc", + "src/core/lib/iomgr/ev_apple.h", "src/core/lib/iomgr/ev_epoll1_linux.cc", "src/core/lib/iomgr/ev_epoll1_linux.h", "src/core/lib/iomgr/ev_epollex_linux.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 4fc05f7c174..7c2ed457f9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1512,6 +1512,7 @@ add_library(grpc src/core/lib/iomgr/endpoint_pair_windows.cc src/core/lib/iomgr/error.cc src/core/lib/iomgr/error_cfstream.cc + src/core/lib/iomgr/ev_apple.cc src/core/lib/iomgr/ev_epoll1_linux.cc src/core/lib/iomgr/ev_epollex_linux.cc src/core/lib/iomgr/ev_poll_posix.cc @@ -2165,6 +2166,7 @@ add_library(grpc_unsecure src/core/lib/iomgr/endpoint_pair_windows.cc src/core/lib/iomgr/error.cc src/core/lib/iomgr/error_cfstream.cc + src/core/lib/iomgr/ev_apple.cc src/core/lib/iomgr/ev_epoll1_linux.cc src/core/lib/iomgr/ev_epollex_linux.cc src/core/lib/iomgr/ev_poll_posix.cc diff --git a/Makefile b/Makefile index 92ac929d3bc..3c196a328e9 100644 --- a/Makefile +++ b/Makefile @@ -3837,6 +3837,7 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/endpoint_pair_windows.cc \ src/core/lib/iomgr/error.cc \ src/core/lib/iomgr/error_cfstream.cc \ + src/core/lib/iomgr/ev_apple.cc \ src/core/lib/iomgr/ev_epoll1_linux.cc \ src/core/lib/iomgr/ev_epollex_linux.cc \ src/core/lib/iomgr/ev_poll_posix.cc \ @@ -4464,6 +4465,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/endpoint_pair_windows.cc \ src/core/lib/iomgr/error.cc \ src/core/lib/iomgr/error_cfstream.cc \ + src/core/lib/iomgr/ev_apple.cc \ src/core/lib/iomgr/ev_epoll1_linux.cc \ src/core/lib/iomgr/ev_epollex_linux.cc \ src/core/lib/iomgr/ev_poll_posix.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 0f98d3a24ae..0747c4a079b 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -566,6 +566,7 @@ libs: - src/core/lib/iomgr/error.h - src/core/lib/iomgr/error_cfstream.h - src/core/lib/iomgr/error_internal.h + - src/core/lib/iomgr/ev_apple.h - src/core/lib/iomgr/ev_epoll1_linux.h - src/core/lib/iomgr/ev_epollex_linux.h - src/core/lib/iomgr/ev_poll_posix.h @@ -938,6 +939,7 @@ libs: - src/core/lib/iomgr/endpoint_pair_windows.cc - src/core/lib/iomgr/error.cc - src/core/lib/iomgr/error_cfstream.cc + - src/core/lib/iomgr/ev_apple.cc - src/core/lib/iomgr/ev_epoll1_linux.cc - src/core/lib/iomgr/ev_epollex_linux.cc - src/core/lib/iomgr/ev_poll_posix.cc @@ -1465,6 +1467,7 @@ libs: - src/core/lib/iomgr/error.h - src/core/lib/iomgr/error_cfstream.h - src/core/lib/iomgr/error_internal.h + - src/core/lib/iomgr/ev_apple.h - src/core/lib/iomgr/ev_epoll1_linux.h - src/core/lib/iomgr/ev_epollex_linux.h - src/core/lib/iomgr/ev_poll_posix.h @@ -1769,6 +1772,7 @@ libs: - src/core/lib/iomgr/endpoint_pair_windows.cc - src/core/lib/iomgr/error.cc - src/core/lib/iomgr/error_cfstream.cc + - src/core/lib/iomgr/ev_apple.cc - src/core/lib/iomgr/ev_epoll1_linux.cc - src/core/lib/iomgr/ev_epollex_linux.cc - src/core/lib/iomgr/ev_poll_posix.cc diff --git a/config.m4 b/config.m4 index 6b450315f4f..b4013a02fbd 100644 --- a/config.m4 +++ b/config.m4 @@ -286,6 +286,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/endpoint_pair_windows.cc \ src/core/lib/iomgr/error.cc \ src/core/lib/iomgr/error_cfstream.cc \ + src/core/lib/iomgr/ev_apple.cc \ src/core/lib/iomgr/ev_epoll1_linux.cc \ src/core/lib/iomgr/ev_epollex_linux.cc \ src/core/lib/iomgr/ev_poll_posix.cc \ diff --git a/config.w32 b/config.w32 index 59cdbdb1ce4..c0b062aafea 100644 --- a/config.w32 +++ b/config.w32 @@ -255,6 +255,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\iomgr\\endpoint_pair_windows.cc " + "src\\core\\lib\\iomgr\\error.cc " + "src\\core\\lib\\iomgr\\error_cfstream.cc " + + "src\\core\\lib\\iomgr\\ev_apple.cc " + "src\\core\\lib\\iomgr\\ev_epoll1_linux.cc " + "src\\core\\lib\\iomgr\\ev_epollex_linux.cc " + "src\\core\\lib\\iomgr\\ev_poll_posix.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 77980999b59..7d546034f95 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -445,6 +445,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/error_cfstream.h', 'src/core/lib/iomgr/error_internal.h', + 'src/core/lib/iomgr/ev_apple.h', 'src/core/lib/iomgr/ev_epoll1_linux.h', 'src/core/lib/iomgr/ev_epollex_linux.h', 'src/core/lib/iomgr/ev_poll_posix.h', @@ -896,6 +897,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/error_cfstream.h', 'src/core/lib/iomgr/error_internal.h', + 'src/core/lib/iomgr/ev_apple.h', 'src/core/lib/iomgr/ev_epoll1_linux.h', 'src/core/lib/iomgr/ev_epollex_linux.h', 'src/core/lib/iomgr/ev_poll_posix.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 749e77b3fa7..056c7f8a6fd 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -654,6 +654,8 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/error_cfstream.cc', 'src/core/lib/iomgr/error_cfstream.h', 'src/core/lib/iomgr/error_internal.h', + 'src/core/lib/iomgr/ev_apple.cc', + 'src/core/lib/iomgr/ev_apple.h', 'src/core/lib/iomgr/ev_epoll1_linux.cc', 'src/core/lib/iomgr/ev_epoll1_linux.h', 'src/core/lib/iomgr/ev_epollex_linux.cc', @@ -1249,6 +1251,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/error_cfstream.h', 'src/core/lib/iomgr/error_internal.h', + 'src/core/lib/iomgr/ev_apple.h', 'src/core/lib/iomgr/ev_epoll1_linux.h', 'src/core/lib/iomgr/ev_epollex_linux.h', 'src/core/lib/iomgr/ev_poll_posix.h', diff --git a/grpc.gemspec b/grpc.gemspec index 29e5d3d436b..3dcf652466f 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -576,6 +576,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/error_cfstream.cc ) s.files += %w( src/core/lib/iomgr/error_cfstream.h ) s.files += %w( src/core/lib/iomgr/error_internal.h ) + s.files += %w( src/core/lib/iomgr/ev_apple.cc ) + s.files += %w( src/core/lib/iomgr/ev_apple.h ) s.files += %w( src/core/lib/iomgr/ev_epoll1_linux.cc ) s.files += %w( src/core/lib/iomgr/ev_epoll1_linux.h ) s.files += %w( src/core/lib/iomgr/ev_epollex_linux.cc ) diff --git a/grpc.gyp b/grpc.gyp index a62c034d65a..ce39a8ed214 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -640,6 +640,7 @@ 'src/core/lib/iomgr/endpoint_pair_windows.cc', 'src/core/lib/iomgr/error.cc', 'src/core/lib/iomgr/error_cfstream.cc', + 'src/core/lib/iomgr/ev_apple.cc', 'src/core/lib/iomgr/ev_epoll1_linux.cc', 'src/core/lib/iomgr/ev_epollex_linux.cc', 'src/core/lib/iomgr/ev_poll_posix.cc', @@ -1129,6 +1130,7 @@ 'src/core/lib/iomgr/endpoint_pair_windows.cc', 'src/core/lib/iomgr/error.cc', 'src/core/lib/iomgr/error_cfstream.cc', + 'src/core/lib/iomgr/ev_apple.cc', 'src/core/lib/iomgr/ev_epoll1_linux.cc', 'src/core/lib/iomgr/ev_epollex_linux.cc', 'src/core/lib/iomgr/ev_poll_posix.cc', diff --git a/package.xml b/package.xml index 06d1669ec48..a8430464cf7 100644 --- a/package.xml +++ b/package.xml @@ -556,6 +556,8 @@ + + diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc index 40fb3779651..7a78ccd8769 100644 --- a/src/core/lib/iomgr/cfstream_handle.cc +++ b/src/core/lib/iomgr/cfstream_handle.cc @@ -32,6 +32,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error_cfstream.h" +#include "src/core/lib/iomgr/ev_apple.h" #include "src/core/lib/iomgr/exec_ctx.h" extern grpc_core::TraceFlag grpc_tcp_trace; @@ -147,8 +148,8 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, CFStreamHandle::WriteCallback, &ctx); - CFReadStreamSetDispatchQueue(read_stream, dispatch_queue_); - CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue_); + grpc_apple_register_read_stream(read_stream, dispatch_queue_); + grpc_apple_register_write_stream(write_stream, dispatch_queue_); } CFStreamHandle::~CFStreamHandle() { diff --git a/src/core/lib/iomgr/ev_apple.cc b/src/core/lib/iomgr/ev_apple.cc new file mode 100644 index 00000000000..d1525828865 --- /dev/null +++ b/src/core/lib/iomgr/ev_apple.cc @@ -0,0 +1,356 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/// Event engine based on Apple's CFRunLoop API family. If the CFRunLoop engine +/// is enabled (see iomgr_posix_cfstream.cc), a global thread is started to +/// handle and trigger all the CFStream events. The CFStream streams register +/// themselves with the run loop with functions grpc_apple_register_read_stream +/// and grpc_apple_register_read_stream. Pollsets are dummy and block on a +/// condition variable in pollset_work(). + +#include + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_APPLE_EV + +#include + +#include + +#include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/ev_apple.h" + +grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling"); + +#ifndef NDEBUG +#define GRPC_POLLING_TRACE(format, ...) \ + if (GRPC_TRACE_FLAG_ENABLED(grpc_apple_polling_trace)) { \ + gpr_log(GPR_DEBUG, "(polling) " format, __VA_ARGS__); \ + } +#else +#define GRPC_POLLING_TRACE(...) +#endif // NDEBUG + +#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1) + +struct GlobalRunLoopContext { + grpc_core::CondVar init_cv; + grpc_core::CondVar input_source_cv; + + grpc_core::Mutex mu; + + // Whether an input source registration is pending. Protected by mu. + bool input_source_registered = false; + + // The reference to the global run loop object. Protected by mu. + CFRunLoopRef run_loop; + + // Whether the pollset has been globally shut down. Protected by mu. + bool is_shutdown = false; +}; + +struct GrpcAppleWorker { + // The condition varible to kick the worker. Works with the pollset's lock + // (GrpcApplePollset.mu). + grpc_core::CondVar cv; + + // Whether the worker is kicked. Protected by the pollset's lock + // (GrpcApplePollset.mu). + bool kicked = false; +}; + +struct GrpcApplePollset { + grpc_core::Mutex mu; + + // Tracks the current workers in the pollset. Protected by mu. + std::list workers; + + // Whether the pollset is shut down. Protected by mu. + bool is_shutdown = false; + + // Closure to call when shutdown is done. Protected by mu. + grpc_closure* shutdown_closure; + + // Whether there's an outstanding kick that was not processed. Protected by + // mu. + bool kicked_without_poller = false; +}; + +static GlobalRunLoopContext* gGlobalRunLoopContext = nullptr; +static grpc_core::Thread* gGlobalRunLoopThread = nullptr; + +/// Register the stream with the dispatch queue. Callbacks of the stream will be +/// issued to the dispatch queue when a network event happens and will be +/// managed by Grand Central Dispatch. +static void grpc_apple_register_read_stream_queue( + CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) { + CFReadStreamSetDispatchQueue(read_stream, dispatch_queue); +} + +/// Register the stream with the dispatch queue. Callbacks of the stream will be +/// issued to the dispatch queue when a network event happens and will be +/// managed by Grand Central Dispatch. +static void grpc_apple_register_write_stream_queue( + CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) { + CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue); +} + +/// Register the stream with the global run loop. Callbacks of the stream will +/// be issued to the run loop when a network event happens and will be driven by +/// the global run loop thread gGlobalRunLoopThread. +static void grpc_apple_register_read_stream_run_loop( + CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) { + GRPC_POLLING_TRACE("Register read stream: %p", read_stream); + grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); + CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop, + kCFRunLoopDefaultMode); + gGlobalRunLoopContext->input_source_registered = true; + gGlobalRunLoopContext->input_source_cv.Signal(); +} + +/// Register the stream with the global run loop. Callbacks of the stream will +/// be issued to the run loop when a network event happens, and will be driven +/// by the global run loop thread gGlobalRunLoopThread. +static void grpc_apple_register_write_stream_run_loop( + CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) { + GRPC_POLLING_TRACE("Register write stream: %p", write_stream); + grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); + CFWriteStreamScheduleWithRunLoop( + write_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode); + gGlobalRunLoopContext->input_source_registered = true; + gGlobalRunLoopContext->input_source_cv.Signal(); +} + +/// The default implementation of stream registration is to register the stream +/// to a dispatch queue. However, if the CFRunLoop based pollset is enabled (by +/// macro and environment variable, see docs in iomgr_posix_cfstream.cc), the +/// CFStream streams are registered with the global run loop instead (see +/// pollset_global_init below). +static void (*grpc_apple_register_read_stream_impl)( + CFReadStreamRef, dispatch_queue_t) = grpc_apple_register_read_stream_queue; +static void (*grpc_apple_register_write_stream_impl)(CFWriteStreamRef, + dispatch_queue_t) = + grpc_apple_register_write_stream_queue; + +void grpc_apple_register_read_stream(CFReadStreamRef read_stream, + dispatch_queue_t dispatch_queue) { + grpc_apple_register_read_stream_impl(read_stream, dispatch_queue); +} + +void grpc_apple_register_write_stream(CFWriteStreamRef write_stream, + dispatch_queue_t dispatch_queue) { + grpc_apple_register_write_stream_impl(write_stream, dispatch_queue); +} + +/// Drive the run loop in a global singleton thread until the global run loop is +/// shutdown. +static void GlobalRunLoopFunc(void* arg) { + grpc_core::ReleasableMutexLock lock(&gGlobalRunLoopContext->mu); + gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent(); + gGlobalRunLoopContext->init_cv.Signal(); + + while (!gGlobalRunLoopContext->is_shutdown) { + // CFRunLoopRun() will return immediately if no stream is registered on it. + // So we wait on a conditional variable until a stream is registered; + // otherwise we'll be running a spinning loop. + while (!gGlobalRunLoopContext->input_source_registered) { + gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu); + } + gGlobalRunLoopContext->input_source_registered = false; + lock.Unlock(); + CFRunLoopRun(); + lock.Lock(); + } + lock.Unlock(); +} + +// pollset implementation + +static void pollset_global_init(void) { + gGlobalRunLoopContext = new GlobalRunLoopContext; + + grpc_apple_register_read_stream_impl = + grpc_apple_register_read_stream_run_loop; + grpc_apple_register_write_stream_impl = + grpc_apple_register_write_stream_run_loop; + + grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); + gGlobalRunLoopThread = + new grpc_core::Thread("apple_ev", GlobalRunLoopFunc, nullptr); + gGlobalRunLoopThread->Start(); + while (gGlobalRunLoopContext->run_loop == NULL) + gGlobalRunLoopContext->init_cv.Wait(&gGlobalRunLoopContext->mu); +} + +static void pollset_global_shutdown(void) { + { + grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); + gGlobalRunLoopContext->is_shutdown = true; + CFRunLoopStop(gGlobalRunLoopContext->run_loop); + } + gGlobalRunLoopThread->Join(); + delete gGlobalRunLoopThread; + delete gGlobalRunLoopContext; +} + +/// The caller must acquire the lock GrpcApplePollset.mu before calling this +/// function. The lock may be temporarily released when waiting on the condition +/// variable but will be re-acquired before the function returns. +/// +/// The Apple pollset simply waits on a condition variable until it is kicked. +/// The network events are handled in the global run loop thread. Processing of +/// these events will eventually trigger the kick. +static grpc_error* pollset_work(grpc_pollset* pollset, + grpc_pollset_worker** worker, + grpc_millis deadline) { + GRPC_POLLING_TRACE("pollset work: %p, worker: %p, deadline: %" PRIu64, + pollset, worker, deadline); + GrpcApplePollset* apple_pollset = + reinterpret_cast(pollset); + GrpcAppleWorker actual_worker; + if (worker) { + *worker = reinterpret_cast(&actual_worker); + } + + if (apple_pollset->kicked_without_poller) { + // Process the outstanding kick and reset the flag. Do not block. + apple_pollset->kicked_without_poller = false; + } else { + // Block until kicked, timed out, or the pollset shuts down. + apple_pollset->workers.push_front(&actual_worker); + auto it = apple_pollset->workers.begin(); + + while (!actual_worker.kicked && !apple_pollset->is_shutdown) { + if (actual_worker.cv.Wait( + &apple_pollset->mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { + // timed out + break; + } + } + + apple_pollset->workers.erase(it); + + // If the pollset is shut down asynchronously and this is the last pending + // worker, the shutdown process is complete at this moment and the shutdown + // callback will be called. + if (apple_pollset->is_shutdown && apple_pollset->workers.empty()) { + grpc_core::ExecCtx::Run(DEBUG_LOCATION, apple_pollset->shutdown_closure, + GRPC_ERROR_NONE); + } + } + + return GRPC_ERROR_NONE; +} + +/// Kick a specific worker. The caller must acquire the lock GrpcApplePollset.mu +/// before calling this function. +static void kick_worker(GrpcAppleWorker* worker) { + worker->kicked = true; + worker->cv.Signal(); +} + +/// The caller must acquire the lock GrpcApplePollset.mu before calling this +/// function. The kick action simply signals the condition variable of the +/// worker. +static grpc_error* pollset_kick(grpc_pollset* pollset, + grpc_pollset_worker* specific_worker) { + GrpcApplePollset* apple_pollset = + reinterpret_cast(pollset); + + GRPC_POLLING_TRACE("pollset kick: %p, worker:%p", pollset, specific_worker); + + if (specific_worker == nullptr) { + if (apple_pollset->workers.empty()) { + apple_pollset->kicked_without_poller = true; + } else { + GrpcAppleWorker* actual_worker = apple_pollset->workers.front(); + kick_worker(actual_worker); + } + } else if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + for (auto& actual_worker : apple_pollset->workers) { + kick_worker(actual_worker); + } + } else { + GrpcAppleWorker* actual_worker = + reinterpret_cast(specific_worker); + kick_worker(actual_worker); + } + + return GRPC_ERROR_NONE; +} + +static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { + GRPC_POLLING_TRACE("pollset init: %p", pollset); + GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset(); + *mu = apple_pollset->mu.get(); +} + +/// The caller must acquire the lock GrpcApplePollset.mu before calling this +/// function. +static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { + GRPC_POLLING_TRACE("pollset shutdown: %p", pollset); + + GrpcApplePollset* apple_pollset = + reinterpret_cast(pollset); + apple_pollset->is_shutdown = true; + pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); + + // If there is any worker blocked, shutdown will be done asynchronously. + if (apple_pollset->workers.empty()) { + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); + } else { + apple_pollset->shutdown_closure = closure; + } +} + +static void pollset_destroy(grpc_pollset* pollset) { + GRPC_POLLING_TRACE("pollset destroy: %p", pollset); + GrpcApplePollset* apple_pollset = + reinterpret_cast(pollset); + apple_pollset->~GrpcApplePollset(); +} + +size_t pollset_size(void) { return sizeof(GrpcApplePollset); } + +grpc_pollset_vtable grpc_apple_pollset_vtable = { + pollset_global_init, pollset_global_shutdown, + pollset_init, pollset_shutdown, + pollset_destroy, pollset_work, + pollset_kick, pollset_size}; + +// pollset_set implementation + +grpc_pollset_set* pollset_set_create(void) { return nullptr; } +void pollset_set_destroy(grpc_pollset_set* pollset_set) {} +void pollset_set_add_pollset(grpc_pollset_set* pollset_set, + grpc_pollset* pollset) {} +void pollset_set_del_pollset(grpc_pollset_set* pollset_set, + grpc_pollset* pollset) {} +void pollset_set_add_pollset_set(grpc_pollset_set* bag, + grpc_pollset_set* item) {} +void pollset_set_del_pollset_set(grpc_pollset_set* bag, + grpc_pollset_set* item) {} + +grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = { + pollset_set_create, pollset_set_destroy, + pollset_set_add_pollset, pollset_set_del_pollset, + pollset_set_add_pollset_set, pollset_set_del_pollset_set}; + +#endif diff --git a/src/core/lib/iomgr/ev_apple.h b/src/core/lib/iomgr/ev_apple.h new file mode 100644 index 00000000000..a05f91ce15f --- /dev/null +++ b/src/core/lib/iomgr/ev_apple.h @@ -0,0 +1,43 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_EV_APPLE_H +#define GRPC_CORE_LIB_IOMGR_EV_APPLE_H + +#include + +#ifdef GRPC_APPLE_EV + +#include + +#include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_set.h" + +void grpc_apple_register_read_stream(CFReadStreamRef read_stream, + dispatch_queue_t dispatch_queue); + +void grpc_apple_register_write_stream(CFWriteStreamRef write_stream, + dispatch_queue_t dispatch_queue); + +extern grpc_pollset_vtable grpc_apple_pollset_vtable; + +extern grpc_pollset_set_vtable grpc_apple_pollset_set_vtable; + +#endif + +#endif diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc index 72b2ae5eb00..24f9f4101b2 100644 --- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc +++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc @@ -16,6 +16,20 @@ * */ +/// CFStream is build-enabled on iOS by default and disabled by default on other +/// platforms (see port_platform.h). To enable CFStream build on another +/// platform, the users need to define macro "GRPC_CFSTREAM=1" when building +/// gRPC. +/// +/// When CFStream is to be built (either by default on iOS or by macro on other +/// platforms), the users can disable CFStream with environment variable +/// "grpc_cfstream=0". This will let gRPC to fallback to use POSIX sockets. In +/// addition, the users may choose to use an alternative CFRunLoop based pollset +/// "ev_apple" by setting environment variable "grpc_cfstream_run_loop=1". This +/// pollset resolves a bug from Apple when CFStream streams dispatch events to +/// dispatch queues. The caveat of this pollset is that users may not be able to +/// run a gRPC server in the same process. + #include #include "src/core/lib/iomgr/port.h" @@ -23,6 +37,7 @@ #ifdef GRPC_CFSTREAM_IOMGR #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/ev_apple.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/iomgr_posix.h" @@ -33,6 +48,7 @@ #include "src/core/lib/iomgr/timer.h" static const char* grpc_cfstream_env_var = "grpc_cfstream"; +static const char* grpc_cfstream_run_loop_env_var = "GRPC_CFSTREAM_RUN_LOOP"; extern grpc_tcp_server_vtable grpc_posix_tcp_server_vtable; extern grpc_tcp_client_vtable grpc_posix_tcp_client_vtable; @@ -42,6 +58,33 @@ extern grpc_pollset_vtable grpc_posix_pollset_vtable; extern grpc_pollset_set_vtable grpc_posix_pollset_set_vtable; extern grpc_address_resolver_vtable grpc_posix_resolver_vtable; +static void apple_iomgr_platform_init(void) { grpc_pollset_global_init(); } + +static void apple_iomgr_platform_flush(void) {} + +static void apple_iomgr_platform_shutdown(void) { + grpc_pollset_global_shutdown(); +} + +static void apple_iomgr_platform_shutdown_background_closure(void) {} + +static bool apple_iomgr_platform_is_any_background_poller_thread(void) { + return false; +} + +static bool apple_iomgr_platform_add_closure_to_background_poller( + grpc_closure* closure, grpc_error* error) { + return false; +} + +static grpc_iomgr_platform_vtable apple_vtable = { + apple_iomgr_platform_init, + apple_iomgr_platform_flush, + apple_iomgr_platform_shutdown, + apple_iomgr_platform_shutdown_background_closure, + apple_iomgr_platform_is_any_background_poller_thread, + apple_iomgr_platform_add_closure_to_background_poller}; + static void iomgr_platform_init(void) { grpc_wakeup_fd_global_init(); grpc_event_engine_init(); @@ -76,32 +119,53 @@ static grpc_iomgr_platform_vtable vtable = { iomgr_platform_add_closure_to_background_poller}; void grpc_set_default_iomgr_platform() { - char* enable_cfstream = getenv(grpc_cfstream_env_var); - grpc_tcp_client_vtable* client_vtable = &grpc_posix_tcp_client_vtable; - // CFStream is enabled by default on iOS, and disabled by default on other - // platforms. Defaults can be overriden by setting the grpc_cfstream - // environment variable. -#if TARGET_OS_IPHONE - if (enable_cfstream == nullptr || enable_cfstream[0] == '1') { - client_vtable = &grpc_cfstream_client_vtable; + char* enable_cfstream_str = getenv(grpc_cfstream_env_var); + bool enable_cfstream = + enable_cfstream_str == nullptr || enable_cfstream_str[0] != '0'; + char* enable_cfstream_run_loop_str = getenv(grpc_cfstream_run_loop_env_var); + // CFStream run-loop is disabled by default. The user has to enable it + // explicitly with environment variable. + bool enable_cfstream_run_loop = enable_cfstream_run_loop_str != nullptr && + enable_cfstream_run_loop_str[0] == '1'; + if (!enable_cfstream) { + // Use POSIX sockets for both client and server + grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable); + grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable); + grpc_set_pollset_vtable(&grpc_posix_pollset_vtable); + grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable); + grpc_set_iomgr_platform_vtable(&vtable); + } else if (enable_cfstream && !enable_cfstream_run_loop) { + // Use CFStream with dispatch queue for client; use POSIX sockets for server + grpc_set_tcp_client_impl(&grpc_cfstream_client_vtable); + grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable); + grpc_set_pollset_vtable(&grpc_posix_pollset_vtable); + grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable); + grpc_set_iomgr_platform_vtable(&vtable); + } else { + // Use CFStream with CFRunLoop for client; server not supported + grpc_set_tcp_client_impl(&grpc_cfstream_client_vtable); + grpc_set_pollset_vtable(&grpc_apple_pollset_vtable); + grpc_set_pollset_set_vtable(&grpc_apple_pollset_set_vtable); + grpc_set_iomgr_platform_vtable(&apple_vtable); } -#else - if (enable_cfstream != nullptr && enable_cfstream[0] == '1') { - client_vtable = &grpc_cfstream_client_vtable; - } -#endif - - grpc_set_tcp_client_impl(client_vtable); - grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable); grpc_set_timer_impl(&grpc_generic_timer_vtable); - grpc_set_pollset_vtable(&grpc_posix_pollset_vtable); - grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable); grpc_set_resolver_impl(&grpc_posix_resolver_vtable); - grpc_set_iomgr_platform_vtable(&vtable); } bool grpc_iomgr_run_in_background() { - return grpc_event_engine_run_in_background(); + char* enable_cfstream_str = getenv(grpc_cfstream_env_var); + bool enable_cfstream = + enable_cfstream_str == nullptr || enable_cfstream_str[0] != '0'; + char* enable_cfstream_run_loop_str = getenv(grpc_cfstream_run_loop_env_var); + // CFStream run-loop is disabled by default. The user has to enable it + // explicitly with environment variable. + bool enable_cfstream_run_loop = enable_cfstream_run_loop_str != nullptr && + enable_cfstream_run_loop_str[0] == '1'; + if (enable_cfstream && enable_cfstream_run_loop) { + return false; + } else { + return grpc_event_engine_run_in_background(); + } } #endif /* GRPC_CFSTREAM_IOMGR */ diff --git a/src/core/lib/iomgr/pollset_set_custom.cc b/src/core/lib/iomgr/pollset_set_custom.cc index 099388f7c5b..2c1df608197 100644 --- a/src/core/lib/iomgr/pollset_set_custom.cc +++ b/src/core/lib/iomgr/pollset_set_custom.cc @@ -22,23 +22,23 @@ #include "src/core/lib/iomgr/pollset_set.h" -grpc_pollset_set* pollset_set_create(void) { +static grpc_pollset_set* pollset_set_create(void) { return (grpc_pollset_set*)((intptr_t)0xdeafbeef); } -void pollset_set_destroy(grpc_pollset_set* /*pollset_set*/) {} +static void pollset_set_destroy(grpc_pollset_set* /*pollset_set*/) {} -void pollset_set_add_pollset(grpc_pollset_set* /*pollset_set*/, - grpc_pollset* /*pollset*/) {} +static void pollset_set_add_pollset(grpc_pollset_set* /*pollset_set*/, + grpc_pollset* /*pollset*/) {} -void pollset_set_del_pollset(grpc_pollset_set* /*pollset_set*/, - grpc_pollset* /*pollset*/) {} +static void pollset_set_del_pollset(grpc_pollset_set* /*pollset_set*/, + grpc_pollset* /*pollset*/) {} -void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/, - grpc_pollset_set* /*item*/) {} +static void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/, + grpc_pollset_set* /*item*/) {} -void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/, - grpc_pollset_set* /*item*/) {} +static void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/, + grpc_pollset_set* /*item*/) {} static grpc_pollset_set_vtable vtable = { pollset_set_create, pollset_set_destroy, diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 1d2e9506451..211423e643d 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -129,6 +129,7 @@ #define GRPC_CFSTREAM_IOMGR 1 #define GRPC_CFSTREAM_CLIENT 1 #define GRPC_CFSTREAM_ENDPOINT 1 +#define GRPC_APPLE_EV 1 #define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1 #define GRPC_POSIX_SOCKET_EV 1 #define GRPC_POSIX_SOCKET_EV_EPOLL1 1 diff --git a/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/InteropTests.xcscheme b/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/InteropTests.xcscheme index cbde360a338..d4557937965 100644 --- a/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/InteropTests.xcscheme +++ b/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/InteropTests.xcscheme @@ -66,8 +66,13 @@ ReferencedContainer = "container:Tests.xcodeproj"> - - + + + + - - + + + + - - + + + + - - + + + + - - + + + +