New channel pool design

pull/16190/head
Muxi Yan 6 years ago
parent 87abab45c9
commit f0cbcde731
  1. 2
      src/objective-c/GRPCClient/GRPCCall+ChannelArg.m
  2. 2
      src/objective-c/GRPCClient/GRPCCall.m
  3. 36
      src/objective-c/GRPCClient/private/GRPCChannel.h
  4. 117
      src/objective-c/GRPCClient/private/GRPCChannel.m
  5. 74
      src/objective-c/GRPCClient/private/GRPCChannelPool.h
  6. 257
      src/objective-c/GRPCClient/private/GRPCChannelPool.m
  7. 15
      src/objective-c/GRPCClient/private/GRPCWrappedCall.m
  8. 222
      src/objective-c/tests/ChannelTests/ChannelPoolTest.m
  9. 126
      src/objective-c/tests/ChannelTests/ChannelTests.m

@ -36,7 +36,7 @@
}
+ (void)closeOpenConnections {
[GRPCChannelPool closeOpenConnections];
[[GRPCChannelPool sharedInstance] closeOpenConnections];
}
+ (void)setDefaultCompressMethod:(GRPCCompressAlgorithm)algorithm forhost:(nonnull NSString *)host {

@ -488,7 +488,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
requestsWriter:(GRXWriter *)requestWriter
callOptions:(GRPCCallOptions *)callOptions {
// Purposely using pointer rather than length ([host length] == 0) for backwards compatibility.
NSAssert(host && path, @"Neither host nor path can be nil.");
NSAssert(host != nil && path != nil, @"Neither host nor path can be nil.");
NSAssert(safety <= GRPCCallSafetyCacheableRequest,
@"Invalid call safety value.");
NSAssert(requestWriter.state == GRXWriterStateNotStarted,

@ -61,47 +61,19 @@ NS_ASSUME_NONNULL_BEGIN
+ (nullable instancetype) new NS_UNAVAILABLE;
/**
* Create a channel with remote \a host and signature \a channelConfigurations. Destroy delay is
* defaulted to 30 seconds.
*/
- (nullable instancetype)initWithChannelConfiguration:
(GRPCChannelConfiguration *)channelConfiguration;
/**
* Create a channel with remote \a host, signature \a channelConfigurations, and destroy delay of
* \a destroyDelay.
* Create a channel with remote \a host and signature \a channelConfigurations.
*/
- (nullable instancetype)initWithChannelConfiguration:
(GRPCChannelConfiguration *)channelConfiguration
destroyDelay:(NSTimeInterval)destroyDelay
NS_DESIGNATED_INITIALIZER;
/**
* Create a grpc core call object from this channel. The channel's refcount is added by 1. If no
* call is created, NULL is returned, and if the reason is because the channel is already
* disconnected, \a disconnected is set to YES. When the returned call is unreffed, the caller is
* obligated to call \a unref method once. \a disconnected may be null.
* Create a grpc core call object (grpc_call) from this channel. If no call is created, NULL is
* returned.
*/
- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
completionQueue:(GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions
disconnected:(nullable BOOL *)disconnected;
/**
* Unref the channel when a call is done. It also decreases the channel's refcount. If the refcount
* of the channel decreases to 0, the channel is destroyed after the destroy delay.
*/
- (void)unref;
/**
* Force the channel to be disconnected and destroyed.
*/
- (void)disconnect;
/**
* Return whether the channel is already disconnected.
*/
@property(readonly) BOOL disconnected;
callOptions:(GRPCCallOptions *)callOptions;
@end

@ -34,9 +34,6 @@
#import <GRPCClient/GRPCCall+Cronet.h>
#import <GRPCClient/GRPCCallOptions.h>
/** When all calls of a channel are destroyed, destroy the channel after this much seconds. */
static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
@implementation GRPCChannelConfiguration
- (instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions {
@ -178,43 +175,18 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
@implementation GRPCChannel {
GRPCChannelConfiguration *_configuration;
dispatch_queue_t _dispatchQueue;
grpc_channel *_unmanagedChannel;
NSTimeInterval _destroyDelay;
NSUInteger _refcount;
NSDate *_lastDispatch;
}
@synthesize disconnected = _disconnected;
- (instancetype)initWithChannelConfiguration:
(GRPCChannelConfiguration *)channelConfiguration {
return [self initWithChannelConfiguration:channelConfiguration
destroyDelay:kDefaultChannelDestroyDelay];
}
- (instancetype)initWithChannelConfiguration:
(GRPCChannelConfiguration *)channelConfiguration
destroyDelay:(NSTimeInterval)destroyDelay {
NSAssert(channelConfiguration != nil,
@"channelConfiguration must not be empty.");
NSAssert(destroyDelay > 0, @"destroyDelay must be greater than 0.");
if (channelConfiguration == nil) return nil;
if (destroyDelay <= 0) return nil;
if ((self = [super init])) {
_configuration = [channelConfiguration copy];
#if __IPHONE_OS_VERSION_MAX_ALLOWED < 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED < 101300
if (@available(iOS 8.0, macOS 10.10, *)) {
_dispatchQueue = dispatch_queue_create(
NULL,
dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
} else {
#else
{
#endif
_dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
}
// Create gRPC core channel object.
NSString *host = channelConfiguration.host;
NSAssert(host.length != 0, @"host cannot be nil");
@ -233,38 +205,31 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
NSLog(@"Unable to create channel.");
return nil;
}
_destroyDelay = destroyDelay;
_disconnected = NO;
}
return self;
}
- (grpc_call *)unmanagedCallWithPath:(NSString *)path
completionQueue:(GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions
disconnected:(BOOL *)disconnected {
callOptions:(GRPCCallOptions *)callOptions {
NSAssert(path.length > 0, @"path must not be empty.");
NSAssert(queue != nil, @"completionQueue must not be empty.");
NSAssert(callOptions != nil, @"callOptions must not be empty.");
if (path.length == 0) return nil;
if (queue == nil) return nil;
if (callOptions == nil) return nil;
if (path.length == 0) return NULL;
if (queue == nil) return NULL;
if (callOptions == nil) return NULL;
__block BOOL isDisconnected = NO;
__block grpc_call *call = NULL;
dispatch_sync(_dispatchQueue, ^{
if (self->_disconnected) {
isDisconnected = YES;
} else {
NSAssert(self->_unmanagedChannel != NULL,
grpc_call *call = NULL;
@synchronized(self) {
NSAssert(_unmanagedChannel != NULL,
@"Channel should have valid unmanaged channel.");
if (self->_unmanagedChannel == NULL) return;
if (_unmanagedChannel == NULL) return NULL;
NSString *serverAuthority =
callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority;
NSTimeInterval timeout = callOptions.timeout;
NSAssert(timeout >= 0, @"Invalid timeout");
if (timeout < 0) return;
if (timeout < 0) return NULL;
grpc_slice host_slice = grpc_empty_slice();
if (serverAuthority) {
host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String);
@ -275,79 +240,21 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
? gpr_inf_future(GPR_CLOCK_REALTIME)
: gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN));
call = grpc_channel_create_call(self->_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS,
call = grpc_channel_create_call(_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS,
queue.unmanagedQueue, path_slice,
serverAuthority ? &host_slice : NULL, deadline_ms, NULL);
if (serverAuthority) {
grpc_slice_unref(host_slice);
}
grpc_slice_unref(path_slice);
NSAssert(call != nil, @"Unable to create call.");
if (call == NULL) {
NSLog(@"Unable to create call.");
} else {
// Ref the channel;
[self ref];
}
}
});
if (disconnected != nil) {
*disconnected = isDisconnected;
}
return call;
}
// This function should be called on _dispatchQueue.
- (void)ref {
_refcount++;
if (_refcount == 1 && _lastDispatch != nil) {
_lastDispatch = nil;
}
}
- (void)unref {
dispatch_async(_dispatchQueue, ^{
NSAssert(self->_refcount > 0, @"Illegal reference count.");
if (self->_refcount == 0) {
NSLog(@"Illegal reference count.");
return;
}
self->_refcount--;
if (self->_refcount == 0 && !self->_disconnected) {
// Start timer.
dispatch_time_t delay =
dispatch_time(DISPATCH_TIME_NOW, (int64_t)self->_destroyDelay * NSEC_PER_SEC);
NSDate *now = [NSDate date];
self->_lastDispatch = now;
dispatch_after(delay, self->_dispatchQueue, ^{
// Timed disconnection.
if (!self->_disconnected && self->_lastDispatch == now) {
grpc_channel_destroy(self->_unmanagedChannel);
self->_unmanagedChannel = NULL;
self->_disconnected = YES;
}
});
}
});
}
- (void)disconnect {
dispatch_async(_dispatchQueue, ^{
if (!self->_disconnected) {
grpc_channel_destroy(self->_unmanagedChannel);
self->_unmanagedChannel = nil;
self->_disconnected = YES;
}
});
}
- (BOOL)disconnected {
__block BOOL disconnected;
dispatch_sync(_dispatchQueue, ^{
disconnected = self->_disconnected;
});
return disconnected;
}
- (void)dealloc {
if (_unmanagedChannel) {
grpc_channel_destroy(_unmanagedChannel);

@ -27,7 +27,53 @@
NS_ASSUME_NONNULL_BEGIN
@protocol GRPCChannel;
@class GRPCChannel;
@class GRPCChannelPool;
@class GRPCCompletionQueue;
@class GRPCChannelConfiguration;
/**
* Channel proxy that can be retained and automatically reestablish connection when the channel is
* disconnected.
*/
@interface GRPCPooledChannel : NSObject
/**
* Initialize with an actual channel object \a channel and a reference to the channel pool.
*/
- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration
channelPool:(GRPCChannelPool *)channelPool;
/**
* Create a grpc core call object (grpc_call) from this channel. If channel is disconnected, get a
* new channel object from the channel pool.
*/
- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
completionQueue:(GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions;
/**
* Return ownership and destroy the grpc_call object created by
* \a unmanagedCallWithPath:completionQueue:callOptions: and decrease channel refcount. If refcount
* of the channel becomes 0, return the channel object to channel pool.
*/
- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall;
/**
* Force the channel to disconnect immediately.
*/
- (void)disconnect;
// The following methods and properties are for test only
/**
* Return the pointer to the real channel wrapped by the proxy.
*/
@property(atomic, readonly) GRPCChannel *wrappedChannel;
@end
/**
* Manage the pool of connected channels. When a channel is no longer referenced by any call,
@ -36,37 +82,41 @@ NS_ASSUME_NONNULL_BEGIN
@interface GRPCChannelPool : NSObject
/**
* Get the singleton instance
* Get the global channel pool.
*/
+ (nullable instancetype)sharedInstance;
/**
* Return a channel with a particular configuration. If the channel does not exist, execute \a
* createChannel then add it in the pool. If the channel exists, increase its reference count.
* Return a channel with a particular configuration. The channel may be a cached channel.
*/
- (GRPCChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions;
- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions;
/**
* This method is deprecated.
*
* Destroy all open channels and close their connections.
*/
+ (void)closeOpenConnections;
- (void)closeOpenConnections;
// Test-only methods below
/**
* Return a channel with a special destroy delay. If \a destroyDelay is 0, use the default destroy
* delay.
* Get an instance of pool isolated from the global shared pool. This method is for test only.
* Global pool should be used in production.
*/
- (nullable instancetype)init;
/**
* Simulate a network transition event and destroy all channels. This method is for internal and
* test only.
*/
- (GRPCChannel *)channelWithHost:(NSString *)host
callOptions:(GRPCCallOptions *)callOptions
destroyDelay:(NSTimeInterval)destroyDelay;
- (void)disconnectAllChannels;
/**
* Simulate a network transition event and destroy all channels.
* Set the destroy delay of channels. A channel should be destroyed if it stayed idle (no active
* call on it) for this period of time. This property is for test only.
*/
- (void)destroyAllChannels;
@property(atomic) NSTimeInterval destroyDelay;
@end

@ -37,8 +37,150 @@ extern const char *kCFStreamVarName;
static GRPCChannelPool *gChannelPool;
static dispatch_once_t gInitChannelPool;
/** When all calls of a channel are destroyed, destroy the channel after this much seconds. */
static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
@interface GRPCChannelPool()
- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration;
- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration;
@end
@implementation GRPCPooledChannel {
__weak GRPCChannelPool *_channelPool;
GRPCChannelConfiguration *_channelConfiguration;
NSMutableSet *_unmanagedCalls;
}
@synthesize wrappedChannel = _wrappedChannel;
- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration
channelPool:(GRPCChannelPool *)channelPool {
NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty.");
NSAssert(channelPool != nil, @"channelPool cannot be empty.");
if (channelPool == nil || channelConfiguration == nil) {
return nil;
}
if ((self = [super init])) {
_channelPool = channelPool;
_channelConfiguration = channelConfiguration;
_unmanagedCalls = [NSMutableSet set];
}
return self;
}
- (grpc_call *)unmanagedCallWithPath:(NSString *)path
completionQueue:(GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions {
NSAssert(path.length > 0, @"path must not be empty.");
NSAssert(queue != nil, @"completionQueue must not be empty.");
NSAssert(callOptions, @"callOptions must not be empty.");
if (path.length == 0 || queue == nil || callOptions == nil) return NULL;
grpc_call *call = NULL;
@synchronized(self) {
if (_wrappedChannel == nil) {
__strong GRPCChannelPool *strongPool = _channelPool;
if (strongPool) {
_wrappedChannel = [strongPool refChannelWithConfiguration:_channelConfiguration];
}
NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy.");
}
call = [_wrappedChannel unmanagedCallWithPath:path completionQueue:queue callOptions:callOptions];
if (call != NULL) {
[_unmanagedCalls addObject:[NSValue valueWithPointer:call]];
}
}
return call;
}
- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall {
if (unmanagedCall == nil) return;
grpc_call_unref(unmanagedCall);
BOOL timedDestroy = NO;
@synchronized(self) {
if ([_unmanagedCalls containsObject:[NSValue valueWithPointer:unmanagedCall]]) {
[_unmanagedCalls removeObject:[NSValue valueWithPointer:unmanagedCall]];
if ([_unmanagedCalls count] == 0) {
timedDestroy = YES;
}
}
}
if (timedDestroy) {
[self timedDestroy];
}
}
- (void)disconnect {
@synchronized(self) {
_wrappedChannel = nil;
[_unmanagedCalls removeAllObjects];
}
}
- (GRPCChannel *)wrappedChannel {
GRPCChannel *channel = nil;
@synchronized (self) {
channel = _wrappedChannel;
}
return channel;
}
- (void)timedDestroy {
__strong GRPCChannelPool *pool = nil;
@synchronized(self) {
// Check if we still want to destroy the channel.
if ([_unmanagedCalls count] == 0) {
pool = _channelPool;
_wrappedChannel = nil;
}
}
[pool unrefChannelWithConfiguration:_channelConfiguration];
}
@end
/**
* A convenience value type for cached channel.
*/
@interface GRPCChannelRecord : NSObject <NSCopying>
/** Pointer to the raw channel. May be nil when the channel has been destroyed. */
@property GRPCChannel *channel;
/** Channel proxy corresponding to this channel configuration. */
@property GRPCPooledChannel *proxy;
/** Last time when a timed destroy is initiated on the channel. */
@property NSDate *timedDestroyDate;
/** Reference count of the proxy to the channel. */
@property NSUInteger refcount;
@end
@implementation GRPCChannelRecord
- (id)copyWithZone:(NSZone *)zone {
GRPCChannelRecord *newRecord = [[GRPCChannelRecord allocWithZone:zone] init];
newRecord.channel = _channel;
newRecord.proxy = _proxy;
newRecord.timedDestroyDate = _timedDestroyDate;
newRecord.refcount = _refcount;
return newRecord;
}
@end
@implementation GRPCChannelPool {
NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannel *> *_channelPool;
NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelRecord *> *_channelPool;
dispatch_queue_t _dispatchQueue;
}
+ (instancetype)sharedInstance {
@ -52,6 +194,18 @@ static dispatch_once_t gInitChannelPool;
- (instancetype)init {
if ((self = [super init])) {
_channelPool = [NSMutableDictionary dictionary];
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
if (@available(iOS 8.0, macOS 10.10, *)) {
_dispatchQueue = dispatch_queue_create(
NULL,
dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
} else {
#else
{
#endif
_dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
}
_destroyDelay = kDefaultChannelDestroyDelay;
// Connectivity monitor is not required for CFStream
char *enableCFStream = getenv(kCFStreamVarName);
@ -62,51 +216,106 @@ static dispatch_once_t gInitChannelPool;
return self;
}
- (GRPCChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions {
return [self channelWithHost:host callOptions:callOptions destroyDelay:0];
}
- (GRPCChannel *)channelWithHost:(NSString *)host
callOptions:(GRPCCallOptions *)callOptions
destroyDelay:(NSTimeInterval)destroyDelay {
- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions {
NSAssert(host.length > 0, @"Host must not be empty.");
NSAssert(callOptions != nil, @"callOptions must not be empty.");
if (host.length == 0) return nil;
if (callOptions == nil) return nil;
GRPCChannel *channel;
GRPCPooledChannel *channelProxy = nil;
GRPCChannelConfiguration *configuration =
[[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions];
@synchronized(self) {
channel = _channelPool[configuration];
if (channel == nil || channel.disconnected) {
if (destroyDelay == 0) {
channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration];
GRPCChannelRecord *record = _channelPool[configuration];
if (record == nil) {
record = [[GRPCChannelRecord alloc] init];
record.proxy = [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration
channelPool:self];
record.timedDestroyDate = nil;
_channelPool[configuration] = record;
channelProxy = record.proxy;
} else {
channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration
destroyDelay:destroyDelay];
channelProxy = record.proxy;
}
_channelPool[configuration] = channel;
}
return channelProxy;
}
return channel;
- (void)closeOpenConnections {
[self disconnectAllChannels];
}
+ (void)closeOpenConnections {
[[GRPCChannelPool sharedInstance] destroyAllChannels];
- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration {
GRPCChannel *ret = nil;
@synchronized (self) {
NSAssert(configuration != nil, @"configuration cannot be empty.");
if (configuration == nil) return nil;
GRPCChannelRecord *record = _channelPool[configuration];
NSAssert(record != nil, @"No record corresponding to a proxy.");
if (record == nil) return nil;
if (record.channel == nil) {
// Channel is already destroyed;
record.channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration];
record.timedDestroyDate = nil;
record.refcount = 1;
ret = record.channel;
} else {
ret = record.channel;
record.timedDestroyDate = nil;
record.refcount++;
}
}
return ret;
}
- (void)destroyAllChannels {
- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration {
@synchronized (self) {
for (id key in _channelPool) {
[_channelPool[key] disconnect];
GRPCChannelRecord *record = _channelPool[configuration];
NSAssert(record != nil, @"No record corresponding to a proxy.");
if (record == nil) return;
NSAssert(record.refcount > 0, @"Inconsistent channel refcount.");
if (record.refcount > 0) {
record.refcount--;
if (record.refcount == 0) {
NSDate *now = [NSDate date];
record.timedDestroyDate = now;
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_destroyDelay * NSEC_PER_SEC)),
_dispatchQueue,
^{
@synchronized (self) {
if (now == record.timedDestroyDate) {
// Destroy the raw channel and reset related records.
record.timedDestroyDate = nil;
record.refcount = 0;
record.channel = nil;
}
_channelPool = [NSMutableDictionary dictionary];
}
});
}
}
}
}
- (void)disconnectAllChannels {
NSMutableSet<GRPCPooledChannel *> *proxySet = [NSMutableSet set];
@synchronized (self) {
[_channelPool enumerateKeysAndObjectsUsingBlock:^(GRPCChannelConfiguration * _Nonnull key, GRPCChannelRecord * _Nonnull obj, BOOL * _Nonnull stop) {
obj.channel = nil;
obj.timedDestroyDate = nil;
obj.refcount = 0;
[proxySet addObject:obj.proxy];
}];
}
// Disconnect proxies
[proxySet enumerateObjectsUsingBlock:^(GRPCPooledChannel * _Nonnull obj, BOOL * _Nonnull stop) {
[obj disconnect];
}];
}
- (void)connectivityChange:(NSNotification *)note {
[self destroyAllChannels];
[self disconnectAllChannels];
}
- (void)dealloc {

@ -238,7 +238,7 @@
@implementation GRPCWrappedCall {
GRPCCompletionQueue *_queue;
GRPCChannel *_channel;
GRPCPooledChannel *_channel;
grpc_call *_call;
}
@ -257,8 +257,6 @@
// consuming too many threads and having contention of multiple calls in a single completion
// queue. Currently we use a singleton queue.
_queue = [GRPCCompletionQueue completionQueue];
BOOL disconnected = NO;
do {
_channel = [[GRPCChannelPool sharedInstance] channelWithHost:host callOptions:callOptions];
if (_channel == nil) {
NSAssert(_channel != nil, @"Failed to get a channel for the host.");
@ -267,11 +265,7 @@
}
_call = [_channel unmanagedCallWithPath:path
completionQueue:_queue
callOptions:callOptions
disconnected:&disconnected];
// Try create another channel if the current channel is disconnected (due to idleness or
// connectivity monitor disconnection).
} while (_call == NULL && disconnected);
callOptions:callOptions];
if (_call == nil) {
NSAssert(_channel != nil, @"Failed to get a channel for the host.");
NSLog(@"Failed to create a call.");
@ -326,10 +320,7 @@
}
- (void)dealloc {
if (_call) {
grpc_call_unref(_call);
}
[_channel unref];
[_channel unrefUnmanagedCall:_call];
_channel = nil;
}

@ -25,6 +25,8 @@
#define TEST_TIMEOUT 32
NSString *kDummyHost = @"dummy.host";
NSString *kDummyHost2 = @"dummy.host.2";
NSString *kDummyPath = @"/dummy/path";
@interface ChannelPoolTest : XCTestCase
@ -36,94 +38,156 @@ NSString *kDummyHost = @"dummy.host";
grpc_init();
}
- (void)testChannelPooling {
NSString *kDummyHost = @"dummy.host";
NSString *kDummyHost2 = @"dummy.host2";
- (void)testCreateChannelAndCall {
GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost
callOptions:options];
XCTAssertNil(channel.wrappedChannel);
GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
grpc_call *call = [channel unmanagedCallWithPath:kDummyPath
completionQueue:cq callOptions:options];
XCTAssert(call != NULL);
XCTAssertNotNil(channel.wrappedChannel);
[channel unrefUnmanagedCall:call];
XCTAssertNil(channel.wrappedChannel);
}
GRPCMutableCallOptions *options1 = [[GRPCMutableCallOptions alloc] init];
- (void)testCacheChannel {
GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
GRPCCallOptions *options1 = [[GRPCCallOptions alloc] init];
GRPCCallOptions *options2 = [options1 copy];
GRPCMutableCallOptions *options3 = [options2 mutableCopy];
GRPCMutableCallOptions *options3 = [options1 mutableCopy];
options3.transportType = GRPCTransportTypeInsecure;
GRPCChannelPool *pool = [GRPCChannelPool sharedInstance];
GRPCChannel *channel1 = [pool channelWithHost:kDummyHost callOptions:options1];
GRPCChannel *channel2 = [pool channelWithHost:kDummyHost callOptions:options2];
GRPCChannel *channel3 = [pool channelWithHost:kDummyHost2 callOptions:options1];
GRPCChannel *channel4 = [pool channelWithHost:kDummyHost callOptions:options3];
XCTAssertEqual(channel1, channel2);
XCTAssertNotEqual(channel1, channel3);
XCTAssertNotEqual(channel1, channel4);
XCTAssertNotEqual(channel3, channel4);
GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
GRPCPooledChannel *channel1 = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost
callOptions:options1];
grpc_call *call1 = [channel1 unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options1];
GRPCPooledChannel *channel2 = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost
callOptions:options2];
grpc_call *call2 = [channel2 unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options2];
GRPCPooledChannel *channel3 = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost
callOptions:options3];
grpc_call *call3 = [channel3 unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options3];
GRPCPooledChannel *channel4 = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost2
callOptions:options1];
grpc_call *call4 = [channel4 unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options1];
XCTAssertEqual(channel1.wrappedChannel, channel2.wrappedChannel);
XCTAssertNotEqual(channel1.wrappedChannel, channel3.wrappedChannel);
XCTAssertNotEqual(channel1.wrappedChannel, channel4.wrappedChannel);
XCTAssertNotEqual(channel3.wrappedChannel, channel4.wrappedChannel);
[channel1 unrefUnmanagedCall:call1];
[channel2 unrefUnmanagedCall:call2];
[channel3 unrefUnmanagedCall:call3];
[channel4 unrefUnmanagedCall:call4];
}
- (void)testDestroyAllChannels {
NSString *kDummyHost = @"dummy.host";
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
GRPCChannelPool *pool = [GRPCChannelPool sharedInstance];
GRPCChannel *channel = [pool channelWithHost:kDummyHost callOptions:options];
grpc_call *call = [channel unmanagedCallWithPath:@"dummy.path"
completionQueue:[GRPCCompletionQueue completionQueue]
callOptions:options
disconnected:nil];
[pool destroyAllChannels];
XCTAssertTrue(channel.disconnected);
GRPCChannel *channel2 = [pool channelWithHost:kDummyHost callOptions:options];
XCTAssertNotEqual(channel, channel2);
grpc_call_unref(call);
}
- (void)testGetChannelBeforeChannelTimedDisconnection {
NSString *kDummyHost = @"dummy.host";
const NSTimeInterval kDestroyDelay = 1;
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
GRPCChannelPool *pool = [GRPCChannelPool sharedInstance];
GRPCChannel *channel =
[pool channelWithHost:kDummyHost callOptions:options destroyDelay:kDestroyDelay];
grpc_call *call = [channel unmanagedCallWithPath:@"dummy.path"
completionQueue:[GRPCCompletionQueue completionQueue]
callOptions:options
disconnected:nil];
grpc_call_unref(call);
[channel unref];
// Test that we can still get the channel at this time
GRPCChannel *channel2 =
[pool channelWithHost:kDummyHost callOptions:options destroyDelay:kDestroyDelay];
XCTAssertEqual(channel, channel2);
call = [channel2 unmanagedCallWithPath:@"dummy.path"
completionQueue:[GRPCCompletionQueue completionQueue]
callOptions:options
disconnected:nil];
// Test that after the destroy delay, the channel is still alive
- (void)testTimedDestroyChannel {
const NSTimeInterval kDestroyDelay = 1.0;
GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
pool.destroyDelay = kDestroyDelay;
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost
callOptions:options];
GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
grpc_call *call = [channel unmanagedCallWithPath:kDummyPath
completionQueue:cq callOptions:options];
GRPCChannel *wrappedChannel = channel.wrappedChannel;
[channel unrefUnmanagedCall:call];
// Confirm channel is not destroyed at this time
call = [channel unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options];
XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
[channel unrefUnmanagedCall:call];
sleep(kDestroyDelay + 1);
XCTAssertFalse(channel.disconnected);
// Confirm channel is new at this time
call = [channel unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options];
XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel);
// Confirm the new channel can create call
call = [channel unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options];
XCTAssert(call != NULL);
[channel unrefUnmanagedCall:call];
}
- (void)testGetChannelAfterChannelTimedDisconnection {
NSString *kDummyHost = @"dummy.host";
const NSTimeInterval kDestroyDelay = 1;
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
GRPCChannelPool *pool = [GRPCChannelPool sharedInstance];
GRPCChannel *channel =
[pool channelWithHost:kDummyHost callOptions:options destroyDelay:kDestroyDelay];
grpc_call *call = [channel unmanagedCallWithPath:@"dummy.path"
completionQueue:[GRPCCompletionQueue completionQueue]
callOptions:options
disconnected:nil];
grpc_call_unref(call);
[channel unref];
sleep(kDestroyDelay + 1);
- (void)testPoolDisconnection {
GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost
callOptions:options];
GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
grpc_call *call = [channel unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options];
XCTAssertNotNil(channel.wrappedChannel);
GRPCChannel *wrappedChannel = channel.wrappedChannel;
// Test a new channel is created by requesting a channel from pool
[pool disconnectAllChannels];
channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost
callOptions:options];
call = [channel unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options];
XCTAssertNotNil(channel.wrappedChannel);
XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel);
wrappedChannel = channel.wrappedChannel;
// Test a new channel is created by requesting a new call from the previous proxy
[pool disconnectAllChannels];
grpc_call *call2 = [channel unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options];
XCTAssertNotNil(channel.wrappedChannel);
XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel);
[channel unrefUnmanagedCall:call];
[channel unrefUnmanagedCall:call2];
}
// Test that we get new channel to the same host and with the same callOptions
GRPCChannel *channel2 =
[pool channelWithHost:kDummyHost callOptions:options destroyDelay:kDestroyDelay];
XCTAssertNotEqual(channel, channel2);
- (void)testUnrefCallFromStaleChannel {
GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost
callOptions:options];
GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
grpc_call *call = [channel unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options];
[pool disconnectAllChannels];
channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost
callOptions:options];
grpc_call *call2 = [channel unmanagedCallWithPath:kDummyPath
completionQueue:cq
callOptions:options];
// Test unref the call of a stale channel will not cause the current channel going into timed
// destroy state
XCTAssertNotNil(channel.wrappedChannel);
GRPCChannel *wrappedChannel = channel.wrappedChannel;
[channel unrefUnmanagedCall:call];
XCTAssertNotNil(channel.wrappedChannel);
XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
// Test unref the call of the current channel will cause the channel going into timed destroy
// state
[channel unrefUnmanagedCall:call2];
XCTAssertNil(channel.wrappedChannel);
}
@end

@ -20,65 +20,93 @@
#import "../../GRPCClient/GRPCCallOptions.h"
#import "../../GRPCClient/private/GRPCChannel.h"
#import "../../GRPCClient/private/GRPCChannelPool.h"
#import "../../GRPCClient/private/GRPCCompletionQueue.h"
@interface ChannelTests : XCTestCase
/*
#define TEST_TIMEOUT 8
@interface GRPCChannelFake : NSObject
- (instancetype)initWithCreateExpectation:(XCTestExpectation *)createExpectation
unrefExpectation:(XCTestExpectation *)unrefExpectation;
- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
completionQueue:(GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions;
- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall;
@end
@implementation ChannelTests
@implementation GRPCChannelFake {
__weak XCTestExpectation *_createExpectation;
__weak XCTestExpectation *_unrefExpectation;
long _grpcCallCounter;
}
+ (void)setUp {
grpc_init();
- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration {
return nil;
}
- (void)testTimedDisconnection {
NSString *const kHost = @"grpc-test.sandbox.googleapis.com";
const NSTimeInterval kDestroyDelay = 1;
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCChannelConfiguration *configuration =
[[GRPCChannelConfiguration alloc] initWithHost:kHost callOptions:options];
GRPCChannel *channel =
[[GRPCChannel alloc] initWithChannelConfiguration:configuration destroyDelay:kDestroyDelay];
BOOL disconnected;
grpc_call *call = [channel unmanagedCallWithPath:@"dummy.path"
completionQueue:[GRPCCompletionQueue completionQueue]
callOptions:options
disconnected:&disconnected];
XCTAssertFalse(disconnected);
grpc_call_unref(call);
[channel unref];
XCTAssertFalse(channel.disconnected, @"Channel is pre-maturely disconnected.");
sleep(kDestroyDelay + 1);
XCTAssertTrue(channel.disconnected, @"Channel is not disconnected after delay.");
// Check another call creation returns null and indicates disconnected.
call = [channel unmanagedCallWithPath:@"dummy.path"
completionQueue:[GRPCCompletionQueue completionQueue]
callOptions:options
disconnected:&disconnected];
XCTAssert(call == NULL);
XCTAssertTrue(disconnected);
- (instancetype)initWithCreateExpectation:(XCTestExpectation *)createExpectation
unrefExpectation:(XCTestExpectation *)unrefExpectation {
if ((self = [super init])) {
_createExpectation = createExpectation;
_unrefExpectation = unrefExpectation;
_grpcCallCounter = 0;
}
return self;
}
- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
completionQueue:(GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions {
if (_createExpectation) [_createExpectation fulfill];
return (grpc_call *)(++_grpcCallCounter);
}
- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall {
if (_unrefExpectation) [_unrefExpectation fulfill];
}
@end
@interface GRPCChannelPoolFake : NSObject
- (instancetype)initWithDelayedDestroyExpectation:(XCTestExpectation *)delayedDestroyExpectation;
- (GRPCChannel *)rawChannelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions;
- (void)testForceDisconnection {
NSString *const kHost = @"grpc-test.sandbox.googleapis.com";
const NSTimeInterval kDestroyDelay = 1;
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCChannelConfiguration *configuration =
[[GRPCChannelConfiguration alloc] initWithHost:kHost callOptions:options];
GRPCChannel *channel =
[[GRPCChannel alloc] initWithChannelConfiguration:configuration destroyDelay:kDestroyDelay];
grpc_call *call = [channel unmanagedCallWithPath:@"dummy.path"
completionQueue:[GRPCCompletionQueue completionQueue]
callOptions:options
disconnected:nil];
grpc_call_unref(call);
[channel disconnect];
XCTAssertTrue(channel.disconnected, @"Channel is not disconnected.");
// Test calling another unref here will not crash
[channel unref];
- (void)delayedDestroyChannel;
@end
@implementation GRPCChannelPoolFake {
__weak XCTestExpectation *_delayedDestroyExpectation;
}
- (instancetype)initWithDelayedDestroyExpectation:(XCTestExpectation *)delayedDestroyExpectation {
if ((self = [super init])) {
_delayedDestroyExpectation = delayedDestroyExpectation;
}
return self;
}
- (void)delayedDestroyChannel {
if (_delayedDestroyExpectation) [_delayedDestroyExpectation fulfill];
}
@end */
@interface ChannelTests : XCTestCase
@end
@implementation ChannelTests
+ (void)setUp {
grpc_init();
}
@end

Loading…
Cancel
Save