Multiple fixes

pull/16190/head
Muxi Yan 6 years ago
parent 03c73e92f1
commit 5ae61f5a5a
  1. 2
      src/objective-c/GRPCClient/GRPCCall+ChannelArg.m
  2. 53
      src/objective-c/GRPCClient/GRPCCall.m
  3. 21
      src/objective-c/GRPCClient/private/ChannelArgsUtil.m
  4. 6
      src/objective-c/GRPCClient/private/GRPCChannel.h
  5. 66
      src/objective-c/GRPCClient/private/GRPCChannel.m
  6. 49
      src/objective-c/GRPCClient/private/GRPCChannelPool.h
  7. 145
      src/objective-c/GRPCClient/private/GRPCChannelPool.m
  8. 2
      src/objective-c/GRPCClient/private/GRPCWrappedCall.m
  9. 38
      src/objective-c/ProtoRPC/ProtoRPC.m
  10. 40
      src/objective-c/tests/ChannelTests/ChannelPoolTest.m
  11. 4
      src/objective-c/tests/ChannelTests/ChannelTests.m
  12. 7
      src/objective-c/tests/GRPCClientTests.m

@ -36,7 +36,7 @@
}
+ (void)closeOpenConnections {
[[GRPCChannelPool sharedInstance] closeOpenConnections];
[[GRPCChannelPool sharedInstance] disconnectAllChannels];
}
+ (void)setDefaultCompressMethod:(GRPCCompressAlgorithm)algorithm forhost:(nonnull NSString *)host {

@ -69,11 +69,8 @@ const char *kCFStreamVarName = "grpc_cfstream";
- (instancetype)initWithHost:(NSString *)host path:(NSString *)path safety:(GRPCCallSafety)safety {
NSAssert(host.length != 0 && path.length != 0, @"Host and Path cannot be empty");
if (host.length == 0) {
host = [NSString string];
}
if (path.length == 0) {
path = [NSString string];
if (host.length == 0 || path.length == 0) {
return nil;
}
if ((self = [super init])) {
_host = [host copy];
@ -173,7 +170,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)start {
GRPCCall *call = nil;
GRPCCall *copiedCall = nil;
@synchronized(self) {
NSAssert(!_started, @"Call already started.");
NSAssert(!_canceled, @"Call already canceled.");
@ -197,7 +194,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
if (_callOptions.initialMetadata) {
[_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
}
call = _call;
copiedCall = _call;
}
void (^valueHandler)(id value) = ^(id value) {
@ -235,11 +232,11 @@ const char *kCFStreamVarName = "grpc_cfstream";
};
id<GRXWriteable> responseWriteable =
[[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler];
[call startWithWriteable:responseWriteable];
[copiedCall startWithWriteable:responseWriteable];
}
- (void)cancel {
GRPCCall *call = nil;
GRPCCall *copiedCall = nil;
@synchronized(self) {
if (_canceled) {
return;
@ -247,7 +244,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
_canceled = YES;
call = _call;
copiedCall = _call;
_call = nil;
_pipe = nil;
@ -268,13 +265,15 @@ const char *kCFStreamVarName = "grpc_cfstream";
@"Canceled by app"
}]];
});
} else {
_handler = nil;
}
}
[call cancel];
[copiedCall cancel];
}
- (void)writeData:(NSData *)data {
GRXBufferedPipe *pipe = nil;
GRXBufferedPipe *copiedPipe = nil;
@synchronized(self) {
NSAssert(!_canceled, @"Call arleady canceled.");
NSAssert(!_finished, @"Call is half-closed before sending data.");
@ -286,14 +285,14 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
if (_pipe) {
pipe = _pipe;
copiedPipe = _pipe;
}
}
[pipe writeValue:data];
[copiedPipe writeValue:data];
}
- (void)finish {
GRXBufferedPipe *pipe = nil;
GRXBufferedPipe *copiedPipe = nil;
@synchronized(self) {
NSAssert(_started, @"Call not started.");
NSAssert(!_canceled, @"Call arleady canceled.");
@ -309,12 +308,12 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
if (_pipe) {
pipe = _pipe;
copiedPipe = _pipe;
_pipe = nil;
}
_finished = YES;
}
[pipe writesFinishedWithError:nil];
[copiedPipe writesFinishedWithError:nil];
}
- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
@ -322,11 +321,11 @@ const char *kCFStreamVarName = "grpc_cfstream";
if (initialMetadata != nil &&
[_handler respondsToSelector:@selector(receivedInitialMetadata:)]) {
dispatch_async(_dispatchQueue, ^{
id<GRPCResponseHandler> handler = nil;
id<GRPCResponseHandler> copiedHandler = nil;
@synchronized(self) {
handler = self->_handler;
copiedHandler = self->_handler;
}
[handler receivedInitialMetadata:initialMetadata];
[copiedHandler receivedInitialMetadata:initialMetadata];
});
}
}
@ -336,11 +335,11 @@ const char *kCFStreamVarName = "grpc_cfstream";
@synchronized(self) {
if (message != nil && [_handler respondsToSelector:@selector(receivedRawMessage:)]) {
dispatch_async(_dispatchQueue, ^{
id<GRPCResponseHandler> handler = nil;
id<GRPCResponseHandler> copiedHandler = nil;
@synchronized(self) {
handler = self->_handler;
copiedHandler = self->_handler;
}
[handler receivedRawMessage:message];
[copiedHandler receivedRawMessage:message];
});
}
}
@ -350,14 +349,16 @@ const char *kCFStreamVarName = "grpc_cfstream";
@synchronized(self) {
if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(_dispatchQueue, ^{
id<GRPCResponseHandler> handler = nil;
id<GRPCResponseHandler> copiedHandler = nil;
@synchronized(self) {
handler = self->_handler;
copiedHandler = self->_handler;
// Clean up _handler so that no more responses are reported to the handler.
self->_handler = nil;
}
[handler closedWithTrailingMetadata:trailingMetadata error:error];
[copiedHandler closedWithTrailingMetadata:trailingMetadata error:error];
});
} else {
_handler = nil;
}
}
}

