batch fixes

pull/16190/head
Muxi Yan 6 years ago
parent f9e50322bf
commit da759f1fc6
  1. 2
      src/objective-c/GRPCClient/GRPCCall.h
  2. 5
      src/objective-c/GRPCClient/GRPCCall.m
  3. 4
      src/objective-c/GRPCClient/GRPCCallOptions.h
  4. 32
      src/objective-c/GRPCClient/GRPCCallOptions.m
  5. 5
      src/objective-c/GRPCClient/private/GRPCChannel.h
  6. 8
      src/objective-c/GRPCClient/private/GRPCChannel.m
  7. 13
      src/objective-c/GRPCClient/private/GRPCChannelPool.h
  8. 78
      src/objective-c/GRPCClient/private/GRPCChannelPool.m
  9. 8
      src/objective-c/GRPCClient/private/GRPCCronetChannelFactory.m
  10. 7
      src/objective-c/GRPCClient/private/GRPCHost.m
  11. 5
      src/objective-c/GRPCClient/private/GRPCSecureChannelFactory.m
  12. 11
      src/objective-c/GRPCClient/private/GRPCWrappedCall.m
  13. 22
      src/objective-c/GRPCClient/private/utilities.h

@ -206,7 +206,7 @@ extern NSString *const kGRPCTrailersKey;
@property(copy, readonly) NSString *path;
/**
* Specify whether the call is idempotent or cachable. gRPC may select different HTTP verbs for the
* call based on this information.
* call based on this information. The default verb used by gRPC is POST.
*/
@property(readonly) GRPCCallSafety safety;

