Merge branch 'master' into failwrites

pull/22668/head
Yash Tibrewal 5 years ago
commit 86c03c1710
  1. 2
      .gitignore
  2. 3
      BUILD
  3. 2
      BUILD.gn
  4. 2
      CMakeLists.txt
  5. 2
      Makefile
  6. 4
      build_autogenerated.yaml
  7. 1
      config.m4
  8. 1
      config.w32
  9. 266
      examples/cpp/helloworld/README.md
  10. 2
      gRPC-C++.podspec
  11. 3
      gRPC-Core.podspec
  12. 2
      grpc.gemspec
  13. 2
      grpc.gyp
  14. 2
      package.xml
  15. 5
      src/core/lib/iomgr/cfstream_handle.cc
  16. 356
      src/core/lib/iomgr/ev_apple.cc
  17. 43
      src/core/lib/iomgr/ev_apple.h
  18. 100
      src/core/lib/iomgr/iomgr_posix_cfstream.cc
  19. 12
      src/core/lib/iomgr/pollset_set_custom.cc
  20. 1
      src/core/lib/iomgr/port.h
  21. 9
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/InteropTests.xcscheme
  22. 9
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/MacTests.xcscheme
  23. 9
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/PerfTests.xcscheme
  24. 9
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/TvTests.xcscheme
  25. 9
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/UnitTests.xcscheme
  26. 1
      src/python/grpcio/grpc_core_dependencies.py
  27. 2
      test/cpp/ios/Podfile
  28. 2
      tools/doxygen/Doxyfile.c++.internal
  29. 2
      tools/doxygen/Doxyfile.core.internal

2
.gitignore vendored