@ -60,36 +60,35 @@ grpc_channel_args *GRPCBuildChannelArgs(NSDictionary *dictionary) {
NSUInteger argCount = [keys count];
grpc_channel_args *channelArgs = gpr_malloc(sizeof(grpc_channel_args));
channelArgs->num_args = argCount;
channelArgs->args = gpr_malloc(argCount * sizeof(grpc_arg));
// TODO(kriswuollett) Check that keys adhere to GRPC core library requirements
NSUInteger j = 0;
for (NSUInteger i = 0; i < argCount; ++i) {
grpc_arg *arg = &channelArgs->args[i];
grpc_arg *arg = &channelArgs->args[j];
arg->key = gpr_strdup([keys[i] UTF8String]);
id value = dictionary[keys[i]];
if ([value respondsToSelector:@selector(UTF8String)]) {
arg->type = GRPC_ARG_STRING;
arg->value.string = gpr_strdup([value UTF8String]);
j++;
} else if ([value respondsToSelector:@selector(intValue)]) {
if ([value compare:[NSNumber numberWithInteger:INT_MAX]] == NSOrderedDescending ||
[value compare:[NSNumber numberWithInteger:INT_MIN]] == NSOrderedAscending) {
[NSException raise:NSInvalidArgumentException
format:@"Out of range for a value-typed channel argument: %@", value];
int64_t value64 = [value longLongValue];
if (value64 <= INT_MAX || value64 >= INT_MIN) {
arg->type = GRPC_ARG_INTEGER;
arg->value.integer = [value intValue];
j++;
}
arg->type = GRPC_ARG_INTEGER;
arg->value.integer = [value intValue];
} else if (value != nil) {
arg->type = GRPC_ARG_POINTER;
arg->value.pointer.p = (__bridge_retained void *)value;
arg->value.pointer.vtable = &objc_arg_vtable;
} else {
[NSException raise:NSInvalidArgumentException
format:@"Invalid channel argument type: %@", [value class]];
j++;
}
}
channelArgs->num_args = j;
return channelArgs;
}

@ -32,6 +32,10 @@ NS_ASSUME_NONNULL_BEGIN
/** Caching signature of a channel. */
@interface GRPCChannelConfiguration : NSObject<NSCopying>
- (instancetype)init NS_UNAVAILABLE;
+ (instancetype) new NS_UNAVAILABLE;
/** The host that this channel is connected to. */
@property(copy, readonly) NSString *host;
@ -47,7 +51,7 @@ NS_ASSUME_NONNULL_BEGIN
/** Acquire the dictionary of channel args with current configurations. */
@property(copy, readonly) NSDictionary *channelArgs;
- (nullable instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions;
- (nullable instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions NS_DESIGNATED_INITIALIZER;
@end

@ -39,8 +39,9 @@
- (instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions {
NSAssert(host.length > 0, @"Host must not be empty.");
NSAssert(callOptions != nil, @"callOptions must not be empty.");
if (host.length == 0) return nil;
if (callOptions == nil) return nil;
if (host.length == 0 || callOptions == nil) {
return nil;
}
if ((self = [super init])) {
_host = [host copy];
@ -180,7 +181,9 @@
- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration {
NSAssert(channelConfiguration != nil, @"channelConfiguration must not be empty.");
if (channelConfiguration == nil) return nil;
if (channelConfiguration == nil) {
return nil;
}
if ((self = [super init])) {
_configuration = [channelConfiguration copy];
@ -218,35 +221,34 @@
if (callOptions == nil) return NULL;
grpc_call *call = NULL;
@synchronized(self) {
NSAssert(_unmanagedChannel != NULL, @"Channel should have valid unmanaged channel.");
if (_unmanagedChannel == NULL) return NULL;
NSString *serverAuthority =
callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority;
NSTimeInterval timeout = callOptions.timeout;
NSAssert(timeout >= 0, @"Invalid timeout");
if (timeout < 0) return NULL;
grpc_slice host_slice = grpc_empty_slice();
if (serverAuthority) {
host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String);
}
grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String);
gpr_timespec deadline_ms =
timeout == 0 ? gpr_inf_future(GPR_CLOCK_REALTIME)
: gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN));
call = grpc_channel_create_call(_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS,
queue.unmanagedQueue, path_slice,
serverAuthority ? &host_slice : NULL, deadline_ms, NULL);
if (serverAuthority) {
grpc_slice_unref(host_slice);
}
grpc_slice_unref(path_slice);
NSAssert(call != nil, @"Unable to create call.");
if (call == NULL) {
NSLog(@"Unable to create call.");
}
// No need to lock here since _unmanagedChannel is only changed in _dealloc
NSAssert(_unmanagedChannel != NULL, @"Channel should have valid unmanaged channel.");
if (_unmanagedChannel == NULL) return NULL;
NSString *serverAuthority =
callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority;
NSTimeInterval timeout = callOptions.timeout;
NSAssert(timeout >= 0, @"Invalid timeout");
if (timeout < 0) return NULL;
grpc_slice host_slice = grpc_empty_slice();
if (serverAuthority) {
host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String);
}
grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String);
gpr_timespec deadline_ms =
timeout == 0 ? gpr_inf_future(GPR_CLOCK_REALTIME)
: gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN));
call = grpc_channel_create_call(_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS,
queue.unmanagedQueue, path_slice,
serverAuthority ? &host_slice : NULL, deadline_ms, NULL);
if (serverAuthority) {
grpc_slice_unref(host_slice);
}
grpc_slice_unref(path_slice);
NSAssert(call != nil, @"Unable to create call.");
if (call == NULL) {
NSLog(@"Unable to create call.");
}
return call;
}