@ -35,7 +35,6 @@
#import "private/NSData+GRPC.h"
#import "private/NSDictionary+GRPC.h"
#import "private/NSError+GRPC.h"
#import "private/utilities.h"
#import "private/GRPCChannelPool.h"
#import "private/GRPCCompletionQueue.h"
@ -277,7 +276,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
- (void)writeData:(NSData *)data {
GRXBufferedPipe *copiedPipe = nil;
@synchronized(self) {
NSAssert(!_canceled, @"Call arleady canceled.");
NSAssert(!_canceled, @"Call already canceled.");
NSAssert(!_finished, @"Call is half-closed before sending data.");
if (_canceled) {
return;
@ -297,7 +296,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
GRXBufferedPipe *copiedPipe = nil;
@synchronized(self) {
NSAssert(_started, @"Call not started.");
NSAssert(!_canceled, @"Call arleady canceled.");
NSAssert(!_canceled, @"Call already canceled.");
NSAssert(!_finished, @"Call already half-closed.");
if (!_started) {
return;

@ -170,7 +170,7 @@ typedef NS_ENUM(NSUInteger, GRPCTransportType) {
/**
* PEM format certificate chain for client authentication, if required by the server.
*/
@property(copy, readonly, nullable) NSString *PEMCertChain;
@property(copy, readonly, nullable) NSString *PEMCertificateChain;
/**
* Select the transport type to be used for this call.
@ -314,7 +314,7 @@ typedef NS_ENUM(NSUInteger, GRPCTransportType) {
/**
* PEM format certificate chain for client authentication, if required by the server.
*/
@property(copy, readwrite, nullable) NSString *PEMCertChain;
@property(copy, readwrite, nullable) NSString *PEMCertificateChain;
/**
* Select the transport type to be used for this call.

@ -35,7 +35,7 @@ static const NSTimeInterval kDefaultConnectMaxBackoff = 0;
static NSDictionary *const kDefaultAdditionalChannelArgs = nil;
static NSString *const kDefaultPEMRootCertificates = nil;
static NSString *const kDefaultPEMPrivateKey = nil;
static NSString *const kDefaultPEMCertChain = nil;
static NSString *const kDefaultPEMCertificateChain = nil;
static NSString *const kDefaultOauth2AccessToken = nil;
static const id<GRPCAuthorizationProtocol> kDefaultAuthTokenProvider = nil;
static const GRPCTransportType kDefaultTransportType = GRPCTransportTypeChttp2BoringSSL;
@ -74,7 +74,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
NSDictionary *_additionalChannelArgs;
NSString *_PEMRootCertificates;
NSString *_PEMPrivateKey;
NSString *_PEMCertChain;
NSString *_PEMCertificateChain;
GRPCTransportType _transportType;
NSString *_hostNameOverride;
id<NSObject> _logContext;
@ -99,7 +99,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
@synthesize additionalChannelArgs = _additionalChannelArgs;
@synthesize PEMRootCertificates = _PEMRootCertificates;
@synthesize PEMPrivateKey = _PEMPrivateKey;
@synthesize PEMCertChain = _PEMCertChain;
@synthesize PEMCertificateChain = _PEMCertificateChain;
@synthesize transportType = _transportType;
@synthesize hostNameOverride = _hostNameOverride;
@synthesize logContext = _logContext;
@ -124,7 +124,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
additionalChannelArgs:kDefaultAdditionalChannelArgs
PEMRootCertificates:kDefaultPEMRootCertificates
PEMPrivateKey:kDefaultPEMPrivateKey
PEMCertChain:kDefaultPEMCertChain
PEMCertificateChain:kDefaultPEMCertificateChain
transportType:kDefaultTransportType
hostNameOverride:kDefaultHostNameOverride
logContext:kDefaultLogContext
@ -149,7 +149,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
additionalChannelArgs:(NSDictionary *)additionalChannelArgs
PEMRootCertificates:(NSString *)PEMRootCertificates
PEMPrivateKey:(NSString *)PEMPrivateKey
PEMCertChain:(NSString *)PEMCertChain
PEMCertificateChain:(NSString *)PEMCertificateChain
transportType:(GRPCTransportType)transportType
hostNameOverride:(NSString *)hostNameOverride
logContext:(id)logContext
@ -174,7 +174,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
[[NSDictionary alloc] initWithDictionary:additionalChannelArgs copyItems:YES];
_PEMRootCertificates = [PEMRootCertificates copy];
_PEMPrivateKey = [PEMPrivateKey copy];
_PEMCertChain = [PEMCertChain copy];
_PEMCertificateChain = [PEMCertificateChain copy];
_transportType = transportType;
_hostNameOverride = [hostNameOverride copy];
_logContext = logContext;
@ -203,7 +203,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
additionalChannelArgs:_additionalChannelArgs
PEMRootCertificates:_PEMRootCertificates
PEMPrivateKey:_PEMPrivateKey
PEMCertChain:_PEMCertChain
PEMCertificateChain:_PEMCertificateChain
transportType:_transportType
hostNameOverride:_hostNameOverride
logContext:_logContext
@ -233,7 +233,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
copyItems:YES]
PEMRootCertificates:[_PEMRootCertificates copy]
PEMPrivateKey:[_PEMPrivateKey copy]
PEMCertChain:[_PEMCertChain copy]
PEMCertificateChain:[_PEMCertificateChain copy]
transportType:_transportType
hostNameOverride:[_hostNameOverride copy]
logContext:_logContext
@ -256,7 +256,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
if (!areObjectsEqual(callOptions.additionalChannelArgs, _additionalChannelArgs)) return NO;
if (!areObjectsEqual(callOptions.PEMRootCertificates, _PEMRootCertificates)) return NO;
if (!areObjectsEqual(callOptions.PEMPrivateKey, _PEMPrivateKey)) return NO;
if (!areObjectsEqual(callOptions.PEMCertChain, _PEMCertChain)) return NO;
if (!areObjectsEqual(callOptions.PEMCertificateChain, _PEMCertificateChain)) return NO;
if (!areObjectsEqual(callOptions.hostNameOverride, _hostNameOverride)) return NO;
if (!(callOptions.transportType == _transportType)) return NO;
if (!areObjectsEqual(callOptions.logContext, _logContext)) return NO;
@ -280,7 +280,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
result ^= _additionalChannelArgs.hash;
result ^= _PEMRootCertificates.hash;
result ^= _PEMPrivateKey.hash;
result ^= _PEMCertChain.hash;
result ^= _PEMCertificateChain.hash;
result ^= _hostNameOverride.hash;
result ^= _transportType;
result ^= _logContext.hash;
@ -311,7 +311,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
@dynamic additionalChannelArgs;
@dynamic PEMRootCertificates;
@dynamic PEMPrivateKey;
@dynamic PEMCertChain;
@dynamic PEMCertificateChain;
@dynamic transportType;
@dynamic hostNameOverride;
@dynamic logContext;
@ -336,7 +336,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
additionalChannelArgs:kDefaultAdditionalChannelArgs
PEMRootCertificates:kDefaultPEMRootCertificates
PEMPrivateKey:kDefaultPEMPrivateKey
PEMCertChain:kDefaultPEMCertChain
PEMCertificateChain:kDefaultPEMCertificateChain
transportType:kDefaultTransportType
hostNameOverride:kDefaultHostNameOverride
logContext:kDefaultLogContext
@ -363,7 +363,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
additionalChannelArgs:_additionalChannelArgs
PEMRootCertificates:_PEMRootCertificates
PEMPrivateKey:_PEMPrivateKey
PEMCertChain:_PEMCertChain
PEMCertificateChain:_PEMCertificateChain
transportType:_transportType
hostNameOverride:_hostNameOverride
logContext:_logContext
@ -391,7 +391,7 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
additionalChannelArgs:[_additionalChannelArgs copy]
PEMRootCertificates:_PEMRootCertificates
PEMPrivateKey:_PEMPrivateKey
PEMCertChain:_PEMCertChain
PEMCertificateChain:_PEMCertificateChain
transportType:_transportType
hostNameOverride:_hostNameOverride
logContext:_logContext
@ -493,8 +493,8 @@ static BOOL areObjectsEqual(id obj1, id obj2) {
_PEMPrivateKey = [PEMPrivateKey copy];
}
- (void)setPEMCertChain:(NSString *)PEMCertChain {
_PEMCertChain = [PEMCertChain copy];
- (void)setPEMCertificateChain:(NSString *)PEMCertificateChain {
_PEMCertificateChain = [PEMCertificateChain copy];
}
- (void)setTransportType:(GRPCTransportType)transportType {

@ -29,7 +29,10 @@ struct grpc_channel_credentials;
NS_ASSUME_NONNULL_BEGIN
/** Caching signature of a channel. */
/**
* Signature for the channel. If two channel's signatures are the same and connect to the same
* remote, they share the same underlying \a GRPCChannel object.
*/
@interface GRPCChannelConfiguration : NSObject<NSCopying>
- (instancetype)init NS_UNAVAILABLE;

@ -28,7 +28,6 @@
#import "GRPCCronetChannelFactory.h"
#import "GRPCInsecureChannelFactory.h"
#import "GRPCSecureChannelFactory.h"
#import "utilities.h"
#import "version.h"
#import <GRPCClient/GRPCCall+Cronet.h>
@ -63,8 +62,9 @@
factory = [GRPCSecureChannelFactory
factoryWithPEMRootCertificates:_callOptions.PEMRootCertificates
privateKey:_callOptions.PEMPrivateKey
certChain:_callOptions.PEMCertChain
certChain:_callOptions.PEMCertificateChain
error:&error];
NSAssert(factory != nil, @"Failed to create secure channel factory");
if (factory == nil) {
NSLog(@"Error creating secure channel factory: %@", error);
}
@ -114,8 +114,8 @@
[NSNumber numberWithUnsignedInteger:(NSUInteger)(_callOptions.keepaliveTimeout * 1000)];
}
if (_callOptions.retryEnabled == NO) {
args[@GRPC_ARG_ENABLE_RETRIES] = [NSNumber numberWithInt:_callOptions.retryEnabled];
if (!_callOptions.retryEnabled) {
args[@GRPC_ARG_ENABLE_RETRIES] = [NSNumber numberWithInt:_callOptions.retryEnabled ? 1 : 0];
}
if (_callOptions.connectMinTimeout > 0) {

@ -16,11 +16,6 @@
*
*/
/**
* Signature for the channel. If two channel's signatures are the same, they share the same
* underlying \a GRPCChannel object.
*/
#import <GRPCClient/GRPCCallOptions.h>
#import "GRPCChannelFactory.h"
@ -35,10 +30,10 @@ NS_ASSUME_NONNULL_BEGIN
@class GRPCWrappedCall;
/**
* A proxied channel object that can be retained and creates GRPCWrappedCall object from. If a
* raw channel is not present (i.e. no tcp connection to the server) when a GRPCWrappedCall object
* is requested, it issues a connection/reconnection. The behavior of this object is to mimic that
* of gRPC core's channel object.
* A proxied channel object that can be retained and used to create GRPCWrappedCall object
* regardless of the current connection status. If a connection is not established when a
* GRPCWrappedCall object is requested, it issues a connection/reconnection. This behavior is to
* follow that of gRPC core's channel object.
*/
@interface GRPCPooledChannel : NSObject

@ -27,7 +27,6 @@
#import "GRPCCronetChannelFactory.h"
#import "GRPCInsecureChannelFactory.h"
#import "GRPCSecureChannelFactory.h"
#import "utilities.h"
#import "version.h"
#import "GRPCWrappedCall.h"
#import "GRPCCompletionQueue.h"
@ -57,6 +56,34 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
return [self initWithChannelConfiguration:channelConfiguration destroyDelay:kDefaultChannelDestroyDelay];
}
- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration
destroyDelay:(NSTimeInterval)destroyDelay {
NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty.");
if (channelConfiguration == nil) {
return nil;
}
if ((self = [super init])) {
_channelConfiguration = [channelConfiguration copy];
_destroyDelay = destroyDelay;
_wrappedCalls = [NSHashTable weakObjectsHashTable];
_wrappedChannel = nil;
_lastTimedDestroy = nil;
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
if (@available(iOS 8.0, macOS 10.10, *)) {
_timerQueue = dispatch_queue_create(NULL,
dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
} else {
#else
{
#endif
_timerQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
}
}
return self;
}
- (void)dealloc {
// Disconnect GRPCWrappedCall objects created but not yet removed
if (_wrappedCalls.allObjects.count != 0) {
@ -72,7 +99,9 @@ callOptions:(GRPCCallOptions *)callOptions {
NSAssert(path.length > 0, @"path must not be empty.");
NSAssert(queue != nil, @"completionQueue must not be empty.");
NSAssert(callOptions, @"callOptions must not be empty.");
if (path.length == 0 || queue == nil || callOptions == nil) return nil;
if (path.length == 0 || queue == nil || callOptions == nil) {
return nil;
}
GRPCWrappedCall *call = nil;
@ -97,6 +126,7 @@ callOptions:(GRPCCallOptions *)callOptions {
call = [[GRPCWrappedCall alloc] initWithUnmanagedCall:unmanagedCall pooledChannel:self];
if (call == nil) {
NSAssert(call != nil, @"Unable to create GRPCWrappedCall object");
grpc_call_unref(unmanagedCall);
return nil;
}
@ -111,16 +141,22 @@ callOptions:(GRPCCallOptions *)callOptions {
return;
}
@synchronized(self) {
// Detect if all objects weakly referenced in _wrappedCalls are (implicitly) removed. In such
// case the channel is no longer referenced by a grpc_call object and can be destroyed after
// a certain delay.
// Detect if all objects weakly referenced in _wrappedCalls are (implicitly) removed.
// _wrappedCalls.count does not work here since the hash table may include deallocated weak
// references. _wrappedCalls.allObjects forces removal of those objects.
if (_wrappedCalls.allObjects.count == 0) {
// No more call has reference to this channel. We may start the timer for destroying the
// channel now.
NSDate *now = [NSDate date];
NSAssert(now != nil, @"Unable to create NSDate object 'now'.");
_lastTimedDestroy = now;
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)_destroyDelay * NSEC_PER_SEC),
_timerQueue, ^{
@synchronized(self) {
// Check _lastTimedDestroy against now in case more calls are created (and
// maybe destroyed) after this dispatch_async. In that case the current
// dispatch_after block should be discarded; the channel should be
// destroyed in a later dispatch_after block.
if (now != nil && self->_lastTimedDestroy == now) {
self->_wrappedChannel = nil;
self->_lastTimedDestroy = nil;
@ -145,38 +181,6 @@ callOptions:(GRPCCallOptions *)callOptions {
}
}
@end
@implementation GRPCPooledChannel (Test)
- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration
destroyDelay:(NSTimeInterval)destroyDelay {
NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty.");
if (channelConfiguration == nil) {
return nil;
}
if ((self = [super init])) {
_channelConfiguration = [channelConfiguration copy];
_destroyDelay = destroyDelay;
_wrappedCalls = [NSHashTable weakObjectsHashTable];
_wrappedChannel = nil;
_lastTimedDestroy = nil;
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
if (@available(iOS 8.0, macOS 10.10, *)) {
_timerQueue = dispatch_queue_create(NULL,
dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
} else {
#else
{
#endif
_timerQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
}
}
return self;
}
- (GRPCChannel *)wrappedChannel {
GRPCChannel *channel = nil;
@synchronized(self) {

@ -40,8 +40,8 @@
}
- (instancetype)initWithEngine:(stream_engine *)engine {
NSAssert(engine != NULL, @"Cronet engine cannot be empty.");
if (!engine) {
[NSException raise:NSInvalidArgumentException format:@"Cronet engine is NULL. Set it first."];
return nil;
}
if ((self = [super init])) {
@ -65,14 +65,12 @@
@implementation GRPCCronetChannelFactory
+ (instancetype)sharedInstance {
[NSException raise:NSInvalidArgumentException
format:@"Must enable macro GRPC_COMPILE_WITH_CRONET to build Cronet channel."];
NSAssert(NO, @"Must enable macro GRPC_COMPILE_WITH_CRONET to build Cronet channel.");
return nil;
}
- (grpc_channel *)createChannelWithHost:(NSString *)host channelArgs:(NSDictionary *)args {
[NSException raise:NSInvalidArgumentException
format:@"Must enable macro GRPC_COMPILE_WITH_CRONET to build Cronet channel."];
NSAssert(NO, @"Must enable macro GRPC_COMPILE_WITH_CRONET to build Cronet channel.");
return NULL;
}

@ -32,7 +32,6 @@
#import "GRPCCronetChannelFactory.h"
#import "GRPCSecureChannelFactory.h"
#import "NSDictionary+GRPC.h"
#import "utilities.h"
#import "version.h"
NS_ASSUME_NONNULL_BEGIN
@ -42,7 +41,7 @@ static NSMutableDictionary *gHostCache;
@implementation GRPCHost {
NSString *_PEMRootCertificates;
NSString *_PEMPrivateKey;
NSString *_pemCertChain;
NSString *_PEMCertificateChain;
}
+ (nullable instancetype)hostWithAddress:(NSString *)address {
@ -96,7 +95,7 @@ static NSMutableDictionary *gHostCache;
error:(NSError **)errorPtr {
_PEMRootCertificates = [pemRootCerts copy];
_PEMPrivateKey = [pemPrivateKey copy];
_pemCertChain = [pemCertChain copy];
_PEMCertificateChain = [pemCertChain copy];
return YES;
}
@ -113,7 +112,7 @@ static NSMutableDictionary *gHostCache;
options.connectMaxBackoff = (NSTimeInterval)_maxConnectBackoff / 1000;
options.PEMRootCertificates = _PEMRootCertificates;
options.PEMPrivateKey = _PEMPrivateKey;
options.PEMCertChain = _pemCertChain;
options.PEMCertificateChain = _PEMCertificateChain;
options.hostNameOverride = _hostNameOverride;
#ifdef GRPC_COMPILE_WITH_CRONET
// By old API logic, insecure channel precedes Cronet channel; Cronet channel preceeds default

@ -22,7 +22,6 @@
#import "ChannelArgsUtil.h"
#import "GRPCChannel.h"
#import "utilities.h"
@implementation GRPCSecureChannelFactory {
grpc_channel_credentials *_channelCreds;
@ -116,6 +115,10 @@
}
- (grpc_channel *)createChannelWithHost:(NSString *)host channelArgs:(NSDictionary *)args {
NSAssert(host.length != 0, @"host cannot be empty");
if (host.length == 0) {
return NULL;
}
grpc_channel_args *coreChannelArgs = GRPCBuildChannelArgs([args copy]);
grpc_channel *unmanagedChannel =
grpc_secure_channel_create(_channelCreds, host.UTF8String, coreChannelArgs, NULL);

@ -30,7 +30,6 @@
#import "NSData+GRPC.h"
#import "NSDictionary+GRPC.h"
#import "NSError+GRPC.h"
#import "utilities.h"
#import "GRPCOpBatchLog.h"
@ -237,6 +236,7 @@
#pragma mark GRPCWrappedCall
@implementation GRPCWrappedCall {
// pooledChannel holds weak reference to this object so this is ok
GRPCPooledChannel *_pooledChannel;
grpc_call *_call;
}
@ -275,8 +275,7 @@
for (GRPCOperation *operation in operations) {
ops_array[i++] = operation.op;
}
grpc_call_error error;
error = grpc_call_start_batch(_call, ops_array, nops, (__bridge_retained void *)(^(bool success) {
grpc_call_error error = grpc_call_start_batch(_call, ops_array, nops, (__bridge_retained void *)(^(bool success) {
if (!success) {
if (errorHandler) {
errorHandler();
@ -291,11 +290,7 @@
NULL);
gpr_free(ops_array);
if (error != GRPC_CALL_OK) {
[NSException
raise:NSInternalInconsistencyException
format:@"A precondition for calling grpc_call_start_batch wasn't met. Error %i", error];
}
NSAssert(error == GRPC_CALL_OK, @"Error starting a batch of operations: %i", error);
}
}
}

@ -1,22 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#import <Foundation/Foundation.h>
/** Raise exception when condition not met. */
#define GRPCAssert(condition, errorString) NSAssert(condition, errorString)
Loading…
Cancel
Save