@ -136,7 +136,7 @@ bm_diff_old/
bm_*.json
# cmake build files
/cmake/build
**/cmake/build/
# Visual Studio Code artifacts
.vscode/*

@ -724,6 +724,7 @@ grpc_cc_library(
"src/core/lib/iomgr/endpoint_pair_windows.cc",
"src/core/lib/iomgr/error.cc",
"src/core/lib/iomgr/error_cfstream.cc",
"src/core/lib/iomgr/ev_apple.cc",
"src/core/lib/iomgr/ev_epoll1_linux.cc",
"src/core/lib/iomgr/ev_epollex_linux.cc",
"src/core/lib/iomgr/ev_poll_posix.cc",
@ -885,6 +886,7 @@ grpc_cc_library(
"src/core/lib/iomgr/error.h",
"src/core/lib/iomgr/error_cfstream.h",
"src/core/lib/iomgr/error_internal.h",
"src/core/lib/iomgr/ev_apple.h",
"src/core/lib/iomgr/ev_epoll1_linux.h",
"src/core/lib/iomgr/ev_epollex_linux.h",
"src/core/lib/iomgr/ev_poll_posix.h",
@ -989,7 +991,6 @@ grpc_cc_library(
],
language = "c++",
public_hdrs = GRPC_PUBLIC_HDRS,
use_cfstream = True,
deps = [
"eventmanager_libuv",
"gpr_base",

@ -604,6 +604,8 @@ config("grpc_config") {
"src/core/lib/iomgr/error_cfstream.cc",
"src/core/lib/iomgr/error_cfstream.h",
"src/core/lib/iomgr/error_internal.h",
"src/core/lib/iomgr/ev_apple.cc",
"src/core/lib/iomgr/ev_apple.h",
"src/core/lib/iomgr/ev_epoll1_linux.cc",
"src/core/lib/iomgr/ev_epoll1_linux.h",
"src/core/lib/iomgr/ev_epollex_linux.cc",

@ -1514,6 +1514,7 @@ add_library(grpc
src/core/lib/iomgr/endpoint_pair_windows.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/error_cfstream.cc
src/core/lib/iomgr/ev_apple.cc
src/core/lib/iomgr/ev_epoll1_linux.cc
src/core/lib/iomgr/ev_epollex_linux.cc
src/core/lib/iomgr/ev_poll_posix.cc
@ -2167,6 +2168,7 @@ add_library(grpc_unsecure
src/core/lib/iomgr/endpoint_pair_windows.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/error_cfstream.cc
src/core/lib/iomgr/ev_apple.cc
src/core/lib/iomgr/ev_epoll1_linux.cc
src/core/lib/iomgr/ev_epollex_linux.cc
src/core/lib/iomgr/ev_poll_posix.cc

@ -3839,6 +3839,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/endpoint_pair_windows.cc \
src/core/lib/iomgr/error.cc \
src/core/lib/iomgr/error_cfstream.cc \
src/core/lib/iomgr/ev_apple.cc \
src/core/lib/iomgr/ev_epoll1_linux.cc \
src/core/lib/iomgr/ev_epollex_linux.cc \
src/core/lib/iomgr/ev_poll_posix.cc \
@ -4466,6 +4467,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/endpoint_pair_windows.cc \
src/core/lib/iomgr/error.cc \
src/core/lib/iomgr/error_cfstream.cc \
src/core/lib/iomgr/ev_apple.cc \
src/core/lib/iomgr/ev_epoll1_linux.cc \
src/core/lib/iomgr/ev_epollex_linux.cc \
src/core/lib/iomgr/ev_poll_posix.cc \

@ -568,6 +568,7 @@ libs:
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_cfstream.h
- src/core/lib/iomgr/error_internal.h
- src/core/lib/iomgr/ev_apple.h
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_epollex_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
@ -940,6 +941,7 @@ libs:
- src/core/lib/iomgr/endpoint_pair_windows.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/error_cfstream.cc
- src/core/lib/iomgr/ev_apple.cc
- src/core/lib/iomgr/ev_epoll1_linux.cc
- src/core/lib/iomgr/ev_epollex_linux.cc
- src/core/lib/iomgr/ev_poll_posix.cc
@ -1467,6 +1469,7 @@ libs:
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/error_cfstream.h
- src/core/lib/iomgr/error_internal.h
- src/core/lib/iomgr/ev_apple.h
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_epollex_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
@ -1771,6 +1774,7 @@ libs:
- src/core/lib/iomgr/endpoint_pair_windows.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/error_cfstream.cc
- src/core/lib/iomgr/ev_apple.cc
- src/core/lib/iomgr/ev_epoll1_linux.cc
- src/core/lib/iomgr/ev_epollex_linux.cc
- src/core/lib/iomgr/ev_poll_posix.cc

@ -286,6 +286,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/endpoint_pair_windows.cc \
src/core/lib/iomgr/error.cc \
src/core/lib/iomgr/error_cfstream.cc \
src/core/lib/iomgr/ev_apple.cc \
src/core/lib/iomgr/ev_epoll1_linux.cc \
src/core/lib/iomgr/ev_epollex_linux.cc \
src/core/lib/iomgr/ev_poll_posix.cc \

@ -255,6 +255,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\iomgr\\endpoint_pair_windows.cc " +
"src\\core\\lib\\iomgr\\error.cc " +
"src\\core\\lib\\iomgr\\error_cfstream.cc " +
"src\\core\\lib\\iomgr\\ev_apple.cc " +
"src\\core\\lib\\iomgr\\ev_epoll1_linux.cc " +
"src\\core\\lib\\iomgr\\ev_epollex_linux.cc " +
"src\\core\\lib\\iomgr\\ev_poll_posix.cc " +

@ -1,264 +1,6 @@
# gRPC C++ Hello World Tutorial
# gRPC C++ Hello World Example
### Install gRPC
Make sure you have installed gRPC on your system. Follow the
[BUILDING.md](../../../BUILDING.md) instructions.
You can find a complete set of instructions for building gRPC and running the
Hello World app in the [C++ Quick Start][].
### Get the tutorial source code
The example code for this and our other examples lives in the `examples`
directory. Clone this repository at the [latest stable release tag](https://github.com/grpc/grpc/releases)
to your local machine by running the following command:
```sh
$ git clone -b RELEASE_TAG_HERE https://github.com/grpc/grpc
```
Change your current directory to examples/cpp/helloworld
```sh
$ cd examples/cpp/helloworld/
```
### Defining a service
The first step in creating our example is to define a *service*: an RPC
service specifies the methods that can be called remotely with their parameters
and return types. As you saw in the
[overview](#protocolbuffers) above, gRPC does this using [protocol
buffers](https://developers.google.com/protocol-buffers/docs/overview). We
use the protocol buffers interface definition language (IDL) to define our
service methods, and define the parameters and return
types as protocol buffer message types. Both the client and the
server use interface code generated from the service definition.
Here's our example service definition, defined using protocol buffers IDL in
[helloworld.proto](../../protos/helloworld.proto). The `Greeting`
service has one method, `hello`, that lets the server receive a single
`HelloRequest`
message from the remote client containing the user's name, then send back
a greeting in a single `HelloReply`. This is the simplest type of RPC you
can specify in gRPC - we'll look at some other types later in this document.
```protobuf
syntax = "proto3";
option java_package = "ex.grpc";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
```
<a name="generating"></a>
### Generating gRPC code
Once we've defined our service, we use the protocol buffer compiler
`protoc` to generate the special client and server code we need to create
our application. The generated code contains both stub code for clients to
use and an abstract interface for servers to implement, both with the method
defined in our `Greeting` service.
To generate the client and server side interfaces:
```sh
$ make helloworld.grpc.pb.cc helloworld.pb.cc
```
Which internally invokes the proto-compiler as:
```sh
$ protoc -I ../../protos/ --grpc_out=. --plugin=protoc-gen-grpc=grpc_cpp_plugin ../../protos/helloworld.proto
$ protoc -I ../../protos/ --cpp_out=. ../../protos/helloworld.proto
```
### Writing a client
- Create a channel. A channel is a logical connection to an endpoint. A gRPC
channel can be created with the target address, credentials to use and
arguments as follows
```cpp
auto channel = CreateChannel("localhost:50051", InsecureChannelCredentials());
```
- Create a stub. A stub implements the rpc methods of a service and in the
generated code, a method is provided to create a stub with a channel:
```cpp
auto stub = helloworld::Greeter::NewStub(channel);
```
- Make a unary rpc, with `ClientContext` and request/response proto messages.
```cpp
ClientContext context;
HelloRequest request;
request.set_name("hello");
HelloReply reply;
Status status = stub->SayHello(&context, request, &reply);
```
- Check returned status and response.
```cpp
if (status.ok()) {
// check reply.message()
} else {
// rpc failed.
}
```
For a working example, refer to [greeter_client.cc](greeter_client.cc).
### Writing a server
- Implement the service interface
```cpp
class GreeterServiceImpl final : public Greeter::Service {
Status SayHello(ServerContext* context, const HelloRequest* request,
HelloReply* reply) override {
std::string prefix("Hello ");
reply->set_message(prefix + request->name());
return Status::OK;
}
};
```
- Build a server exporting the service
```cpp
GreeterServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort("0.0.0.0:50051", grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
```
For a working example, refer to [greeter_server.cc](greeter_server.cc).
### Writing asynchronous client and server
gRPC uses `CompletionQueue` API for asynchronous operations. The basic work flow
is
- bind a `CompletionQueue` to a rpc call
- do something like a read or write, present with a unique `void*` tag
- call `CompletionQueue::Next` to wait for operations to complete. If a tag
appears, it indicates that the corresponding operation is complete.
#### Async client
The channel and stub creation code is the same as the sync client.
- Initiate the rpc and create a handle for the rpc. Bind the rpc to a
`CompletionQueue`.
```cpp
CompletionQueue cq;
auto rpc = stub->AsyncSayHello(&context, request, &cq);
```
- Ask for reply and final status, with a unique tag
```cpp
Status status;
rpc->Finish(&reply, &status, (void*)1);
```
- Wait for the completion queue to return the next tag. The reply and status are
ready once the tag passed into the corresponding `Finish()` call is returned.
```cpp
void* got_tag;
bool ok = false;
cq.Next(&got_tag, &ok);
if (ok && got_tag == (void*)1) {
// check reply and status
}
```
For a working example, refer to [greeter_async_client.cc](greeter_async_client.cc).
#### Async server
The server implementation requests a rpc call with a tag and then wait for the
completion queue to return the tag. The basic flow is
- Build a server exporting the async service
```cpp
helloworld::Greeter::AsyncService service;
ServerBuilder builder;
builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());
builder.RegisterService(&service);
auto cq = builder.AddCompletionQueue();
auto server = builder.BuildAndStart();
```
- Request one rpc
```cpp
ServerContext context;
HelloRequest request;
ServerAsyncResponseWriter<HelloReply> responder;
service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
```
- Wait for the completion queue to return the tag. The context, request and
responder are ready once the tag is retrieved.
```cpp
HelloReply reply;
Status status;
void* got_tag;
bool ok = false;
cq.Next(&got_tag, &ok);
if (ok && got_tag == (void*)1) {
// set reply and status
responder.Finish(reply, status, (void*)2);
}
```
- Wait for the completion queue to return the tag. The rpc is finished when the
tag is back.
```cpp
void* got_tag;
bool ok = false;
cq.Next(&got_tag, &ok);
if (ok && got_tag == (void*)2) {
// clean up
}
```
To handle multiple rpcs, the async server creates an object `CallData` to
maintain the state of each rpc and use the address of it as the unique tag. For
simplicity the server only uses one completion queue for all events, and runs a
main loop in `HandleRpcs` to query the queue.
For a working example, refer to [greeter_async_server.cc](greeter_async_server.cc).
#### Flags for the client
```sh
./greeter_client --target="a target string used to create a GRPC client channel"
```
The Default value for --target is "localhost:50051".
[C++ Quick Start]: https://grpc.io/docs/quickstart/cpp

@ -445,6 +445,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/error.h',
'src/core/lib/iomgr/error_cfstream.h',
'src/core/lib/iomgr/error_internal.h',
'src/core/lib/iomgr/ev_apple.h',
'src/core/lib/iomgr/ev_epoll1_linux.h',
'src/core/lib/iomgr/ev_epollex_linux.h',
'src/core/lib/iomgr/ev_poll_posix.h',
@ -896,6 +897,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/error.h',
'src/core/lib/iomgr/error_cfstream.h',
'src/core/lib/iomgr/error_internal.h',
'src/core/lib/iomgr/ev_apple.h',
'src/core/lib/iomgr/ev_epoll1_linux.h',
'src/core/lib/iomgr/ev_epollex_linux.h',
'src/core/lib/iomgr/ev_poll_posix.h',

@ -654,6 +654,8 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/error_cfstream.cc',
'src/core/lib/iomgr/error_cfstream.h',
'src/core/lib/iomgr/error_internal.h',
'src/core/lib/iomgr/ev_apple.cc',
'src/core/lib/iomgr/ev_apple.h',
'src/core/lib/iomgr/ev_epoll1_linux.cc',
'src/core/lib/iomgr/ev_epoll1_linux.h',
'src/core/lib/iomgr/ev_epollex_linux.cc',
@ -1249,6 +1251,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/error.h',
'src/core/lib/iomgr/error_cfstream.h',
'src/core/lib/iomgr/error_internal.h',
'src/core/lib/iomgr/ev_apple.h',
'src/core/lib/iomgr/ev_epoll1_linux.h',
'src/core/lib/iomgr/ev_epollex_linux.h',
'src/core/lib/iomgr/ev_poll_posix.h',

@ -576,6 +576,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/error_cfstream.cc )
s.files += %w( src/core/lib/iomgr/error_cfstream.h )
s.files += %w( src/core/lib/iomgr/error_internal.h )
s.files += %w( src/core/lib/iomgr/ev_apple.cc )
s.files += %w( src/core/lib/iomgr/ev_apple.h )
s.files += %w( src/core/lib/iomgr/ev_epoll1_linux.cc )
s.files += %w( src/core/lib/iomgr/ev_epoll1_linux.h )
s.files += %w( src/core/lib/iomgr/ev_epollex_linux.cc )

@ -642,6 +642,7 @@
'src/core/lib/iomgr/endpoint_pair_windows.cc',
'src/core/lib/iomgr/error.cc',
'src/core/lib/iomgr/error_cfstream.cc',
'src/core/lib/iomgr/ev_apple.cc',
'src/core/lib/iomgr/ev_epoll1_linux.cc',
'src/core/lib/iomgr/ev_epollex_linux.cc',
'src/core/lib/iomgr/ev_poll_posix.cc',
@ -1131,6 +1132,7 @@
'src/core/lib/iomgr/endpoint_pair_windows.cc',
'src/core/lib/iomgr/error.cc',
'src/core/lib/iomgr/error_cfstream.cc',
'src/core/lib/iomgr/ev_apple.cc',
'src/core/lib/iomgr/ev_epoll1_linux.cc',
'src/core/lib/iomgr/ev_epollex_linux.cc',
'src/core/lib/iomgr/ev_poll_posix.cc',

@ -556,6 +556,8 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/error_cfstream.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/error_cfstream.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/error_internal.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_apple.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_apple.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_epoll1_linux.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_epoll1_linux.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_epollex_linux.cc" role="src" />

@ -32,6 +32,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error_cfstream.h"
#include "src/core/lib/iomgr/ev_apple.h"
#include "src/core/lib/iomgr/exec_ctx.h"
extern grpc_core::TraceFlag grpc_tcp_trace;
@ -147,8 +148,8 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
CFStreamHandle::WriteCallback, &ctx);
CFReadStreamSetDispatchQueue(read_stream, dispatch_queue_);
CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue_);
grpc_apple_register_read_stream(read_stream, dispatch_queue_);
grpc_apple_register_write_stream(write_stream, dispatch_queue_);
}
CFStreamHandle::~CFStreamHandle() {

@ -0,0 +1,356 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/// Event engine based on Apple's CFRunLoop API family. If the CFRunLoop engine
/// is enabled (see iomgr_posix_cfstream.cc), a global thread is started to
/// handle and trigger all the CFStream events. The CFStream streams register
/// themselves with the run loop with functions grpc_apple_register_read_stream
/// and grpc_apple_register_read_stream. Pollsets are dummy and block on a
/// condition variable in pollset_work().
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_APPLE_EV
#include <CoreFoundation/CoreFoundation.h>
#include <list>
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/ev_apple.h"
grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling");
#ifndef NDEBUG
#define GRPC_POLLING_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(grpc_apple_polling_trace)) { \
gpr_log(GPR_DEBUG, "(polling) " format, __VA_ARGS__); \
}
#else
#define GRPC_POLLING_TRACE(...)
#endif // NDEBUG
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
struct GlobalRunLoopContext {
grpc_core::CondVar init_cv;
grpc_core::CondVar input_source_cv;
grpc_core::Mutex mu;
// Whether an input source registration is pending. Protected by mu.
bool input_source_registered = false;
// The reference to the global run loop object. Protected by mu.
CFRunLoopRef run_loop;
// Whether the pollset has been globally shut down. Protected by mu.
bool is_shutdown = false;
};
struct GrpcAppleWorker {
// The condition varible to kick the worker. Works with the pollset's lock
// (GrpcApplePollset.mu).
grpc_core::CondVar cv;
// Whether the worker is kicked. Protected by the pollset's lock
// (GrpcApplePollset.mu).
bool kicked = false;
};
struct GrpcApplePollset {
grpc_core::Mutex mu;
// Tracks the current workers in the pollset. Protected by mu.
std::list<GrpcAppleWorker*> workers;
// Whether the pollset is shut down. Protected by mu.
bool is_shutdown = false;
// Closure to call when shutdown is done. Protected by mu.
grpc_closure* shutdown_closure;
// Whether there's an outstanding kick that was not processed. Protected by
// mu.
bool kicked_without_poller = false;
};
static GlobalRunLoopContext* gGlobalRunLoopContext = nullptr;
static grpc_core::Thread* gGlobalRunLoopThread = nullptr;
/// Register the stream with the dispatch queue. Callbacks of the stream will be
/// issued to the dispatch queue when a network event happens and will be
/// managed by Grand Central Dispatch.
static void grpc_apple_register_read_stream_queue(
CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
CFReadStreamSetDispatchQueue(read_stream, dispatch_queue);
}
/// Register the stream with the dispatch queue. Callbacks of the stream will be
/// issued to the dispatch queue when a network event happens and will be
/// managed by Grand Central Dispatch.
static void grpc_apple_register_write_stream_queue(
CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue);
}
/// Register the stream with the global run loop. Callbacks of the stream will
/// be issued to the run loop when a network event happens and will be driven by
/// the global run loop thread gGlobalRunLoopThread.
static void grpc_apple_register_read_stream_run_loop(
CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
GRPC_POLLING_TRACE("Register read stream: %p", read_stream);
grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop,
kCFRunLoopDefaultMode);
gGlobalRunLoopContext->input_source_registered = true;
gGlobalRunLoopContext->input_source_cv.Signal();
}
/// Register the stream with the global run loop. Callbacks of the stream will
/// be issued to the run loop when a network event happens, and will be driven
/// by the global run loop thread gGlobalRunLoopThread.
static void grpc_apple_register_write_stream_run_loop(
CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
GRPC_POLLING_TRACE("Register write stream: %p", write_stream);
grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
CFWriteStreamScheduleWithRunLoop(
write_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode);
gGlobalRunLoopContext->input_source_registered = true;
gGlobalRunLoopContext->input_source_cv.Signal();
}
/// The default implementation of stream registration is to register the stream
/// to a dispatch queue. However, if the CFRunLoop based pollset is enabled (by
/// macro and environment variable, see docs in iomgr_posix_cfstream.cc), the
/// CFStream streams are registered with the global run loop instead (see
/// pollset_global_init below).
static void (*grpc_apple_register_read_stream_impl)(
CFReadStreamRef, dispatch_queue_t) = grpc_apple_register_read_stream_queue;
static void (*grpc_apple_register_write_stream_impl)(CFWriteStreamRef,
dispatch_queue_t) =
grpc_apple_register_write_stream_queue;
void grpc_apple_register_read_stream(CFReadStreamRef read_stream,
dispatch_queue_t dispatch_queue) {
grpc_apple_register_read_stream_impl(read_stream, dispatch_queue);
}
void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
dispatch_queue_t dispatch_queue) {
grpc_apple_register_write_stream_impl(write_stream, dispatch_queue);
}
/// Drive the run loop in a global singleton thread until the global run loop is
/// shutdown.
static void GlobalRunLoopFunc(void* arg) {
grpc_core::ReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent();
gGlobalRunLoopContext->init_cv.Signal();
while (!gGlobalRunLoopContext->is_shutdown) {
// CFRunLoopRun() will return immediately if no stream is registered on it.
// So we wait on a conditional variable until a stream is registered;
// otherwise we'll be running a spinning loop.
while (!gGlobalRunLoopContext->input_source_registered) {
gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu);
}
gGlobalRunLoopContext->input_source_registered = false;
lock.Unlock();
CFRunLoopRun();
lock.Lock();
}
lock.Unlock();
}
// pollset implementation
static void pollset_global_init(void) {
gGlobalRunLoopContext = new GlobalRunLoopContext;
grpc_apple_register_read_stream_impl =
grpc_apple_register_read_stream_run_loop;
grpc_apple_register_write_stream_impl =
grpc_apple_register_write_stream_run_loop;
grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
gGlobalRunLoopThread =
new grpc_core::Thread("apple_ev", GlobalRunLoopFunc, nullptr);
gGlobalRunLoopThread->Start();
while (gGlobalRunLoopContext->run_loop == NULL)
gGlobalRunLoopContext->init_cv.Wait(&gGlobalRunLoopContext->mu);
}
static void pollset_global_shutdown(void) {
{
grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
gGlobalRunLoopContext->is_shutdown = true;
CFRunLoopStop(gGlobalRunLoopContext->run_loop);
}
gGlobalRunLoopThread->Join();
delete gGlobalRunLoopThread;
delete gGlobalRunLoopContext;
}
/// The caller must acquire the lock GrpcApplePollset.mu before calling this
/// function. The lock may be temporarily released when waiting on the condition
/// variable but will be re-acquired before the function returns.
///
/// The Apple pollset simply waits on a condition variable until it is kicked.
/// The network events are handled in the global run loop thread. Processing of
/// these events will eventually trigger the kick.
static grpc_error* pollset_work(grpc_pollset* pollset,
grpc_pollset_worker** worker,
grpc_millis deadline) {
GRPC_POLLING_TRACE("pollset work: %p, worker: %p, deadline: %" PRIu64,
pollset, worker, deadline);
GrpcApplePollset* apple_pollset =
reinterpret_cast<GrpcApplePollset*>(pollset);
GrpcAppleWorker actual_worker;
if (worker) {
*worker = reinterpret_cast<grpc_pollset_worker*>(&actual_worker);
}
if (apple_pollset->kicked_without_poller) {
// Process the outstanding kick and reset the flag. Do not block.
apple_pollset->kicked_without_poller = false;
} else {
// Block until kicked, timed out, or the pollset shuts down.
apple_pollset->workers.push_front(&actual_worker);
auto it = apple_pollset->workers.begin();
while (!actual_worker.kicked && !apple_pollset->is_shutdown) {
if (actual_worker.cv.Wait(
&apple_pollset->mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
// timed out
break;
}
}
apple_pollset->workers.erase(it);
// If the pollset is shut down asynchronously and this is the last pending
// worker, the shutdown process is complete at this moment and the shutdown
// callback will be called.
if (apple_pollset->is_shutdown && apple_pollset->workers.empty()) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, apple_pollset->shutdown_closure,
GRPC_ERROR_NONE);
}
}
return GRPC_ERROR_NONE;
}
/// Kick a specific worker. The caller must acquire the lock GrpcApplePollset.mu
/// before calling this function.
static void kick_worker(GrpcAppleWorker* worker) {
worker->kicked = true;
worker->cv.Signal();
}
/// The caller must acquire the lock GrpcApplePollset.mu before calling this
/// function. The kick action simply signals the condition variable of the
/// worker.
static grpc_error* pollset_kick(grpc_pollset* pollset,
grpc_pollset_worker* specific_worker) {
GrpcApplePollset* apple_pollset =
reinterpret_cast<GrpcApplePollset*>(pollset);
GRPC_POLLING_TRACE("pollset kick: %p, worker:%p", pollset, specific_worker);
if (specific_worker == nullptr) {
if (apple_pollset->workers.empty()) {
apple_pollset->kicked_without_poller = true;
} else {
GrpcAppleWorker* actual_worker = apple_pollset->workers.front();
kick_worker(actual_worker);
}
} else if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (auto& actual_worker : apple_pollset->workers) {
kick_worker(actual_worker);
}
} else {
GrpcAppleWorker* actual_worker =
reinterpret_cast<GrpcAppleWorker*>(specific_worker);
kick_worker(actual_worker);
}
return GRPC_ERROR_NONE;
}
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
GRPC_POLLING_TRACE("pollset init: %p", pollset);
GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset();
*mu = apple_pollset->mu.get();
}
/// The caller must acquire the lock GrpcApplePollset.mu before calling this
/// function.
static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
GRPC_POLLING_TRACE("pollset shutdown: %p", pollset);
GrpcApplePollset* apple_pollset =
reinterpret_cast<GrpcApplePollset*>(pollset);
apple_pollset->is_shutdown = true;
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
// If there is any worker blocked, shutdown will be done asynchronously.
if (apple_pollset->workers.empty()) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
} else {
apple_pollset->shutdown_closure = closure;
}
}
static void pollset_destroy(grpc_pollset* pollset) {
GRPC_POLLING_TRACE("pollset destroy: %p", pollset);
GrpcApplePollset* apple_pollset =
reinterpret_cast<GrpcApplePollset*>(pollset);
apple_pollset->~GrpcApplePollset();
}
size_t pollset_size(void) { return sizeof(GrpcApplePollset); }
grpc_pollset_vtable grpc_apple_pollset_vtable = {
pollset_global_init, pollset_global_shutdown,
pollset_init, pollset_shutdown,
pollset_destroy, pollset_work,
pollset_kick, pollset_size};
// pollset_set implementation
grpc_pollset_set* pollset_set_create(void) { return nullptr; }
void pollset_set_destroy(grpc_pollset_set* pollset_set) {}
void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
grpc_pollset* pollset) {}
void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
grpc_pollset* pollset) {}
void pollset_set_add_pollset_set(grpc_pollset_set* bag,
grpc_pollset_set* item) {}
void pollset_set_del_pollset_set(grpc_pollset_set* bag,
grpc_pollset_set* item) {}
grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = {
pollset_set_create, pollset_set_destroy,
pollset_set_add_pollset, pollset_set_del_pollset,
pollset_set_add_pollset_set, pollset_set_del_pollset_set};
#endif

@ -0,0 +1,43 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_EV_APPLE_H
#define GRPC_CORE_LIB_IOMGR_EV_APPLE_H
#include <grpc/support/port_platform.h>
#ifdef GRPC_APPLE_EV
#include <CoreFoundation/CoreFoundation.h>
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
void grpc_apple_register_read_stream(CFReadStreamRef read_stream,
dispatch_queue_t dispatch_queue);
void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
dispatch_queue_t dispatch_queue);
extern grpc_pollset_vtable grpc_apple_pollset_vtable;
extern grpc_pollset_set_vtable grpc_apple_pollset_set_vtable;
#endif
#endif

@ -16,6 +16,20 @@
*
*/
/// CFStream is build-enabled on iOS by default and disabled by default on other
/// platforms (see port_platform.h). To enable CFStream build on another
/// platform, the users need to define macro "GRPC_CFSTREAM=1" when building
/// gRPC.
///
/// When CFStream is to be built (either by default on iOS or by macro on other
/// platforms), the users can disable CFStream with environment variable
/// "grpc_cfstream=0". This will let gRPC to fallback to use POSIX sockets. In
/// addition, the users may choose to use an alternative CFRunLoop based pollset
/// "ev_apple" by setting environment variable "grpc_cfstream_run_loop=1". This
/// pollset resolves a bug from Apple when CFStream streams dispatch events to
/// dispatch queues. The caveat of this pollset is that users may not be able to
/// run a gRPC server in the same process.
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
@ -23,6 +37,7 @@
#ifdef GRPC_CFSTREAM_IOMGR
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_apple.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/iomgr_posix.h"
@ -33,6 +48,7 @@
#include "src/core/lib/iomgr/timer.h"
static const char* grpc_cfstream_env_var = "grpc_cfstream";
static const char* grpc_cfstream_run_loop_env_var = "GRPC_CFSTREAM_RUN_LOOP";
extern grpc_tcp_server_vtable grpc_posix_tcp_server_vtable;
extern grpc_tcp_client_vtable grpc_posix_tcp_client_vtable;
@ -42,6 +58,33 @@ extern grpc_pollset_vtable grpc_posix_pollset_vtable;
extern grpc_pollset_set_vtable grpc_posix_pollset_set_vtable;
extern grpc_address_resolver_vtable grpc_posix_resolver_vtable;
static void apple_iomgr_platform_init(void) { grpc_pollset_global_init(); }
static void apple_iomgr_platform_flush(void) {}
static void apple_iomgr_platform_shutdown(void) {
grpc_pollset_global_shutdown();
}
static void apple_iomgr_platform_shutdown_background_closure(void) {}
static bool apple_iomgr_platform_is_any_background_poller_thread(void) {
return false;
}
static bool apple_iomgr_platform_add_closure_to_background_poller(
grpc_closure* closure, grpc_error* error) {
return false;
}
static grpc_iomgr_platform_vtable apple_vtable = {
apple_iomgr_platform_init,
apple_iomgr_platform_flush,
apple_iomgr_platform_shutdown,
apple_iomgr_platform_shutdown_background_closure,
apple_iomgr_platform_is_any_background_poller_thread,
apple_iomgr_platform_add_closure_to_background_poller};
static void iomgr_platform_init(void) {
grpc_wakeup_fd_global_init();
grpc_event_engine_init();
@ -76,32 +119,53 @@ static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_add_closure_to_background_poller};
void grpc_set_default_iomgr_platform() {
char* enable_cfstream = getenv(grpc_cfstream_env_var);
grpc_tcp_client_vtable* client_vtable = &grpc_posix_tcp_client_vtable;
// CFStream is enabled by default on iOS, and disabled by default on other
// platforms. Defaults can be overriden by setting the grpc_cfstream
// environment variable.
#if TARGET_OS_IPHONE
if (enable_cfstream == nullptr || enable_cfstream[0] == '1') {
client_vtable = &grpc_cfstream_client_vtable;
}
#else
if (enable_cfstream != nullptr && enable_cfstream[0] == '1') {
client_vtable = &grpc_cfstream_client_vtable;
}
#endif
grpc_set_tcp_client_impl(client_vtable);
char* enable_cfstream_str = getenv(grpc_cfstream_env_var);
bool enable_cfstream =
enable_cfstream_str == nullptr || enable_cfstream_str[0] != '0';
char* enable_cfstream_run_loop_str = getenv(grpc_cfstream_run_loop_env_var);
// CFStream run-loop is disabled by default. The user has to enable it
// explicitly with environment variable.
bool enable_cfstream_run_loop = enable_cfstream_run_loop_str != nullptr &&
enable_cfstream_run_loop_str[0] == '1';
if (!enable_cfstream) {
// Use POSIX sockets for both client and server
grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable);
grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable);
grpc_set_pollset_vtable(&grpc_posix_pollset_vtable);
grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable);
grpc_set_iomgr_platform_vtable(&vtable);
} else if (enable_cfstream && !enable_cfstream_run_loop) {
// Use CFStream with dispatch queue for client; use POSIX sockets for server
grpc_set_tcp_client_impl(&grpc_cfstream_client_vtable);
grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable);
grpc_set_timer_impl(&grpc_generic_timer_vtable);
grpc_set_pollset_vtable(&grpc_posix_pollset_vtable);
grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable);
grpc_set_resolver_impl(&grpc_posix_resolver_vtable);
grpc_set_iomgr_platform_vtable(&vtable);
} else {
// Use CFStream with CFRunLoop for client; server not supported
grpc_set_tcp_client_impl(&grpc_cfstream_client_vtable);
grpc_set_pollset_vtable(&grpc_apple_pollset_vtable);
grpc_set_pollset_set_vtable(&grpc_apple_pollset_set_vtable);
grpc_set_iomgr_platform_vtable(&apple_vtable);
}
grpc_set_timer_impl(&grpc_generic_timer_vtable);
grpc_set_resolver_impl(&grpc_posix_resolver_vtable);
}
bool grpc_iomgr_run_in_background() {
char* enable_cfstream_str = getenv(grpc_cfstream_env_var);
bool enable_cfstream =
enable_cfstream_str == nullptr || enable_cfstream_str[0] != '0';
char* enable_cfstream_run_loop_str = getenv(grpc_cfstream_run_loop_env_var);
// CFStream run-loop is disabled by default. The user has to enable it
// explicitly with environment variable.
bool enable_cfstream_run_loop = enable_cfstream_run_loop_str != nullptr &&
enable_cfstream_run_loop_str[0] == '1';
if (enable_cfstream && enable_cfstream_run_loop) {
return false;
} else {
return grpc_event_engine_run_in_background();
}
}
#endif /* GRPC_CFSTREAM_IOMGR */

