Support configuring dispatch queue in GRPCCall and below

pull/10505/head
Muxi Yan 8 years ago
parent 80b4a8af19
commit 895f3d83da
  1. 7
      src/objective-c/GRPCClient/GRPCCall.h
  2. 16
      src/objective-c/GRPCClient/GRPCCall.m
  3. 4
      src/objective-c/RxLibrary/GRXConcurrentWriteable.h
  4. 10
      src/objective-c/RxLibrary/GRXConcurrentWriteable.m

@ -253,6 +253,13 @@ extern id const kGRPCTrailersKey;
*/ */
+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path; + (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path;
/**
* Set the dispatch queue to be used for queue responses.
*
* This configuration is only effective before the call starts.
*/
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue;
// TODO(jcanizales): Let specify a deadline. As a category of GRXWriter? // TODO(jcanizales): Let specify a deadline. As a category of GRXWriter?
@end @end

@ -113,6 +113,10 @@ static NSMutableDictionary *callFlags;
// the SendClose op is added. // the SendClose op is added.
BOOL _unaryCall; BOOL _unaryCall;
NSMutableArray *_unaryOpBatch; NSMutableArray *_unaryOpBatch;
// The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
// queue
dispatch_queue_t _responseQueue;
} }
@synthesize state = _state; @synthesize state = _state;
@ -175,10 +179,19 @@ static NSMutableDictionary *callFlags;
_unaryCall = YES; _unaryCall = YES;
_unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch]; _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
} }
_responseQueue = dispatch_get_main_queue();
} }
return self; return self;
} }
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
if (_state != GRXWriterStateNotStarted) {
return;
}
_responseQueue = queue;
}
#pragma mark Finish #pragma mark Finish
- (void)finishWithError:(NSError *)errorOrNil { - (void)finishWithError:(NSError *)errorOrNil {
@ -424,7 +437,8 @@ static NSMutableDictionary *callFlags;
// that the life of the instance is determined by this retain cycle. // that the life of the instance is determined by this retain cycle.
_retainSelf = self; _retainSelf = self;
_responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable
dispatchQueue:_responseQueue];
_wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host path:_path]; _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host path:_path];
NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?"); NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");

@ -53,7 +53,9 @@
* The GRXWriteable instance is retained until writesFinishedWithError: is sent to it, and released * The GRXWriteable instance is retained until writesFinishedWithError: is sent to it, and released
* after that. * after that.
*/ */
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable NS_DESIGNATED_INITIALIZER; - (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
dispatchQueue:(dispatch_queue_t)queue NS_DESIGNATED_INITIALIZER;
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable;
/** /**
* Enqueues writeValue: to be sent to the writeable in the main thread. * Enqueues writeValue: to be sent to the writeable in the main thread.

@ -51,14 +51,20 @@
} }
// Designated initializer // Designated initializer
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable { - (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
dispatchQueue:(dispatch_queue_t)queue {
if (self = [super init]) { if (self = [super init]) {
_writeableQueue = dispatch_get_main_queue(); _writeableQueue = queue;
_writeable = writeable; _writeable = writeable;
} }
return self; return self;
} }
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
return [self initWithWriteable:writeable
dispatchQueue:dispatch_get_main_queue()];
}
- (void)enqueueValue:(id)value completionHandler:(void (^)())handler { - (void)enqueueValue:(id)value completionHandler:(void (^)())handler {
dispatch_async(_writeableQueue, ^{ dispatch_async(_writeableQueue, ^{
// We're racing a possible cancellation performed by another thread. To turn all already- // We're racing a possible cancellation performed by another thread. To turn all already-

Loading…
Cancel
Save