Merge remote-tracking branch 'upstream/master' into service_config

pull/8429/head
Mark D. Roth 8 years ago
commit 67c1c6be01
  1. 121
      doc/epoll-polling-engine.md
  2. BIN
      doc/images/new_epoll_impl.png
  3. BIN
      doc/images/old_epoll_impl.png
  4. 2
      src/core/lib/channel/handshaker.c
  5. 9
      src/core/lib/iomgr/ev_poll_posix.c
  6. 33
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  7. 2
      src/csharp/Grpc.Core/Server.cs
  8. 12
      src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj
  9. 2
      src/objective-c/examples/Sample/Sample.xcodeproj/xcshareddata/xcschemes/Sample.xcscheme
  10. 18
      src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj
  11. 2
      src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/xcshareddata/xcschemes/SwiftSample.xcscheme
  12. 25
      src/objective-c/examples/SwiftSample/ViewController.swift
  13. 2
      src/objective-c/tests/Info.plist
  14. 16
      src/objective-c/tests/Tests.xcodeproj/project.pbxproj
  15. 2
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/AllTests.xcscheme
  16. 2
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/CoreCronetEnd2EndTests.xcscheme
  17. 2
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/InteropTestsLocalCleartext.xcscheme
  18. 2
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/InteropTestsLocalSSL.xcscheme
  19. 2
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/InteropTestsRemote.xcscheme
  20. 2
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/InteropTestsRemoteWithCronet.xcscheme
  21. 2
      src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/RxLibraryUnitTests.xcscheme
  22. 13
      src/php/tests/interop/interop_client.php
  23. 12
      src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
  24. 4
      src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py
  25. 2
      test/core/end2end/gen_build_yaml.py
  26. 29
      test/cpp/interop/interop_client.cc
  27. 5
      test/cpp/interop/interop_server.cc
  28. 13
      tools/run_tests/run_interop_tests.py
  29. 156
      tools/run_tests/tests.json

