diff --git a/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m b/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m index 703cff63bb0..ae60d6208e1 100644 --- a/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m +++ b/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m @@ -36,7 +36,7 @@ } + (void)closeOpenConnections { - [[GRPCChannelPool sharedInstance] closeOpenConnections]; + [[GRPCChannelPool sharedInstance] disconnectAllChannels]; } + (void)setDefaultCompressMethod:(GRPCCompressAlgorithm)algorithm forhost:(nonnull NSString *)host { diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 94e470d4ed9..bf9441c27eb 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -69,11 +69,8 @@ const char *kCFStreamVarName = "grpc_cfstream"; - (instancetype)initWithHost:(NSString *)host path:(NSString *)path safety:(GRPCCallSafety)safety { NSAssert(host.length != 0 && path.length != 0, @"Host and Path cannot be empty"); - if (host.length == 0) { - host = [NSString string]; - } - if (path.length == 0) { - path = [NSString string]; + if (host.length == 0 || path.length == 0) { + return nil; } if ((self = [super init])) { _host = [host copy]; @@ -173,7 +170,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)start { - GRPCCall *call = nil; + GRPCCall *copiedCall = nil; @synchronized(self) { NSAssert(!_started, @"Call already started."); NSAssert(!_canceled, @"Call already canceled."); @@ -197,7 +194,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; if (_callOptions.initialMetadata) { [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata]; } - call = _call; + copiedCall = _call; } void (^valueHandler)(id value) = ^(id value) { @@ -235,11 +232,11 @@ const char *kCFStreamVarName = "grpc_cfstream"; }; id responseWriteable = [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler]; - [call startWithWriteable:responseWriteable]; + [copiedCall startWithWriteable:responseWriteable]; } - (void)cancel { - GRPCCall *call = nil; + GRPCCall *copiedCall = nil; @synchronized(self) { if (_canceled) { return; @@ -247,7 +244,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; _canceled = YES; - call = _call; + copiedCall = _call; _call = nil; _pipe = nil; @@ -268,13 +265,15 @@ const char *kCFStreamVarName = "grpc_cfstream"; @"Canceled by app" }]]; }); + } else { + _handler = nil; } } - [call cancel]; + [copiedCall cancel]; } - (void)writeData:(NSData *)data { - GRXBufferedPipe *pipe = nil; + GRXBufferedPipe *copiedPipe = nil; @synchronized(self) { NSAssert(!_canceled, @"Call arleady canceled."); NSAssert(!_finished, @"Call is half-closed before sending data."); @@ -286,14 +285,14 @@ const char *kCFStreamVarName = "grpc_cfstream"; } if (_pipe) { - pipe = _pipe; + copiedPipe = _pipe; } } - [pipe writeValue:data]; + [copiedPipe writeValue:data]; } - (void)finish { - GRXBufferedPipe *pipe = nil; + GRXBufferedPipe *copiedPipe = nil; @synchronized(self) { NSAssert(_started, @"Call not started."); NSAssert(!_canceled, @"Call arleady canceled."); @@ -309,12 +308,12 @@ const char *kCFStreamVarName = "grpc_cfstream"; } if (_pipe) { - pipe = _pipe; + copiedPipe = _pipe; _pipe = nil; } _finished = YES; } - [pipe writesFinishedWithError:nil]; + [copiedPipe writesFinishedWithError:nil]; } - (void)issueInitialMetadata:(NSDictionary *)initialMetadata { @@ -322,11 +321,11 @@ const char *kCFStreamVarName = "grpc_cfstream"; if (initialMetadata != nil && [_handler respondsToSelector:@selector(receivedInitialMetadata:)]) { dispatch_async(_dispatchQueue, ^{ - id handler = nil; + id copiedHandler = nil; @synchronized(self) { - handler = self->_handler; + copiedHandler = self->_handler; } - [handler receivedInitialMetadata:initialMetadata]; + [copiedHandler receivedInitialMetadata:initialMetadata]; }); } } @@ -336,11 +335,11 @@ const char *kCFStreamVarName = "grpc_cfstream"; @synchronized(self) { if (message != nil && [_handler respondsToSelector:@selector(receivedRawMessage:)]) { dispatch_async(_dispatchQueue, ^{ - id handler = nil; + id copiedHandler = nil; @synchronized(self) { - handler = self->_handler; + copiedHandler = self->_handler; } - [handler receivedRawMessage:message]; + [copiedHandler receivedRawMessage:message]; }); } } @@ -350,14 +349,16 @@ const char *kCFStreamVarName = "grpc_cfstream"; @synchronized(self) { if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { dispatch_async(_dispatchQueue, ^{ - id handler = nil; + id copiedHandler = nil; @synchronized(self) { - handler = self->_handler; + copiedHandler = self->_handler; // Clean up _handler so that no more responses are reported to the handler. self->_handler = nil; } - [handler closedWithTrailingMetadata:trailingMetadata error:error]; + [copiedHandler closedWithTrailingMetadata:trailingMetadata error:error]; }); + } else { + _handler = nil; } } } diff --git a/src/objective-c/GRPCClient/private/ChannelArgsUtil.m b/src/objective-c/GRPCClient/private/ChannelArgsUtil.m index d9e44b557d1..3135fcff028 100644 --- a/src/objective-c/GRPCClient/private/ChannelArgsUtil.m +++ b/src/objective-c/GRPCClient/private/ChannelArgsUtil.m @@ -60,36 +60,35 @@ grpc_channel_args *GRPCBuildChannelArgs(NSDictionary *dictionary) { NSUInteger argCount = [keys count]; grpc_channel_args *channelArgs = gpr_malloc(sizeof(grpc_channel_args)); - channelArgs->num_args = argCount; channelArgs->args = gpr_malloc(argCount * sizeof(grpc_arg)); // TODO(kriswuollett) Check that keys adhere to GRPC core library requirements + NSUInteger j = 0; for (NSUInteger i = 0; i < argCount; ++i) { - grpc_arg *arg = &channelArgs->args[i]; + grpc_arg *arg = &channelArgs->args[j]; arg->key = gpr_strdup([keys[i] UTF8String]); id value = dictionary[keys[i]]; if ([value respondsToSelector:@selector(UTF8String)]) { arg->type = GRPC_ARG_STRING; arg->value.string = gpr_strdup([value UTF8String]); + j++; } else if ([value respondsToSelector:@selector(intValue)]) { - if ([value compare:[NSNumber numberWithInteger:INT_MAX]] == NSOrderedDescending || - [value compare:[NSNumber numberWithInteger:INT_MIN]] == NSOrderedAscending) { - [NSException raise:NSInvalidArgumentException - format:@"Out of range for a value-typed channel argument: %@", value]; + int64_t value64 = [value longLongValue]; + if (value64 <= INT_MAX || value64 >= INT_MIN) { + arg->type = GRPC_ARG_INTEGER; + arg->value.integer = [value intValue]; + j++; } - arg->type = GRPC_ARG_INTEGER; - arg->value.integer = [value intValue]; } else if (value != nil) { arg->type = GRPC_ARG_POINTER; arg->value.pointer.p = (__bridge_retained void *)value; arg->value.pointer.vtable = &objc_arg_vtable; - } else { - [NSException raise:NSInvalidArgumentException - format:@"Invalid channel argument type: %@", [value class]]; + j++; } } + channelArgs->num_args = j; return channelArgs; } diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h index 5426b28d758..147015bed10 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.h +++ b/src/objective-c/GRPCClient/private/GRPCChannel.h @@ -32,6 +32,10 @@ NS_ASSUME_NONNULL_BEGIN /** Caching signature of a channel. */ @interface GRPCChannelConfiguration : NSObject +- (instancetype)init NS_UNAVAILABLE; + ++ (instancetype) new NS_UNAVAILABLE; + /** The host that this channel is connected to. */ @property(copy, readonly) NSString *host; @@ -47,7 +51,7 @@ NS_ASSUME_NONNULL_BEGIN /** Acquire the dictionary of channel args with current configurations. */ @property(copy, readonly) NSDictionary *channelArgs; -- (nullable instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions; +- (nullable instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions NS_DESIGNATED_INITIALIZER; @end diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m index e4cefc338c6..24cf670d1b5 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.m +++ b/src/objective-c/GRPCClient/private/GRPCChannel.m @@ -39,8 +39,9 @@ - (instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions { NSAssert(host.length > 0, @"Host must not be empty."); NSAssert(callOptions != nil, @"callOptions must not be empty."); - if (host.length == 0) return nil; - if (callOptions == nil) return nil; + if (host.length == 0 || callOptions == nil) { + return nil; + } if ((self = [super init])) { _host = [host copy]; @@ -180,7 +181,9 @@ - (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration { NSAssert(channelConfiguration != nil, @"channelConfiguration must not be empty."); - if (channelConfiguration == nil) return nil; + if (channelConfiguration == nil) { + return nil; + } if ((self = [super init])) { _configuration = [channelConfiguration copy]; @@ -218,35 +221,34 @@ if (callOptions == nil) return NULL; grpc_call *call = NULL; - @synchronized(self) { - NSAssert(_unmanagedChannel != NULL, @"Channel should have valid unmanaged channel."); - if (_unmanagedChannel == NULL) return NULL; - - NSString *serverAuthority = - callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority; - NSTimeInterval timeout = callOptions.timeout; - NSAssert(timeout >= 0, @"Invalid timeout"); - if (timeout < 0) return NULL; - grpc_slice host_slice = grpc_empty_slice(); - if (serverAuthority) { - host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String); - } - grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String); - gpr_timespec deadline_ms = - timeout == 0 ? gpr_inf_future(GPR_CLOCK_REALTIME) - : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN)); - call = grpc_channel_create_call(_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS, - queue.unmanagedQueue, path_slice, - serverAuthority ? &host_slice : NULL, deadline_ms, NULL); - if (serverAuthority) { - grpc_slice_unref(host_slice); - } - grpc_slice_unref(path_slice); - NSAssert(call != nil, @"Unable to create call."); - if (call == NULL) { - NSLog(@"Unable to create call."); - } + // No need to lock here since _unmanagedChannel is only changed in _dealloc + NSAssert(_unmanagedChannel != NULL, @"Channel should have valid unmanaged channel."); + if (_unmanagedChannel == NULL) return NULL; + + NSString *serverAuthority = + callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority; + NSTimeInterval timeout = callOptions.timeout; + NSAssert(timeout >= 0, @"Invalid timeout"); + if (timeout < 0) return NULL; + grpc_slice host_slice = grpc_empty_slice(); + if (serverAuthority) { + host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String); + } + grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String); + gpr_timespec deadline_ms = + timeout == 0 ? gpr_inf_future(GPR_CLOCK_REALTIME) + : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN)); + call = grpc_channel_create_call(_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS, + queue.unmanagedQueue, path_slice, + serverAuthority ? &host_slice : NULL, deadline_ms, NULL); + if (serverAuthority) { + grpc_slice_unref(host_slice); + } + grpc_slice_unref(path_slice); + NSAssert(call != nil, @"Unable to create call."); + if (call == NULL) { + NSLog(@"Unable to create call."); } return call; } diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.h b/src/objective-c/GRPCClient/private/GRPCChannelPool.h index 1e3c1d7d976..26600ef3a96 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannelPool.h +++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.h @@ -39,12 +39,16 @@ NS_ASSUME_NONNULL_BEGIN */ @interface GRPCPooledChannel : NSObject +- (nullable instancetype)init NS_UNAVAILABLE; + ++ (nullable instancetype)new NS_UNAVAILABLE; + /** * Initialize with an actual channel object \a channel and a reference to the channel pool. */ - (nullable instancetype)initWithChannelConfiguration: (GRPCChannelConfiguration *)channelConfiguration - channelPool:(GRPCChannelPool *)channelPool; + channelPool:(GRPCChannelPool *)channelPool NS_DESIGNATED_INITIALIZER; /** * Create a grpc core call object (grpc_call) from this channel. If channel is disconnected, get a @@ -59,17 +63,21 @@ NS_ASSUME_NONNULL_BEGIN * \a unmanagedCallWithPath:completionQueue:callOptions: and decrease channel refcount. If refcount * of the channel becomes 0, return the channel object to channel pool. */ -- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall; +- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall; /** - * Force the channel to disconnect immediately. + * Force the channel to disconnect immediately. Subsequent calls to unmanagedCallWithPath: will + * attempt to reconnect to the remote channel. */ - (void)disconnect; -// The following methods and properties are for test only +@end + +/** Test-only interface for \a GRPCPooledChannel. */ +@interface GRPCPooledChannel (Test) /** - * Return the pointer to the real channel wrapped by the proxy. + * Return the pointer to the raw channel wrapped. */ @property(atomic, readonly) GRPCChannel *wrappedChannel; @@ -81,6 +89,10 @@ NS_ASSUME_NONNULL_BEGIN */ @interface GRPCChannelPool : NSObject +- (nullable instancetype)init NS_UNAVAILABLE; + ++ (nullable instancetype)new NS_UNAVAILABLE; + /** * Get the global channel pool. */ @@ -92,31 +104,20 @@ NS_ASSUME_NONNULL_BEGIN - (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions; /** - * This method is deprecated. - * - * Destroy all open channels and close their connections. + * Disconnect all channels in this pool. */ -- (void)closeOpenConnections; - -// Test-only methods below +- (void)disconnectAllChannels; -/** - * Get an instance of pool isolated from the global shared pool. This method is for test only. - * Global pool should be used in production. - */ -- (nullable instancetype)init; +@end -/** - * Simulate a network transition event and destroy all channels. This method is for internal and - * test only. - */ -- (void)disconnectAllChannels; +/** Test-only interface for \a GRPCChannelPool. */ +@interface GRPCChannelPool (Test) /** - * Set the destroy delay of channels. A channel should be destroyed if it stayed idle (no active - * call on it) for this period of time. This property is for test only. + * Get an instance of pool isolated from the global shared pool with channels' destroy delay being + * \a destroyDelay. */ -@property(atomic) NSTimeInterval destroyDelay; +- (nullable instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay; @end diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m index 7c139b37176..f6615b58405 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannelPool.m +++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m @@ -52,10 +52,9 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; __weak GRPCChannelPool *_channelPool; GRPCChannelConfiguration *_channelConfiguration; NSMutableSet *_unmanagedCalls; + GRPCChannel *_wrappedChannel; } -@synthesize wrappedChannel = _wrappedChannel; - - (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration channelPool:(GRPCChannelPool *)channelPool { NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty."); @@ -68,11 +67,17 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; _channelPool = channelPool; _channelConfiguration = channelConfiguration; _unmanagedCalls = [NSMutableSet set]; + _wrappedChannel = nil; } return self; } +- (void)dealloc { + NSAssert([_unmanagedCalls count] == 0 && _wrappedChannel == nil, @"Pooled channel should only be" + "destroyed after the wrapped channel is destroyed"); +} + - (grpc_call *)unmanagedCallWithPath:(NSString *)path completionQueue:(GRPCCompletionQueue *)queue callOptions:(GRPCCallOptions *)callOptions { @@ -99,31 +104,38 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; return call; } -- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall { - if (unmanagedCall == nil) return; +- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall { + if (unmanagedCall == NULL) { + return; + } grpc_call_unref(unmanagedCall); - BOOL timedDestroy = NO; @synchronized(self) { - if ([_unmanagedCalls containsObject:[NSValue valueWithPointer:unmanagedCall]]) { - [_unmanagedCalls removeObject:[NSValue valueWithPointer:unmanagedCall]]; - if ([_unmanagedCalls count] == 0) { - timedDestroy = YES; - } + NSValue *removedCall = [NSValue valueWithPointer:unmanagedCall]; + [_unmanagedCalls removeObject:removedCall]; + if ([_unmanagedCalls count] == 0) { + _wrappedChannel = nil; + GRPCChannelPool *strongPool = _channelPool; + [strongPool unrefChannelWithConfiguration:_channelConfiguration]; } } - if (timedDestroy) { - [self timedDestroy]; - } } - (void)disconnect { @synchronized(self) { - _wrappedChannel = nil; - [_unmanagedCalls removeAllObjects]; + if (_wrappedChannel != nil) { + _wrappedChannel = nil; + [_unmanagedCalls removeAllObjects]; + GRPCChannelPool *strongPool = _channelPool; + [strongPool unrefChannelWithConfiguration:_channelConfiguration]; + } } } +@end + +@implementation GRPCPooledChannel (Test) + - (GRPCChannel *)wrappedChannel { GRPCChannel *channel = nil; @synchronized(self) { @@ -132,67 +144,52 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; return channel; } -- (void)timedDestroy { - __strong GRPCChannelPool *pool = nil; - @synchronized(self) { - // Check if we still want to destroy the channel. - if ([_unmanagedCalls count] == 0) { - pool = _channelPool; - _wrappedChannel = nil; - } - } - [pool unrefChannelWithConfiguration:_channelConfiguration]; -} - @end /** * A convenience value type for cached channel. */ -@interface GRPCChannelRecord : NSObject +@interface GRPCChannelRecord : NSObject /** Pointer to the raw channel. May be nil when the channel has been destroyed. */ @property GRPCChannel *channel; /** Channel proxy corresponding to this channel configuration. */ -@property GRPCPooledChannel *proxy; +@property GRPCPooledChannel *pooledChannel; /** Last time when a timed destroy is initiated on the channel. */ @property NSDate *timedDestroyDate; /** Reference count of the proxy to the channel. */ -@property NSUInteger refcount; +@property NSUInteger refCount; @end @implementation GRPCChannelRecord -- (id)copyWithZone:(NSZone *)zone { - GRPCChannelRecord *newRecord = [[GRPCChannelRecord allocWithZone:zone] init]; - newRecord.channel = _channel; - newRecord.proxy = _proxy; - newRecord.timedDestroyDate = _timedDestroyDate; - newRecord.refcount = _refcount; +@end - return newRecord; -} +@interface GRPCChannelPool () + +- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay NS_DESIGNATED_INITIALIZER; @end @implementation GRPCChannelPool { NSMutableDictionary *_channelPool; dispatch_queue_t _dispatchQueue; + NSTimeInterval _destroyDelay; } + (instancetype)sharedInstance { dispatch_once(&gInitChannelPool, ^{ - gChannelPool = [[GRPCChannelPool alloc] init]; + gChannelPool = [[GRPCChannelPool alloc] initInstanceWithDestroyDelay:kDefaultChannelDestroyDelay]; NSAssert(gChannelPool != nil, @"Cannot initialize global channel pool."); }); return gChannelPool; } -- (instancetype)init { +- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay { if ((self = [super init])) { _channelPool = [NSMutableDictionary dictionary]; #if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300 @@ -206,7 +203,7 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; #endif _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); } - _destroyDelay = kDefaultChannelDestroyDelay; + _destroyDelay = destroyDelay; // Connectivity monitor is not required for CFStream char *enableCFStream = getenv(kCFStreamVarName); @@ -217,56 +214,56 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; return self; } +- (void)dealloc { + [GRPCConnectivityMonitor unregisterObserver:self]; +} + - (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions { NSAssert(host.length > 0, @"Host must not be empty."); NSAssert(callOptions != nil, @"callOptions must not be empty."); - if (host.length == 0) return nil; - if (callOptions == nil) return nil; + if (host.length == 0 || callOptions == nil) { + return nil; + } - GRPCPooledChannel *channelProxy = nil; + GRPCPooledChannel *pooledChannel = nil; GRPCChannelConfiguration *configuration = [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions]; @synchronized(self) { GRPCChannelRecord *record = _channelPool[configuration]; if (record == nil) { record = [[GRPCChannelRecord alloc] init]; - record.proxy = + record.pooledChannel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration channelPool:self]; - record.timedDestroyDate = nil; _channelPool[configuration] = record; - channelProxy = record.proxy; + pooledChannel = record.pooledChannel; } else { - channelProxy = record.proxy; + pooledChannel = record.pooledChannel; } } - return channelProxy; -} - -- (void)closeOpenConnections { - [self disconnectAllChannels]; + return pooledChannel; } - (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration { GRPCChannel *ret = nil; @synchronized(self) { NSAssert(configuration != nil, @"configuration cannot be empty."); - if (configuration == nil) return nil; + if (configuration == nil) { + return nil; + } GRPCChannelRecord *record = _channelPool[configuration]; NSAssert(record != nil, @"No record corresponding to a proxy."); - if (record == nil) return nil; + if (record == nil) { + return nil; + } + record.refCount++; + record.timedDestroyDate = nil; if (record.channel == nil) { // Channel is already destroyed; record.channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration]; - record.timedDestroyDate = nil; - record.refcount = 1; - ret = record.channel; - } else { - ret = record.channel; - record.timedDestroyDate = nil; - record.refcount++; } + ret = record.channel; } return ret; } @@ -275,11 +272,13 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; @synchronized(self) { GRPCChannelRecord *record = _channelPool[configuration]; NSAssert(record != nil, @"No record corresponding to a proxy."); - if (record == nil) return; - NSAssert(record.refcount > 0, @"Inconsistent channel refcount."); - if (record.refcount > 0) { - record.refcount--; - if (record.refcount == 0) { + if (record == nil) { + return; + } + NSAssert(record.refCount > 0, @"Inconsistent channel refcount."); + if (record.refCount > 0) { + record.refCount--; + if (record.refCount == 0) { NSDate *now = [NSDate date]; record.timedDestroyDate = now; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_destroyDelay * NSEC_PER_SEC)), @@ -288,7 +287,6 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; if (now == record.timedDestroyDate) { // Destroy the raw channel and reset related records. record.timedDestroyDate = nil; - record.refcount = 0; record.channel = nil; } } @@ -306,8 +304,7 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; GRPCChannelRecord *_Nonnull obj, BOOL *_Nonnull stop) { obj.channel = nil; obj.timedDestroyDate = nil; - obj.refcount = 0; - [proxySet addObject:obj.proxy]; + [proxySet addObject:obj.pooledChannel]; }]; } // Disconnect proxies @@ -320,8 +317,12 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; [self disconnectAllChannels]; } -- (void)dealloc { - [GRPCConnectivityMonitor unregisterObserver:self]; +@end + +@implementation GRPCChannelPool (Test) + +- (instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay { + return [self initInstanceWithDestroyDelay:destroyDelay]; } @end diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index 615dfc85569..5788d0a003f 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -317,7 +317,7 @@ } - (void)dealloc { - [_channel unrefUnmanagedCall:_call]; + [_channel destroyUnmanagedCall:_call]; _channel = nil; } diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m index 2de2932072a..dff88b8591f 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.m +++ b/src/objective-c/ProtoRPC/ProtoRPC.m @@ -133,18 +133,18 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing } - (void)cancel { - GRPCCall2 *call; + GRPCCall2 *copiedCall; @synchronized(self) { - call = _call; + copiedCall = _call; _call = nil; if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { dispatch_async(_handler.dispatchQueue, ^{ - id handler = nil; + id copiedHandler = nil; @synchronized(self) { - handler = self->_handler; + copiedHandler = self->_handler; self->_handler = nil; } - [handler closedWithTrailingMetadata:nil + [copiedHandler closedWithTrailingMetadata:nil error:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeCancelled userInfo:@{ @@ -152,9 +152,11 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing @"Canceled by app" }]]; }); + } else { + _handler = nil; } } - [call cancel]; + [copiedCall cancel]; } - (void)writeMessage:(GPBMessage *)message { @@ -182,11 +184,11 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing @synchronized(self) { if (initialMetadata != nil && [_handler respondsToSelector:@selector(initialMetadata:)]) { dispatch_async(_dispatchQueue, ^{ - id handler = nil; + id copiedHandler = nil; @synchronized(self) { - handler = self->_handler; + copiedHandler = self->_handler; } - [handler receivedInitialMetadata:initialMetadata]; + [copiedHandler receivedInitialMetadata:initialMetadata]; }); } } @@ -200,21 +202,21 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing @synchronized(self) { if (parsed && [_handler respondsToSelector:@selector(receivedProtoMessage:)]) { dispatch_async(_dispatchQueue, ^{ - id handler = nil; + id copiedHandler = nil; @synchronized(self) { - handler = self->_handler; + copiedHandler = self->_handler; } - [handler receivedProtoMessage:parsed]; + [copiedHandler receivedProtoMessage:parsed]; }); } else if (!parsed && [_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { dispatch_async(_dispatchQueue, ^{ - id handler = nil; + id copiedHandler = nil; @synchronized(self) { - handler = self->_handler; + copiedHandler = self->_handler; self->_handler = nil; } - [handler closedWithTrailingMetadata:nil + [copiedHandler closedWithTrailingMetadata:nil error:ErrorForBadProto(message, _responseClass, error)]; }); [_call cancel]; @@ -227,12 +229,12 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing @synchronized(self) { if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { dispatch_async(_dispatchQueue, ^{ - id handler = nil; + id copiedHandler = nil; @synchronized(self) { - handler = self->_handler; + copiedHandler = self->_handler; self->_handler = nil; } - [handler closedWithTrailingMetadata:trailingMetadata error:error]; + [copiedHandler closedWithTrailingMetadata:trailingMetadata error:error]; }); } _call = nil; diff --git a/src/objective-c/tests/ChannelTests/ChannelPoolTest.m b/src/objective-c/tests/ChannelTests/ChannelPoolTest.m index 4424801c110..9461945560a 100644 --- a/src/objective-c/tests/ChannelTests/ChannelPoolTest.m +++ b/src/objective-c/tests/ChannelTests/ChannelPoolTest.m @@ -28,6 +28,8 @@ NSString *kDummyHost = @"dummy.host"; NSString *kDummyHost2 = @"dummy.host.2"; NSString *kDummyPath = @"/dummy/path"; +const NSTimeInterval kDestroyDelay = 1.0; + @interface ChannelPoolTest : XCTestCase @end @@ -39,7 +41,7 @@ NSString *kDummyPath = @"/dummy/path"; } - (void)testCreateChannelAndCall { - GRPCChannelPool *pool = [[GRPCChannelPool alloc] init]; + GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay]; GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options]; @@ -49,12 +51,12 @@ NSString *kDummyPath = @"/dummy/path"; [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; XCTAssert(call != NULL); XCTAssertNotNil(channel.wrappedChannel); - [channel unrefUnmanagedCall:call]; + [channel destroyUnmanagedCall:call]; XCTAssertNil(channel.wrappedChannel); } - (void)testCacheChannel { - GRPCChannelPool *pool = [[GRPCChannelPool alloc] init]; + GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay]; GRPCCallOptions *options1 = [[GRPCCallOptions alloc] init]; GRPCCallOptions *options2 = [options1 copy]; GRPCMutableCallOptions *options3 = [options1 mutableCopy]; @@ -80,17 +82,14 @@ NSString *kDummyPath = @"/dummy/path"; XCTAssertNotEqual(channel1.wrappedChannel, channel3.wrappedChannel); XCTAssertNotEqual(channel1.wrappedChannel, channel4.wrappedChannel); XCTAssertNotEqual(channel3.wrappedChannel, channel4.wrappedChannel); - [channel1 unrefUnmanagedCall:call1]; - [channel2 unrefUnmanagedCall:call2]; - [channel3 unrefUnmanagedCall:call3]; - [channel4 unrefUnmanagedCall:call4]; + [channel1 destroyUnmanagedCall:call1]; + [channel2 destroyUnmanagedCall:call2]; + [channel3 destroyUnmanagedCall:call3]; + [channel4 destroyUnmanagedCall:call4]; } - (void)testTimedDestroyChannel { - const NSTimeInterval kDestroyDelay = 1.0; - - GRPCChannelPool *pool = [[GRPCChannelPool alloc] init]; - pool.destroyDelay = kDestroyDelay; + GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay]; GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options]; @@ -99,25 +98,24 @@ NSString *kDummyPath = @"/dummy/path"; [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; GRPCChannel *wrappedChannel = channel.wrappedChannel; - [channel unrefUnmanagedCall:call]; + [channel destroyUnmanagedCall:call]; // Confirm channel is not destroyed at this time call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; XCTAssertEqual(wrappedChannel, channel.wrappedChannel); - [channel unrefUnmanagedCall:call]; + [channel destroyUnmanagedCall:call]; sleep(kDestroyDelay + 1); // Confirm channel is new at this time call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel); // Confirm the new channel can create call - call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; XCTAssert(call != NULL); - [channel unrefUnmanagedCall:call]; + [channel destroyUnmanagedCall:call]; } - (void)testPoolDisconnection { - GRPCChannelPool *pool = [[GRPCChannelPool alloc] init]; + GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay]; GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options]; @@ -141,12 +139,12 @@ NSString *kDummyPath = @"/dummy/path"; [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; XCTAssertNotNil(channel.wrappedChannel); XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel); - [channel unrefUnmanagedCall:call]; - [channel unrefUnmanagedCall:call2]; + [channel destroyUnmanagedCall:call]; + [channel destroyUnmanagedCall:call2]; } - (void)testUnrefCallFromStaleChannel { - GRPCChannelPool *pool = [[GRPCChannelPool alloc] init]; + GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay]; GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options]; @@ -163,12 +161,12 @@ NSString *kDummyPath = @"/dummy/path"; // destroy state XCTAssertNotNil(channel.wrappedChannel); GRPCChannel *wrappedChannel = channel.wrappedChannel; - [channel unrefUnmanagedCall:call]; + [channel destroyUnmanagedCall:call]; XCTAssertNotNil(channel.wrappedChannel); XCTAssertEqual(wrappedChannel, channel.wrappedChannel); // Test unref the call of the current channel will cause the channel going into timed destroy // state - [channel unrefUnmanagedCall:call2]; + [channel destroyUnmanagedCall:call2]; XCTAssertNil(channel.wrappedChannel); } diff --git a/src/objective-c/tests/ChannelTests/ChannelTests.m b/src/objective-c/tests/ChannelTests/ChannelTests.m index c07c8e69834..55474490926 100644 --- a/src/objective-c/tests/ChannelTests/ChannelTests.m +++ b/src/objective-c/tests/ChannelTests/ChannelTests.m @@ -35,7 +35,7 @@ completionQueue:(GRPCCompletionQueue *)queue callOptions:(GRPCCallOptions *)callOptions; -- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall; +- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall; @end @@ -66,7 +66,7 @@ return (grpc_call *)(++_grpcCallCounter); } -- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall { +- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall { if (_unrefExpectation) [_unrefExpectation fulfill]; } diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index 834bf6d661a..2cfdd1a003b 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -554,13 +554,14 @@ static GRPCProtoMethod *kFullDuplexCallMethod; __weak XCTestExpectation *completion = [self expectationWithDescription:@"Timeout in a second."]; NSString *const kDummyAddress = [NSString stringWithFormat:@"8.8.8.8:1"]; - GRPCCall *call = [[GRPCCall alloc] initWithHost:kDummyAddress - path:@"/dummyPath" - requestsWriter:[GRXWriter writerWithValue:[NSData data]]]; + [GRPCCall useInsecureConnectionsForHost:kDummyAddress]; [GRPCCall setMinConnectTimeout:timeout * 1000 initialBackoff:backoff * 1000 maxBackoff:0 forHost:kDummyAddress]; + GRPCCall *call = [[GRPCCall alloc] initWithHost:kDummyAddress + path:@"/dummyPath" + requestsWriter:[GRXWriter writerWithValue:[NSData data]]]; NSDate *startTime = [NSDate date]; id responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(id value) { XCTAssert(NO, @"Received message. Should not reach here");