@ -39,12 +39,16 @@ NS_ASSUME_NONNULL_BEGIN
*/
@interface GRPCPooledChannel : NSObject
- (nullable instancetype)init NS_UNAVAILABLE;
+ (nullable instancetype)new NS_UNAVAILABLE;
/**
* Initialize with an actual channel object \a channel and a reference to the channel pool.
*/
- (nullable instancetype)initWithChannelConfiguration:
(GRPCChannelConfiguration *)channelConfiguration
channelPool:(GRPCChannelPool *)channelPool;
channelPool:(GRPCChannelPool *)channelPool NS_DESIGNATED_INITIALIZER;
/**
* Create a grpc core call object (grpc_call) from this channel. If channel is disconnected, get a
@ -59,17 +63,21 @@ NS_ASSUME_NONNULL_BEGIN
* \a unmanagedCallWithPath:completionQueue:callOptions: and decrease channel refcount. If refcount
* of the channel becomes 0, return the channel object to channel pool.
*/
- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall;
- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall;
/**
* Force the channel to disconnect immediately.
* Force the channel to disconnect immediately. Subsequent calls to unmanagedCallWithPath: will
* attempt to reconnect to the remote channel.
*/
- (void)disconnect;
// The following methods and properties are for test only
@end
/** Test-only interface for \a GRPCPooledChannel. */
@interface GRPCPooledChannel (Test)
/**
* Return the pointer to the real channel wrapped by the proxy.
* Return the pointer to the raw channel wrapped.
*/
@property(atomic, readonly) GRPCChannel *wrappedChannel;
@ -81,6 +89,10 @@ NS_ASSUME_NONNULL_BEGIN
*/
@interface GRPCChannelPool : NSObject
- (nullable instancetype)init NS_UNAVAILABLE;
+ (nullable instancetype)new NS_UNAVAILABLE;
/**
* Get the global channel pool.
*/
@ -92,31 +104,20 @@ NS_ASSUME_NONNULL_BEGIN
- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions;
/**
* This method is deprecated.
*
* Destroy all open channels and close their connections.
* Disconnect all channels in this pool.
*/
- (void)closeOpenConnections;
// Test-only methods below
- (void)disconnectAllChannels;
/**
* Get an instance of pool isolated from the global shared pool. This method is for test only.
* Global pool should be used in production.
*/
- (nullable instancetype)init;
@end
/**
* Simulate a network transition event and destroy all channels. This method is for internal and
* test only.
*/
- (void)disconnectAllChannels;
/** Test-only interface for \a GRPCChannelPool. */
@interface GRPCChannelPool (Test)
/**
* Set the destroy delay of channels. A channel should be destroyed if it stayed idle (no active
* call on it) for this period of time. This property is for test only.
* Get an instance of pool isolated from the global shared pool with channels' destroy delay being
* \a destroyDelay.
*/
@property(atomic) NSTimeInterval destroyDelay;
- (nullable instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay;
@end

@ -52,10 +52,9 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
__weak GRPCChannelPool *_channelPool;
GRPCChannelConfiguration *_channelConfiguration;
NSMutableSet *_unmanagedCalls;
GRPCChannel *_wrappedChannel;
}
@synthesize wrappedChannel = _wrappedChannel;
- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration
channelPool:(GRPCChannelPool *)channelPool {
NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty.");
@ -68,11 +67,17 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
_channelPool = channelPool;
_channelConfiguration = channelConfiguration;
_unmanagedCalls = [NSMutableSet set];
_wrappedChannel = nil;
}
return self;
}
- (void)dealloc {
NSAssert([_unmanagedCalls count] == 0 && _wrappedChannel == nil, @"Pooled channel should only be"
"destroyed after the wrapped channel is destroyed");
}
- (grpc_call *)unmanagedCallWithPath:(NSString *)path
completionQueue:(GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions {
@ -99,31 +104,38 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
return call;
}
- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall {
if (unmanagedCall == nil) return;
- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall {
if (unmanagedCall == NULL) {
return;
}
grpc_call_unref(unmanagedCall);
BOOL timedDestroy = NO;
@synchronized(self) {
if ([_unmanagedCalls containsObject:[NSValue valueWithPointer:unmanagedCall]]) {
[_unmanagedCalls removeObject:[NSValue valueWithPointer:unmanagedCall]];
if ([_unmanagedCalls count] == 0) {
timedDestroy = YES;
}
NSValue *removedCall = [NSValue valueWithPointer:unmanagedCall];
[_unmanagedCalls removeObject:removedCall];
if ([_unmanagedCalls count] == 0) {
_wrappedChannel = nil;
GRPCChannelPool *strongPool = _channelPool;
[strongPool unrefChannelWithConfiguration:_channelConfiguration];
}
}
if (timedDestroy) {
[self timedDestroy];
}
}
- (void)disconnect {
@synchronized(self) {
_wrappedChannel = nil;
[_unmanagedCalls removeAllObjects];
if (_wrappedChannel != nil) {
_wrappedChannel = nil;
[_unmanagedCalls removeAllObjects];
GRPCChannelPool *strongPool = _channelPool;
[strongPool unrefChannelWithConfiguration:_channelConfiguration];
}
}
}
@end
@implementation GRPCPooledChannel (Test)
- (GRPCChannel *)wrappedChannel {
GRPCChannel *channel = nil;
@synchronized(self) {
@ -132,67 +144,52 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
return channel;
}
- (void)timedDestroy {
__strong GRPCChannelPool *pool = nil;
@synchronized(self) {
// Check if we still want to destroy the channel.
if ([_unmanagedCalls count] == 0) {
pool = _channelPool;
_wrappedChannel = nil;
}
}
[pool unrefChannelWithConfiguration:_channelConfiguration];
}
@end
/**
* A convenience value type for cached channel.
*/
@interface GRPCChannelRecord : NSObject<NSCopying>
@interface GRPCChannelRecord : NSObject
/** Pointer to the raw channel. May be nil when the channel has been destroyed. */
@property GRPCChannel *channel;
/** Channel proxy corresponding to this channel configuration. */
@property GRPCPooledChannel *proxy;
@property GRPCPooledChannel *pooledChannel;
/** Last time when a timed destroy is initiated on the channel. */
@property NSDate *timedDestroyDate;
/** Reference count of the proxy to the channel. */
@property NSUInteger refcount;
@property NSUInteger refCount;
@end
@implementation GRPCChannelRecord
- (id)copyWithZone:(NSZone *)zone {
GRPCChannelRecord *newRecord = [[GRPCChannelRecord allocWithZone:zone] init];
newRecord.channel = _channel;
newRecord.proxy = _proxy;
newRecord.timedDestroyDate = _timedDestroyDate;
newRecord.refcount = _refcount;
@end
return newRecord;
}
@interface GRPCChannelPool ()
- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay NS_DESIGNATED_INITIALIZER;
@end
@implementation GRPCChannelPool {
NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelRecord *> *_channelPool;
dispatch_queue_t _dispatchQueue;
NSTimeInterval _destroyDelay;
}
+ (instancetype)sharedInstance {
dispatch_once(&gInitChannelPool, ^{
gChannelPool = [[GRPCChannelPool alloc] init];
gChannelPool = [[GRPCChannelPool alloc] initInstanceWithDestroyDelay:kDefaultChannelDestroyDelay];
NSAssert(gChannelPool != nil, @"Cannot initialize global channel pool.");
});
return gChannelPool;
}
- (instancetype)init {
- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay {
if ((self = [super init])) {
_channelPool = [NSMutableDictionary dictionary];
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
@ -206,7 +203,7 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
#endif
_dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
}
_destroyDelay = kDefaultChannelDestroyDelay;
_destroyDelay = destroyDelay;
// Connectivity monitor is not required for CFStream
char *enableCFStream = getenv(kCFStreamVarName);
@ -217,56 +214,56 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
return self;
}
- (void)dealloc {
[GRPCConnectivityMonitor unregisterObserver:self];
}
- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions {
NSAssert(host.length > 0, @"Host must not be empty.");
NSAssert(callOptions != nil, @"callOptions must not be empty.");
if (host.length == 0) return nil;
if (callOptions == nil) return nil;
if (host.length == 0 || callOptions == nil) {
return nil;
}
GRPCPooledChannel *channelProxy = nil;
GRPCPooledChannel *pooledChannel = nil;
GRPCChannelConfiguration *configuration =
[[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions];
@synchronized(self) {
GRPCChannelRecord *record = _channelPool[configuration];
if (record == nil) {
record = [[GRPCChannelRecord alloc] init];
record.proxy =
record.pooledChannel =
[[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration channelPool:self];
record.timedDestroyDate = nil;
_channelPool[configuration] = record;
channelProxy = record.proxy;
pooledChannel = record.pooledChannel;
} else {
channelProxy = record.proxy;
pooledChannel = record.pooledChannel;
}
}
return channelProxy;
}
- (void)closeOpenConnections {
[self disconnectAllChannels];
return pooledChannel;
}
- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration {
GRPCChannel *ret = nil;
@synchronized(self) {
NSAssert(configuration != nil, @"configuration cannot be empty.");
if (configuration == nil) return nil;
if (configuration == nil) {
return nil;
}
GRPCChannelRecord *record = _channelPool[configuration];
NSAssert(record != nil, @"No record corresponding to a proxy.");
if (record == nil) return nil;
if (record == nil) {
return nil;
}
record.refCount++;
record.timedDestroyDate = nil;
if (record.channel == nil) {
// Channel is already destroyed;
record.channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration];
record.timedDestroyDate = nil;
record.refcount = 1;
ret = record.channel;
} else {
ret = record.channel;
record.timedDestroyDate = nil;
record.refcount++;
}
ret = record.channel;
}
return ret;
}
@ -275,11 +272,13 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
@synchronized(self) {
GRPCChannelRecord *record = _channelPool[configuration];
NSAssert(record != nil, @"No record corresponding to a proxy.");
if (record == nil) return;
NSAssert(record.refcount > 0, @"Inconsistent channel refcount.");
if (record.refcount > 0) {
record.refcount--;
if (record.refcount == 0) {
if (record == nil) {
return;
}
NSAssert(record.refCount > 0, @"Inconsistent channel refcount.");
if (record.refCount > 0) {
record.refCount--;
if (record.refCount == 0) {
NSDate *now = [NSDate date];
record.timedDestroyDate = now;
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_destroyDelay * NSEC_PER_SEC)),
@ -288,7 +287,6 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
if (now == record.timedDestroyDate) {
// Destroy the raw channel and reset related records.
record.timedDestroyDate = nil;
record.refcount = 0;
record.channel = nil;
}
}
@ -306,8 +304,7 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
GRPCChannelRecord *_Nonnull obj, BOOL *_Nonnull stop) {
obj.channel = nil;
obj.timedDestroyDate = nil;
obj.refcount = 0;
[proxySet addObject:obj.proxy];
[proxySet addObject:obj.pooledChannel];
}];
}
// Disconnect proxies
@ -320,8 +317,12 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
[self disconnectAllChannels];
}
- (void)dealloc {
[GRPCConnectivityMonitor unregisterObserver:self];
@end
@implementation GRPCChannelPool (Test)
- (instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay {
return [self initInstanceWithDestroyDelay:destroyDelay];
}
@end

@ -317,7 +317,7 @@
}
- (void)dealloc {
[_channel unrefUnmanagedCall:_call];
[_channel destroyUnmanagedCall:_call];
_channel = nil;
}

@ -133,18 +133,18 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
}
- (void)cancel {
GRPCCall2 *call;
GRPCCall2 *copiedCall;
@synchronized(self) {
call = _call;
copiedCall = _call;
_call = nil;
if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(_handler.dispatchQueue, ^{
id<GRPCProtoResponseHandler> handler = nil;
id<GRPCProtoResponseHandler> copiedHandler = nil;
@synchronized(self) {
handler = self->_handler;
copiedHandler = self->_handler;
self->_handler = nil;
}
[handler closedWithTrailingMetadata:nil
[copiedHandler closedWithTrailingMetadata:nil
error:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeCancelled
userInfo:@{
@ -152,9 +152,11 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
@"Canceled by app"
}]];
});
} else {
_handler = nil;
}
}
[call cancel];
[copiedCall cancel];
}
- (void)writeMessage:(GPBMessage *)message {
@ -182,11 +184,11 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
@synchronized(self) {
if (initialMetadata != nil && [_handler respondsToSelector:@selector(initialMetadata:)]) {
dispatch_async(_dispatchQueue, ^{
id<GRPCProtoResponseHandler> handler = nil;
id<GRPCProtoResponseHandler> copiedHandler = nil;
@synchronized(self) {
handler = self->_handler;
copiedHandler = self->_handler;
}
[handler receivedInitialMetadata:initialMetadata];
[copiedHandler receivedInitialMetadata:initialMetadata];
});
}
}
@ -200,21 +202,21 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
@synchronized(self) {
if (parsed && [_handler respondsToSelector:@selector(receivedProtoMessage:)]) {
dispatch_async(_dispatchQueue, ^{
id<GRPCProtoResponseHandler> handler = nil;
id<GRPCProtoResponseHandler> copiedHandler = nil;
@synchronized(self) {
handler = self->_handler;
copiedHandler = self->_handler;
}
[handler receivedProtoMessage:parsed];
[copiedHandler receivedProtoMessage:parsed];
});
} else if (!parsed &&
[_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(_dispatchQueue, ^{
id<GRPCProtoResponseHandler> handler = nil;
id<GRPCProtoResponseHandler> copiedHandler = nil;
@synchronized(self) {
handler = self->_handler;
copiedHandler = self->_handler;
self->_handler = nil;
}
[handler closedWithTrailingMetadata:nil
[copiedHandler closedWithTrailingMetadata:nil
error:ErrorForBadProto(message, _responseClass, error)];
});
[_call cancel];
@ -227,12 +229,12 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
@synchronized(self) {
if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(_dispatchQueue, ^{
id<GRPCProtoResponseHandler> handler = nil;
id<GRPCProtoResponseHandler> copiedHandler = nil;
@synchronized(self) {
handler = self->_handler;
copiedHandler = self->_handler;
self->_handler = nil;
}
[handler closedWithTrailingMetadata:trailingMetadata error:error];
[copiedHandler closedWithTrailingMetadata:trailingMetadata error:error];
});
}
_call = nil;

@ -28,6 +28,8 @@ NSString *kDummyHost = @"dummy.host";
NSString *kDummyHost2 = @"dummy.host.2";
NSString *kDummyPath = @"/dummy/path";
const NSTimeInterval kDestroyDelay = 1.0;
@interface ChannelPoolTest : XCTestCase
@end
@ -39,7 +41,7 @@ NSString *kDummyPath = @"/dummy/path";
}
- (void)testCreateChannelAndCall {
GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel =
(GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
@ -49,12 +51,12 @@ NSString *kDummyPath = @"/dummy/path";
[channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
XCTAssert(call != NULL);
XCTAssertNotNil(channel.wrappedChannel);
[channel unrefUnmanagedCall:call];
[channel destroyUnmanagedCall:call];
XCTAssertNil(channel.wrappedChannel);
}
- (void)testCacheChannel {
GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
GRPCCallOptions *options1 = [[GRPCCallOptions alloc] init];
GRPCCallOptions *options2 = [options1 copy];
GRPCMutableCallOptions *options3 = [options1 mutableCopy];
@ -80,17 +82,14 @@ NSString *kDummyPath = @"/dummy/path";
XCTAssertNotEqual(channel1.wrappedChannel, channel3.wrappedChannel);
XCTAssertNotEqual(channel1.wrappedChannel, channel4.wrappedChannel);
XCTAssertNotEqual(channel3.wrappedChannel, channel4.wrappedChannel);
[channel1 unrefUnmanagedCall:call1];
[channel2 unrefUnmanagedCall:call2];
[channel3 unrefUnmanagedCall:call3];
[channel4 unrefUnmanagedCall:call4];
[channel1 destroyUnmanagedCall:call1];
[channel2 destroyUnmanagedCall:call2];
[channel3 destroyUnmanagedCall:call3];
[channel4 destroyUnmanagedCall:call4];
}
- (void)testTimedDestroyChannel {
const NSTimeInterval kDestroyDelay = 1.0;
GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
pool.destroyDelay = kDestroyDelay;
GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel =
(GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
@ -99,25 +98,24 @@ NSString *kDummyPath = @"/dummy/path";
[channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
GRPCChannel *wrappedChannel = channel.wrappedChannel;
[channel unrefUnmanagedCall:call];
[channel destroyUnmanagedCall:call];
// Confirm channel is not destroyed at this time
call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
[channel unrefUnmanagedCall:call];
[channel destroyUnmanagedCall:call];
sleep(kDestroyDelay + 1);
// Confirm channel is new at this time
call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel);
// Confirm the new channel can create call
call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
XCTAssert(call != NULL);
[channel unrefUnmanagedCall:call];
[channel destroyUnmanagedCall:call];
}
- (void)testPoolDisconnection {
GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel =
(GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
@ -141,12 +139,12 @@ NSString *kDummyPath = @"/dummy/path";
[channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
XCTAssertNotNil(channel.wrappedChannel);
XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel);
[channel unrefUnmanagedCall:call];
[channel unrefUnmanagedCall:call2];
[channel destroyUnmanagedCall:call];
[channel destroyUnmanagedCall:call2];
}
- (void)testUnrefCallFromStaleChannel {
GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel =
(GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
@ -163,12 +161,12 @@ NSString *kDummyPath = @"/dummy/path";
// destroy state
XCTAssertNotNil(channel.wrappedChannel);
GRPCChannel *wrappedChannel = channel.wrappedChannel;
[channel unrefUnmanagedCall:call];
[channel destroyUnmanagedCall:call];
XCTAssertNotNil(channel.wrappedChannel);
XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
// Test unref the call of the current channel will cause the channel going into timed destroy
// state
[channel unrefUnmanagedCall:call2];
[channel destroyUnmanagedCall:call2];
XCTAssertNil(channel.wrappedChannel);
}

@ -35,7 +35,7 @@
completionQueue:(GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions;
- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall;
- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall;
@end
@ -66,7 +66,7 @@
return (grpc_call *)(++_grpcCallCounter);
}
- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall {
- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall {
if (_unrefExpectation) [_unrefExpectation fulfill];
}

@ -554,13 +554,14 @@ static GRPCProtoMethod *kFullDuplexCallMethod;
__weak XCTestExpectation *completion = [self expectationWithDescription:@"Timeout in a second."];
NSString *const kDummyAddress = [NSString stringWithFormat:@"8.8.8.8:1"];
GRPCCall *call = [[GRPCCall alloc] initWithHost:kDummyAddress
path:@"/dummyPath"
requestsWriter:[GRXWriter writerWithValue:[NSData data]]];
[GRPCCall useInsecureConnectionsForHost:kDummyAddress];
[GRPCCall setMinConnectTimeout:timeout * 1000
initialBackoff:backoff * 1000
maxBackoff:0
forHost:kDummyAddress];
GRPCCall *call = [[GRPCCall alloc] initWithHost:kDummyAddress
path:@"/dummyPath"
requestsWriter:[GRXWriter writerWithValue:[NSData data]]];
NSDate *startTime = [NSDate date];
id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(id value) {
XCTAssert(NO, @"Received message. Should not reach here");

Loading…
Cancel
Save