@ -22,22 +22,22 @@
#include "src/core/lib/iomgr/pollset_set.h"
grpc_pollset_set* pollset_set_create(void) {
static grpc_pollset_set* pollset_set_create(void) {
return (grpc_pollset_set*)((intptr_t)0xdeafbeef);
}
void pollset_set_destroy(grpc_pollset_set* /*pollset_set*/) {}
static void pollset_set_destroy(grpc_pollset_set* /*pollset_set*/) {}
void pollset_set_add_pollset(grpc_pollset_set* /*pollset_set*/,
static void pollset_set_add_pollset(grpc_pollset_set* /*pollset_set*/,
grpc_pollset* /*pollset*/) {}
void pollset_set_del_pollset(grpc_pollset_set* /*pollset_set*/,
static void pollset_set_del_pollset(grpc_pollset_set* /*pollset_set*/,
grpc_pollset* /*pollset*/) {}
void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
static void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
grpc_pollset_set* /*item*/) {}
void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
static void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
grpc_pollset_set* /*item*/) {}
static grpc_pollset_set_vtable vtable = {

@ -129,6 +129,7 @@
#define GRPC_CFSTREAM_IOMGR 1
#define GRPC_CFSTREAM_CLIENT 1
#define GRPC_CFSTREAM_ENDPOINT 1
#define GRPC_APPLE_EV 1
#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1
#define GRPC_POSIX_SOCKET_EV 1
#define GRPC_POSIX_SOCKET_EV_EPOLL1 1

@ -66,8 +66,13 @@
ReferencedContainer = "container:Tests.xcodeproj">
</BuildableReference>
</MacroExpansion>
<AdditionalOptions>
</AdditionalOptions>
<EnvironmentVariables>
<EnvironmentVariable
key = "GRPC_CFSTREAM_RUN_LOOP"
value = "1"
isEnabled = "YES">
</EnvironmentVariable>
</EnvironmentVariables>
</LaunchAction>
<ProfileAction
buildConfiguration = "Test"

@ -69,8 +69,13 @@
ReferencedContainer = "container:Tests.xcodeproj">
</BuildableReference>
</MacroExpansion>
<AdditionalOptions>
</AdditionalOptions>
<EnvironmentVariables>
<EnvironmentVariable
key = "GRPC_CFSTREAM_RUN_LOOP"
value = "1"
isEnabled = "YES">
</EnvironmentVariable>
</EnvironmentVariables>
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"

@ -96,8 +96,13 @@
ReferencedContainer = "container:Tests.xcodeproj">
</BuildableReference>
</MacroExpansion>
<AdditionalOptions>
</AdditionalOptions>
<EnvironmentVariables>
<EnvironmentVariable
key = "GRPC_CFSTREAM_RUN_LOOP"
value = "1"
isEnabled = "YES">
</EnvironmentVariable>
</EnvironmentVariables>
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"

@ -66,8 +66,13 @@
ReferencedContainer = "container:Tests.xcodeproj">
</BuildableReference>
</MacroExpansion>
<AdditionalOptions>
</AdditionalOptions>
<EnvironmentVariables>
<EnvironmentVariable
key = "GRPC_CFSTREAM_RUN_LOOP"
value = "1"
isEnabled = "YES">
</EnvironmentVariable>
</EnvironmentVariables>
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"

@ -61,8 +61,13 @@
ReferencedContainer = "container:Tests.xcodeproj">
</BuildableReference>
</MacroExpansion>
<AdditionalOptions>
</AdditionalOptions>
<EnvironmentVariables>
<EnvironmentVariable
key = "GRPC_CFSTREAM_RUN_LOOP"
value = "1"
isEnabled = "YES">
</EnvironmentVariable>
</EnvironmentVariables>
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"

@ -264,6 +264,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/endpoint_pair_windows.cc',
'src/core/lib/iomgr/error.cc',
'src/core/lib/iomgr/error_cfstream.cc',
'src/core/lib/iomgr/ev_apple.cc',
'src/core/lib/iomgr/ev_epoll1_linux.cc',
'src/core/lib/iomgr/ev_epollex_linux.cc',
'src/core/lib/iomgr/ev_poll_posix.cc',

@ -72,7 +72,7 @@ post_install do |installer|
# GPR_UNREACHABLE_CODE causes "Control may reach end of non-void
# function" warning
config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO'
config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1 GRPC_CFSTREAM=1'
config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1'
end
end

@ -1539,6 +1539,8 @@ src/core/lib/iomgr/error.h \
src/core/lib/iomgr/error_cfstream.cc \
src/core/lib/iomgr/error_cfstream.h \
src/core/lib/iomgr/error_internal.h \
src/core/lib/iomgr/ev_apple.cc \
src/core/lib/iomgr/ev_apple.h \
src/core/lib/iomgr/ev_epoll1_linux.cc \
src/core/lib/iomgr/ev_epoll1_linux.h \
src/core/lib/iomgr/ev_epollex_linux.cc \

@ -1351,6 +1351,8 @@ src/core/lib/iomgr/error.h \
src/core/lib/iomgr/error_cfstream.cc \
src/core/lib/iomgr/error_cfstream.h \
src/core/lib/iomgr/error_internal.h \
src/core/lib/iomgr/ev_apple.cc \
src/core/lib/iomgr/ev_apple.h \
src/core/lib/iomgr/ev_epoll1_linux.cc \
src/core/lib/iomgr/ev_epoll1_linux.h \
src/core/lib/iomgr/ev_epollex_linux.cc \

Loading…
Cancel
Save