@ -0,0 +1,121 @@
# `epoll`-based pollset implementation in gRPC
Sree Kuchibhotla (sreek@) [May - 2016]
(Design input from Craig Tiller and David Klempner)
> Status: As of June 2016, this change is implemented and merged.
> * The bulk of the functionality is in: [ev_poll_linux.c](https://github.com/grpc/grpc/blob/master/src/core/lib/iomgr/ev_epoll_linux.c)
> * Pull request: https://github.com/grpc/grpc/pull/6803
## 1. Introduction
The document talks about the proposed changes to `epoll`-based implementation of pollsets in gRPC. Section-2 gives an overview of the current implementation, Section-3 talks about the problems in the current implementation and finally Section-4 talks about the proposed changes.
## 2. Current `epoll`-based implementation in gRPC
![image](images/old_epoll_impl.png)
**Figure 1: Current implementation**
A gRPC client or a server can have more than one completion queue. Each completion queue creates a pollset.
The gRPC core library does not create any threads[^1] on its own and relies on the application using the gRPC core library to provide the threads. A thread starts to poll for events by calling the gRPC core surface APIs `grpc_completion_queue_next()` or `grpc_completion_queue_pluck()`. More than one thread can call `grpc_completion_queue_next()`on the same completion queue[^2].
A file descriptor can be in more than one completion queue. There are examples in the next section that show how this can happen.
When an event of interest happens in a pollset, multiple threads are woken up and there are no guarantees on which thread actually ends up performing the work i.e executing the callbacks associated with that event. The thread that performs the work finally queues a completion event `grpc_cq_completion` on the appropriate completion queue and "kicks" (i.e wakes ups) the thread that is actually interested in that event (which can be itself - in which case there is no thread hop)
For example, in **Figure 1**, if `fd1` becomes readable, any one of the threads i.e *Threads 1* to *Threads K* or *Thread P*, might be woken up. Let's say *Thread P* was calling a `grpc_completion_queue_pluck()` and was actually interested in the event on `fd1` but *Thread 1* woke up. In this case, *Thread 1* executes the callbacks and finally kicks *Thread P* by signalling `event_fd_P`. *Thread P* wakes up, realizes that there is a new completion event for it and returns from `grpc_completion_queue_pluck()` to its caller.
## 3. Issues in the current architecture
### _Thundering Herds_
If multiple threads concurrently call `epoll_wait()`, we are guaranteed that only one thread is woken up if one of the `fds` in the set becomes readable/writable. However, in our current implementation, the threads do not directly call a blocking `epoll_wait()`[^3]. Instead, they call `poll()` on the set containing `[event_fd`[^4]`, epoll_fd]`. **(see Figure 1)**
Considering the fact that an `fd` can be in multiple `pollsets` and that each `pollset` might have multiple poller threads, it means that whenever an `fd` becomes readable/writable, all the threads in all the `pollsets` (in which that `fd` is present) are woken up.
The performance impact of this would be more conspicuous on the server side. Here are a two examples of thundering herds on the server side.
Example 1: Listening fds on server
* A gRPC server can have multiple server completion queues (i.e completion queues which are used to listen for incoming channels).
* A gRPC server can also listen on more than one TCP-port.
* A listening socket is created for each port the gRPC server would be listening on.
* Every listening socket's fd is added to all the server completion queues' pollsets. (Currently we do not do any sharding of the listening fds across these pollsets).
This means that for every incoming new channel, all the threads waiting on all the pollsets are woken up.
Example 2: New Incoming-channel fds on server
* Currently, every new incoming channel's `fd` (i.e the socket `fd` that is returned by doing an `accept()` on the new incoming channel) is added to all the server completion queues' pollsets [^5]).
* Clearly, this would also cause all thundering herd problem for every read onthat fd
There are other scenarios especially on the client side where an fd can end up being on multiple pollsets which would cause thundering herds on the clients.
## 4. Proposed changes to the current `epoll`-based polling implementation:
The main idea in this proposal is to group 'related' `fds` into a single epoll-based set. This would ensure that only one thread wakes up in case of an event on one of the `fds` in the epoll set.
To accomplish this, we introduce a new abstraction called `polling_island` which will have an epoll set underneath (See **Figure 2** below). A `polling_island` contains the following:
* `epoll_fd`: The file descriptor of the underlying epoll set
* `fd_set`: The set of 'fds' in the pollset island i.e in the epoll set (The pollset island merging operation described later requires the list of fds in the pollset island and currently there is no API available to enumerate all the fds in an epoll set)
* `event_fd`: A level triggered _event fd_ that is used to wake up all the threads waiting on this epoll set (Note: This `event_fd` is added to the underlying epoll set during pollset island creation. This is useful in the pollset island merging operation described later)
* `merged_to`: The polling island into which this one merged. See section 4.2 (case 2) for more details on this. Also note that if `merged_to` is set, all the other fields in this polling island are not used anymore
In this new model, only one thread wakes up whenever an event of interest happens in an epoll set.
![drawing](images/new_epoll_impl.png)
**Figure 2: Proposed changes**
### 4.1 Relation between `fd`, `pollset` and `polling_island:`
* An `fd` may belong to multiple `pollsets` but belongs to exactly one `polling_island`
* A `pollset` belongs to exactly one `polling_island`
* An `fd` and the `pollset(s`) it belongs to, have same `polling_island`
### 4.2 Algorithm to add an `fd` to a `pollset`
There are two cases to check here:
* **Case 1:** Both `fd` and `pollset` already belong to the same `polling_island`
* This is straightforward and nothing really needs to be done here
* **Case 2:** The `fd `and `pollset` point to different `polling_islands`: In this case we _merge_ both the polling islands i.e:
* Add all the `fds` from the smaller `polling_island `to the larger `polling_island` and update the `merged_to` pointer on the smaller island to point to the larger island.
* Wake up all the threads waiting on the smaller `polling_island`'s `epoll_fd` (by signalling the `event_fd` on that island) and make them now wait on the larger `polling_island`'s `epoll_fd`
* Update `fd` and `pollset` to now point to the larger `polling_island`
### 4.3 Directed wakeups:
The new implementation, just like the current implementation, does not provide us any guarantees that the thread that is woken up is the thread that is actually interested in the event. So the thread that woke up executes the callbacks and finally has to 'kick' the appropriate polling thread interested in the event.
In the current implementation, every polling thread also had a `event_fd` on which it was listening to and hence waking it up was as simple as signalling that `event_fd`. However, using an `event_fd` also meant that every thread has to use a `poll()` (on `event_fd` and `epoll_fd`) instead of doing an `epoll_wait()` and this resulted in the thundering herd problems described above.
The proposal here is to use signals and kicking a thread would just be sending a signal to that thread. Unfortunately there are only a few signals available on posix systems and most of them have pre-determined behavior leaving only a few signals `SIGUSR1`, `SIGUSR2` and `SIGRTx (SIGRTMIN to SIGRTMAX)` for custom use.
The calling application might have registered other signal handlers for these signals. `We will provide a new API where the applications can "give a signal number" to gRPC library to use for this purpose.
```
void grpc_use_signal(int signal_num)
```
If the calling application does not provide a signal number, then the gRPC library will relegate to using a model similar to the current implementation (where every thread does a blocking `poll()` on its `wakeup_fd` and the `epoll_fd`). The function` psi_wait() `in figure 2 implements this logic.
**>> **(**NOTE**: Or alternatively, we can implement a turnstile polling (i.e having only one thread calling `epoll_wait()` on the epoll set at any time - which all other threads call poll on their `wakeup_fds`)
in case of not getting a signal number from the applications.
## Notes
[^1]: Only exception is in case of name-resolution
[^2]: However, a `grpc_completion_queue_next()` and `grpc_completion_queue_pluck()` must not be called in parallel on the same completion queue
[^3]: The threads first do a blocking` poll()` with `[wakeup_fd, epoll_fd]`. If the `poll()` returns due to an event of interest in the epoll set, they then call a non-blocking i.e a zero-timeout `epoll_wait()` on the `epoll_fd`
[^4]: `event_fd` is the linux platform specific implementation of `grpc_wakeup_fd`. A `wakeup_fd` is used to wake up polling threads typically when the event for which the polling thread is waiting is already completed by some other thread. It is also used to wake up the polling threads in case of shutdowns or to re-evaluate the poller's interest in the fds to poll (the last scenario is only in case of `poll`-based (not `epoll`-based) implementation of `pollsets`).
[^5]: See more details about the issue here https://github.com/grpc/grpc/issues/5470 and for a proposed fix here: https://github.com/grpc/grpc/pull/6149

Binary file not shown.

After

Width:  |  Height:  |  Size: 52 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

@ -183,7 +183,7 @@ void grpc_handshake_manager_do_handshake(
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data) {
grpc_channel_args* args_copy = grpc_channel_args_copy(args);
gpr_slice_buffer* read_buffer = malloc(sizeof(*read_buffer));
gpr_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer));
gpr_slice_buffer_init(read_buffer);
if (mgr->count == 0) {
// No handshakers registered, so we just immediately call the done

@ -985,8 +985,15 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (errno != EINTR) {
work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
}
for (i = 2; i < pfd_count; i++) {
fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
if (watchers[i].fd == NULL) {
fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
} else {
// Wake up all the file descriptors, if we have an invalid one
// we can identify it on the next pollset_work()
fd_end_poll(exec_ctx, &watchers[i], 1, 1, pollset);
}
}
} else if (r == 0) {
for (i = 2; i < pfd_count; i++) {

@ -277,20 +277,31 @@ namespace Grpc.Core.Internal
}
}
internal class NoSuchMethodCallHandler : IServerCallHandler
internal class UnimplementedMethodCallHandler : IServerCallHandler
{
public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
public static readonly UnimplementedMethodCallHandler Instance = new UnimplementedMethodCallHandler();
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
DuplexStreamingServerCallHandler<byte[], byte[]> callHandlerImpl;
public UnimplementedMethodCallHandler()
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload, newRpc.Server);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
var marshaller = new Marshaller<byte[]>((payload) => payload, (payload) => payload);
var method = new Method<byte[], byte[]>(MethodType.DuplexStreaming, "", "", marshaller, marshaller);
this.callHandlerImpl = new DuplexStreamingServerCallHandler<byte[], byte[]>(method, new DuplexStreamingServerMethod<byte[], byte[]>(UnimplementedMethod));
}
/// <summary>
/// Handler used for unimplemented method.
/// </summary>
private Task UnimplementedMethod(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext ctx)
{
ctx.Status = new Status(StatusCode.Unimplemented, "");
return Task.FromResult<object>(null);
}
public Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
return callHandlerImpl.HandleCall(newRpc, cq);
}
}

@ -317,7 +317,7 @@ namespace Grpc.Core
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(newRpc.Method, out callHandler))
{
callHandler = NoSuchMethodCallHandler.Instance;
callHandler = UnimplementedMethodCallHandler.Instance;
}
await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
}

@ -3,7 +3,7 @@
archiveVersion = 1;
classes = {
};
objectVersion = 46;
objectVersion = 48;
objects = {
/* Begin PBXBuildFile section */
@ -129,7 +129,7 @@
6369A2621A9322E20015FC5C /* Project object */ = {
isa = PBXProject;
attributes = {
LastUpgradeCheck = 0730;
LastUpgradeCheck = 0800;
ORGANIZATIONNAME = gRPC;
TargetAttributes = {
6369A2691A9322E20015FC5C = {
@ -138,7 +138,7 @@
};
};
buildConfigurationList = 6369A2651A9322E20015FC5C /* Build configuration list for PBXProject "Sample" */;
compatibilityVersion = "Xcode 3.2";
compatibilityVersion = "Xcode 8.0";
developmentRegion = English;
hasScannedForEncodings = 0;
knownRegions = (
@ -253,8 +253,10 @@
CLANG_WARN_DIRECT_OBJC_ISA_USAGE = YES_ERROR;
CLANG_WARN_EMPTY_BODY = YES;
CLANG_WARN_ENUM_CONVERSION = YES;
CLANG_WARN_INFINITE_RECURSION = YES;
CLANG_WARN_INT_CONVERSION = YES;
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_SUSPICIOUS_MOVE = YES;
CLANG_WARN_UNREACHABLE_CODE = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
@ -263,6 +265,7 @@
ENABLE_TESTABILITY = YES;
GCC_C_LANGUAGE_STANDARD = gnu99;
GCC_DYNAMIC_NO_PIC = NO;
GCC_NO_COMMON_BLOCKS = YES;
GCC_OPTIMIZATION_LEVEL = 0;
GCC_PREPROCESSOR_DEFINITIONS = (
"DEBUG=1",
@ -296,8 +299,10 @@
CLANG_WARN_DIRECT_OBJC_ISA_USAGE = YES_ERROR;
CLANG_WARN_EMPTY_BODY = YES;
CLANG_WARN_ENUM_CONVERSION = YES;
CLANG_WARN_INFINITE_RECURSION = YES;
CLANG_WARN_INT_CONVERSION = YES;
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_SUSPICIOUS_MOVE = YES;
CLANG_WARN_UNREACHABLE_CODE = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
@ -305,6 +310,7 @@
ENABLE_NS_ASSERTIONS = NO;
ENABLE_STRICT_OBJC_MSGSEND = YES;
GCC_C_LANGUAGE_STANDARD = gnu99;
GCC_NO_COMMON_BLOCKS = YES;
GCC_WARN_64_TO_32_BIT_CONVERSION = YES;
GCC_WARN_ABOUT_RETURN_TYPE = YES_ERROR;
GCC_WARN_UNDECLARED_SELECTOR = YES;

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "0730"
LastUpgradeVersion = "0800"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"

@ -3,7 +3,7 @@
archiveVersion = 1;
classes = {
};
objectVersion = 46;
objectVersion = 48;
objects = {
/* Begin PBXBuildFile section */
@ -123,16 +123,17 @@
isa = PBXProject;
attributes = {
LastSwiftUpdateCheck = 0710;
LastUpgradeCheck = 0730;
LastUpgradeCheck = 0800;
ORGANIZATIONNAME = gRPC;
TargetAttributes = {
633BFFC11B950B210007E424 = {
CreatedOnToolsVersion = 6.4;
LastSwiftMigration = 0800;
};
};
};
buildConfigurationList = 633BFFBD1B950B210007E424 /* Build configuration list for PBXProject "SwiftSample" */;
compatibilityVersion = "Xcode 3.2";
compatibilityVersion = "Xcode 8.0";
developmentRegion = English;
hasScannedForEncodings = 0;
knownRegions = (
@ -246,8 +247,10 @@
CLANG_WARN_DIRECT_OBJC_ISA_USAGE = YES_ERROR;
CLANG_WARN_EMPTY_BODY = YES;
CLANG_WARN_ENUM_CONVERSION = YES;
CLANG_WARN_INFINITE_RECURSION = YES;
CLANG_WARN_INT_CONVERSION = YES;
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_SUSPICIOUS_MOVE = YES;
CLANG_WARN_UNREACHABLE_CODE = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
@ -292,8 +295,10 @@
CLANG_WARN_DIRECT_OBJC_ISA_USAGE = YES_ERROR;
CLANG_WARN_EMPTY_BODY = YES;
CLANG_WARN_ENUM_CONVERSION = YES;
CLANG_WARN_INFINITE_RECURSION = YES;
CLANG_WARN_INT_CONVERSION = YES;
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_SUSPICIOUS_MOVE = YES;
CLANG_WARN_UNREACHABLE_CODE = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
@ -312,6 +317,7 @@
IPHONEOS_DEPLOYMENT_TARGET = 8.4;
MTL_ENABLE_DEBUG_INFO = NO;
SDKROOT = iphoneos;
SWIFT_OPTIMIZATION_LEVEL = "-Owholemodule";
TARGETED_DEVICE_FAMILY = "1,2";
VALIDATE_PRODUCT = YES;
};
@ -327,6 +333,7 @@
PRODUCT_BUNDLE_IDENTIFIER = "io.grpc.$(PRODUCT_NAME:rfc1034identifier)";
PRODUCT_NAME = "$(TARGET_NAME)";
SWIFT_OBJC_BRIDGING_HEADER = "";
SWIFT_VERSION = 3.0;
USER_HEADER_SEARCH_PATHS = "";
};
name = Debug;
@ -341,6 +348,7 @@
PRODUCT_BUNDLE_IDENTIFIER = "io.grpc.$(PRODUCT_NAME:rfc1034identifier)";
PRODUCT_NAME = "$(TARGET_NAME)";
SWIFT_OBJC_BRIDGING_HEADER = "";
SWIFT_VERSION = 3.0;
USER_HEADER_SEARCH_PATHS = "";
};
name = Release;
@ -355,7 +363,7 @@
633BFFE01B950B210007E424 /* Release */,
);
defaultConfigurationIsVisible = 0;
defaultConfigurationName = Release;
defaultConfigurationName = Debug;
};
633BFFE11B950B210007E424 /* Build configuration list for PBXNativeTarget "SwiftSample" */ = {
isa = XCConfigurationList;
@ -364,7 +372,7 @@
633BFFE31B950B210007E424 /* Release */,
);
defaultConfigurationIsVisible = 0;
defaultConfigurationName = Release;
defaultConfigurationName = Debug;
};
/* End XCConfigurationList section */
};

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "0730"
LastUpgradeVersion = "0800"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"

@ -51,7 +51,7 @@ class ViewController: UIViewController {
// Example gRPC call using a generated proto client library:
let service = RMTTestService(host: RemoteHost)
service.unaryCallWithRequest(request) { response, error in
service.unaryCall(with: request) { response, error in
if let response = response {
NSLog("1. Finished successfully with response:\n\(response)")
} else {
@ -63,40 +63,39 @@ class ViewController: UIViewController {
// Same but manipulating headers:
var RPC : GRPCProtoCall! // Needed to convince Swift to capture by reference (__block)
RPC = service.RPCToUnaryCallWithRequest(request) { response, error in
RPC = service.rpcToUnaryCall(with: request) { response, error in
if let response = response {
NSLog("2. Finished successfully with response:\n\(response)")
} else {
NSLog("2. Finished with error: \(error!)")
}
NSLog("2. Response headers: \(RPC.responseHeaders)")
NSLog("2. Response trailers: \(RPC.responseTrailers)")
NSLog("2. Response headers: \(RPC.responseHeaders!)")
NSLog("2. Response trailers: \(RPC.responseTrailers!)")
}
// TODO(jcanizales): Revert to using subscript syntax once XCode 8 is released.
RPC.requestHeaders.setObject("My value", forKey: "My-Header")
RPC.requestHeaders["My-Header"] = "My value";
RPC.start()
// Same example call using the generic gRPC client library:
let method = GRPCProtoMethod(package: "grpc.testing", service: "TestService", method: "UnaryCall")
let method = GRPCProtoMethod(package: "grpc.testing", service: "TestService", method: "UnaryCall")!
let requestsWriter = GRXWriter(value: request.data())
let call = GRPCCall(host: RemoteHost, path: method.HTTPPath, requestsWriter: requestsWriter)
let call = GRPCCall(host: RemoteHost, path: method.httpPath, requestsWriter: requestsWriter)!
call.requestHeaders.setObject("My value", forKey: "My-Header")
call.requestHeaders["My-Header"] = "My value";
call.startWithWriteable(GRXWriteable { response, error in
if let response = response as? NSData {
call.start(with: GRXWriteable { response, error in
if let response = response as? Data {
NSLog("3. Received response:\n\(try! RMTSimpleResponse(data: response))")
} else {
NSLog("3. Finished with error: \(error!)")
}
NSLog("3. Response headers: \(call.responseHeaders)")
NSLog("3. Response trailers: \(call.responseTrailers)")
NSLog("3. Response headers: \(call.responseHeaders!)")
NSLog("3. Response trailers: \(call.responseTrailers!)")
})
}
}

@ -7,7 +7,7 @@
<key>CFBundleExecutable</key>
<string>$(EXECUTABLE_NAME)</string>
<key>CFBundleIdentifier</key>
<string>gRPC.$(PRODUCT_NAME:rfc1034identifier)</string>
<string>$(PRODUCT_BUNDLE_IDENTIFIER)</string>
<key>CFBundleInfoDictionaryVersion</key>
<string>6.0</string>
<key>CFBundleName</key>

@ -3,7 +3,7 @@
archiveVersion = 1;
classes = {
};
objectVersion = 46;
objectVersion = 48;
objects = {
/* Begin PBXBuildFile section */
@ -535,7 +535,7 @@
635697BF1B14FC11007A7283 /* Project object */ = {
isa = PBXProject;
attributes = {
LastUpgradeCheck = 0630;
LastUpgradeCheck = 0800;
ORGANIZATIONNAME = gRPC;
TargetAttributes = {
5E8A5DA31D3840B4000F8BC4 = {
@ -565,7 +565,7 @@
};
};
buildConfigurationList = 635697C21B14FC11007A7283 /* Build configuration list for PBXProject "Tests" */;
compatibilityVersion = "Xcode 3.2";
compatibilityVersion = "Xcode 8.0";
developmentRegion = English;
hasScannedForEncodings = 0;
knownRegions = (
@ -1151,8 +1151,10 @@
CLANG_WARN_DIRECT_OBJC_ISA_USAGE = YES_ERROR;
CLANG_WARN_EMPTY_BODY = YES;
CLANG_WARN_ENUM_CONVERSION = YES;
CLANG_WARN_INFINITE_RECURSION = YES;
CLANG_WARN_INT_CONVERSION = YES;
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_SUSPICIOUS_MOVE = YES;
CLANG_WARN_UNREACHABLE_CODE = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
COPY_PHASE_STRIP = NO;
@ -1205,6 +1207,7 @@
);
INFOPLIST_FILE = Info.plist;
LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks @loader_path/Frameworks";
PRODUCT_BUNDLE_IDENTIFIER = "gRPC.$(PRODUCT_NAME:rfc1034identifier)";
PRODUCT_NAME = "$(TARGET_NAME)";
};
name = Cronet;
@ -1362,6 +1365,7 @@
);
INFOPLIST_FILE = Info.plist;
LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks @loader_path/Frameworks";
PRODUCT_BUNDLE_IDENTIFIER = "gRPC.$(PRODUCT_NAME:rfc1034identifier)";
PRODUCT_NAME = "$(TARGET_NAME)";
};
name = Debug;
@ -1376,6 +1380,7 @@
);
INFOPLIST_FILE = Info.plist;
LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks @loader_path/Frameworks";
PRODUCT_BUNDLE_IDENTIFIER = "gRPC.$(PRODUCT_NAME:rfc1034identifier)";
PRODUCT_NAME = "$(TARGET_NAME)";
};
name = Release;
@ -1393,13 +1398,16 @@
CLANG_WARN_DIRECT_OBJC_ISA_USAGE = YES_ERROR;
CLANG_WARN_EMPTY_BODY = YES;
CLANG_WARN_ENUM_CONVERSION = YES;
CLANG_WARN_INFINITE_RECURSION = YES;
CLANG_WARN_INT_CONVERSION = YES;
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_SUSPICIOUS_MOVE = YES;
CLANG_WARN_UNREACHABLE_CODE = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
COPY_PHASE_STRIP = NO;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
ENABLE_STRICT_OBJC_MSGSEND = YES;
ENABLE_TESTABILITY = YES;
GCC_C_LANGUAGE_STANDARD = gnu99;
GCC_DYNAMIC_NO_PIC = NO;
GCC_NO_COMMON_BLOCKS = YES;
@ -1436,8 +1444,10 @@
CLANG_WARN_DIRECT_OBJC_ISA_USAGE = YES_ERROR;
CLANG_WARN_EMPTY_BODY = YES;
CLANG_WARN_ENUM_CONVERSION = YES;
CLANG_WARN_INFINITE_RECURSION = YES;
CLANG_WARN_INT_CONVERSION = YES;
CLANG_WARN_OBJC_ROOT_CLASS = YES_ERROR;
CLANG_WARN_SUSPICIOUS_MOVE = YES;
CLANG_WARN_UNREACHABLE_CODE = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
COPY_PHASE_STRIP = NO;

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "0630"
LastUpgradeVersion = "0800"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "0730"
LastUpgradeVersion = "0800"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "0700"
LastUpgradeVersion = "0800"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "0700"
LastUpgradeVersion = "0800"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "0700"
LastUpgradeVersion = "0800"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "0730"
LastUpgradeVersion = "0800"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "0700"
LastUpgradeVersion = "0800"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"

@ -477,9 +477,11 @@ function statusCodeAndMessage($stub)
list($result, $status) = $call->wait();
hardAssert($status->code === 2,
'Received unexpected status code');
'Received unexpected UnaryCall status code: ' .
$status->code);
hardAssert($status->details === 'test status message',
'Received unexpected status details');
'Received unexpected UnaryCall status details: ' .
$status->details);
$streaming_call = $stub->FullDuplexCall();
@ -487,12 +489,15 @@ function statusCodeAndMessage($stub)
$streaming_request->setResponseStatus($echo_status);
$streaming_call->write($streaming_request);
$streaming_call->writesDone();
$result = $streaming_call->read();
$status = $streaming_call->getStatus();
hardAssert($status->code === 2,
'Received unexpected status code');
'Received unexpected FullDuplexCall status code: ' .
$status->code);
hardAssert($status->details === 'test status message',
'Received unexpected status details');
'Received unexpected FullDuplexCall status details: ' .
$status->details);
}
function unimplementedMethod($stub)

@ -34,8 +34,6 @@ import time
import unittest
import grpc
from grpc import _channel
from grpc import _server
from tests.unit.framework.common import test_constants
from tests.unit import _thread_pool
@ -78,7 +76,7 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_lonely_channel_connectivity(self):
callback = _Callback()
channel = _channel.Channel('localhost:12345', (), None)
channel = grpc.insecure_channel('localhost:12345')
channel.subscribe(callback.update, try_to_connect=False)
first_connectivities = callback.block_until_connectivities_satisfy(bool)
channel.subscribe(callback.update, try_to_connect=True)
@ -105,13 +103,13 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_immediately_connectable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
server = _server.Server(thread_pool, (), ())
server = grpc.server(thread_pool)
port = server.add_insecure_port('[::]:0')
server.start()
first_callback = _Callback()
second_callback = _Callback()
channel = _channel.Channel('localhost:{}'.format(port), (), None)
channel = grpc.insecure_channel('localhost:{}'.format(port))
channel.subscribe(first_callback.update, try_to_connect=False)
first_connectivities = first_callback.block_until_connectivities_satisfy(
bool)
@ -146,12 +144,12 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_reachable_then_unreachable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
server = _server.Server(thread_pool, (), ())
server = grpc.server(thread_pool)
port = server.add_insecure_port('[::]:0')
server.start()
callback = _Callback()
channel = _channel.Channel('localhost:{}'.format(port), (), None)
channel = grpc.insecure_channel('localhost:{}'.format(port))
channel.subscribe(callback.update, try_to_connect=True)
callback.block_until_connectivities_satisfy(_ready_in_connectivities)
# Now take down the server and confirm that channel readiness is repudiated.

@ -33,8 +33,6 @@ import threading
import unittest
import grpc
from grpc import _channel
from grpc import _server
from tests.unit.framework.common import test_constants
from tests.unit import _thread_pool
@ -79,7 +77,7 @@ class ChannelReadyFutureTest(unittest.TestCase):
def test_immediately_connectable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
server = _server.Server(thread_pool, (), ())
server = grpc.server(thread_pool)
port = server.add_insecure_port('[::]:0')
server.start()
channel = grpc.insecure_channel('localhost:{}'.format(port))

@ -246,7 +246,7 @@ def main():
{
'name': '%s_test' % f,
'args': [t],
'exclude_configs': [],
'exclude_configs': END2END_FIXTURES[f].exclude_configs,
'platforms': END2END_FIXTURES[f].platforms,
'ci_platforms': (END2END_FIXTURES[f].platforms
if END2END_FIXTURES[f].ci_mac else without(

@ -827,21 +827,42 @@ bool InteropClient::DoStatusWithMessage() {
gpr_log(GPR_DEBUG,
"Sending RPC with a request for status code 2 and message");
const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
const grpc::string test_msg = "This is a test message";
// Test UnaryCall.
ClientContext context;
SimpleRequest request;
SimpleResponse response;
EchoStatus* requested_status = request.mutable_response_status();
requested_status->set_code(grpc::StatusCode::UNKNOWN);
grpc::string test_msg = "This is a test message";
requested_status->set_code(test_code);
requested_status->set_message(test_msg);
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN)) {
return false;
}
GPR_ASSERT(s.error_message() == test_msg);
// Test FullDuplexCall.
ClientContext stream_context;
std::shared_ptr<ClientReaderWriter<StreamingOutputCallRequest,
StreamingOutputCallResponse>>
stream(serviceStub_.Get()->FullDuplexCall(&stream_context));
StreamingOutputCallRequest streaming_request;
requested_status = streaming_request.mutable_response_status();
requested_status->set_code(test_code);
requested_status->set_message(test_msg);
stream->Write(streaming_request);
stream->WritesDone();
StreamingOutputCallResponse streaming_response;
while (stream->Read(&streaming_response))
;
s = stream->Finish();
if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN)) {
return false;
}
GPR_ASSERT(s.error_message() == test_msg);
gpr_log(GPR_DEBUG, "Done testing Status and Message");
return true;
}

@ -269,6 +269,11 @@ class TestServiceImpl : public TestService::Service {
StreamingOutputCallResponse response;
bool write_success = true;
while (write_success && stream->Read(&request)) {
if (request.has_response_status()) {
return Status(
static_cast<grpc::StatusCode>(request.response_status().code()),
request.response_status().message());
}
if (request.response_parameters_size() != 0) {
response.mutable_payload()->set_type(request.payload().type());
response.mutable_payload()->set_body(

@ -64,8 +64,9 @@ _SKIP_SERVER_COMPRESSION = ['server_compressed_unary',
_SKIP_COMPRESSION = _SKIP_CLIENT_COMPRESSION + _SKIP_SERVER_COMPRESSION
_SKIP_ADVANCED = ['custom_metadata', 'status_code_and_message',
'unimplemented_method']
_SKIP_ADVANCED_GO = ['custom_metadata', 'unimplemented_method']
_SKIP_ADVANCED = _SKIP_ADVANCED_GO + ['status_code_and_message']
_TEST_TIMEOUT = 3*60
@ -89,10 +90,10 @@ class CXXLanguage:
return {}
def unimplemented_test_cases(self):
return _SKIP_ADVANCED
return []
def unimplemented_test_cases_server(self):
return _SKIP_ADVANCED
return []
def __str__(self):
return 'c++'
@ -206,10 +207,10 @@ class GoLanguage:
return {}
def unimplemented_test_cases(self):
return _SKIP_ADVANCED + _SKIP_COMPRESSION
return _SKIP_ADVANCED_GO + _SKIP_COMPRESSION
def unimplemented_test_cases_server(self):
return _SKIP_ADVANCED + _SKIP_COMPRESSION
return _SKIP_ADVANCED_GO + _SKIP_COMPRESSION
def __str__(self):
return 'go'

@ -17822,7 +17822,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -17844,7 +17846,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -17866,7 +17870,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -17888,7 +17894,9 @@
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -17910,7 +17918,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -17932,7 +17942,9 @@
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -17954,7 +17966,9 @@
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -17976,7 +17990,9 @@
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -17998,7 +18014,9 @@
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18020,7 +18038,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18108,7 +18128,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18130,7 +18152,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18152,7 +18176,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18174,7 +18200,9 @@
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18196,7 +18224,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18218,7 +18248,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18240,7 +18272,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18284,7 +18318,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18306,7 +18342,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18328,7 +18366,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18350,7 +18390,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18372,7 +18414,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18394,7 +18438,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18416,7 +18462,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18438,7 +18486,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18460,7 +18510,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18482,7 +18534,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18504,7 +18558,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18526,7 +18582,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18548,7 +18606,9 @@
"posix"
],
"cpu_cost": 0.1,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18570,7 +18630,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18592,7 +18654,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18614,7 +18678,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18636,7 +18702,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18658,7 +18726,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18702,7 +18772,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18724,7 +18796,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18746,7 +18820,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",
@ -18768,7 +18844,9 @@
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_configs": [
"msan"
],
"flaky": false,
"language": "c",
"name": "h2_ssl_test",

Loading…
Cancel
Save