Merge pull request #17578 from muxi/grpccall-safety

Make gRPC ObjC thread safety right
pull/17690/head
Muxi Yan 6 years ago committed by GitHub
commit 412c44992b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 371
      src/objective-c/GRPCClient/GRPCCall.m
  2. 3
      src/objective-c/RxLibrary/GRXBufferedPipe.h
  3. 21
      src/objective-c/RxLibrary/GRXBufferedPipe.m
  4. 21
      src/objective-c/RxLibrary/GRXConcurrentWriteable.h
  5. 102
      src/objective-c/RxLibrary/GRXConcurrentWriteable.m
  6. 6
      src/objective-c/RxLibrary/GRXForwardingWriter.h
  7. 55
      src/objective-c/RxLibrary/GRXForwardingWriter.m
  8. 2
      src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
  9. 30
      src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
  10. 4
      src/objective-c/RxLibrary/GRXWriter.h

@ -56,7 +56,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Make them read-write.
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;
@property(atomic) BOOL isWaitingForToken;
- (instancetype)initWithHost:(NSString *)host
path:(NSString *)path
@ -425,9 +424,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
// queue
dispatch_queue_t _responseQueue;
// Whether the call is finished. If it is, should not call finishWithError again.
BOOL _finished;
// The OAuth2 token fetched from a token provider.
NSString *_fetchedOauth2AccessToken;
}
@ -448,24 +444,28 @@ const char *kCFStreamVarName = "grpc_cfstream";
return;
}
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
switch (callSafety) {
case GRPCCallSafetyDefault:
callFlags[hostAndPath] = @0;
break;
case GRPCCallSafetyIdempotentRequest:
callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
break;
case GRPCCallSafetyCacheableRequest:
callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
break;
default:
break;
@synchronized(callFlags) {
switch (callSafety) {
case GRPCCallSafetyDefault:
callFlags[hostAndPath] = @0;
break;
case GRPCCallSafetyIdempotentRequest:
callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
break;
case GRPCCallSafetyCacheableRequest:
callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
break;
default:
break;
}
}
}
+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
return [callFlags[hostAndPath] intValue];
@synchronized(callFlags) {
return [callFlags[hostAndPath] intValue];
}
}
// Designated initializer
@ -506,7 +506,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
_callOptions = [callOptions copy];
// Serial queue to invoke the non-reentrant methods of the grpc_call object.
_callQueue = dispatch_queue_create("io.grpc.call", NULL);
_callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);
_requestWriter = requestWriter;
@ -523,66 +523,48 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
if (_state != GRXWriterStateNotStarted) {
return;
@synchronized(self) {
if (_state != GRXWriterStateNotStarted) {
return;
}
_responseQueue = queue;
}
_responseQueue = queue;
}
#pragma mark Finish
// This function should support being called within a @synchronized(self) block in another function
// Should not manipulate _requestWriter for deadlock prevention.
- (void)finishWithError:(NSError *)errorOrNil {
@synchronized(self) {
if (_state == GRXWriterStateFinished) {
return;
}
_state = GRXWriterStateFinished;
}
// If there were still request messages coming, stop them.
@synchronized(_requestWriter) {
_requestWriter.state = GRXWriterStateFinished;
}
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
} else {
[_responseWriteable enqueueSuccessfulCompletion];
}
[GRPCConnectivityMonitor unregisterObserver:self];
// If the call isn't retained anywhere else, it can be deallocated now.
_retainSelf = nil;
}
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
} else {
[_responseWriteable enqueueSuccessfulCompletion];
}
- (void)cancelCall {
// Can be called from any thread, any number of times.
@synchronized(self) {
[_wrappedCall cancel];
// If the call isn't retained anywhere else, it can be deallocated now.
_retainSelf = nil;
}
}
- (void)cancel {
@synchronized(self) {
[self cancelCall];
self.isWaitingForToken = NO;
}
[self
maybeFinishWithError:[NSError
errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeCancelled
userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
}
- (void)maybeFinishWithError:(NSError *)errorOrNil {
BOOL toFinish = NO;
@synchronized(self) {
if (_finished == NO) {
_finished = YES;
toFinish = YES;
if (_state == GRXWriterStateFinished) {
return;
}
[self finishWithError:[NSError
errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeCancelled
userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
[_wrappedCall cancel];
}
if (toFinish == YES) {
[self finishWithError:errorOrNil];
}
_requestWriter.state = GRXWriterStateFinished;
}
- (void)dealloc {
@ -609,21 +591,24 @@ const char *kCFStreamVarName = "grpc_cfstream";
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
@synchronized(self) {
if (self.state == GRXWriterStatePaused) {
if (_state != GRXWriterStateStarted) {
return;
}
}
dispatch_async(_callQueue, ^{
__weak GRPCCall *weakSelf = self;
__weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable;
[self startReadWithHandler:^(grpc_byte_buffer *message) {
__strong GRPCCall *strongSelf = weakSelf;
__strong GRXConcurrentWriteable *strongWriteable = weakWriteable;
NSLog(@"message received");
if (message == NULL) {
// No more messages from the server
return;
}
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf == nil) {
grpc_byte_buffer_destroy(message);
return;
}
NSData *data = [NSData grpc_dataWithByteBuffer:message];
grpc_byte_buffer_destroy(message);
if (!data) {
@ -631,21 +616,26 @@ const char *kCFStreamVarName = "grpc_cfstream";
// don't want to throw, because the app shouldn't crash for a behavior
// that's on the hands of any server to have. Instead we finish and ask
// the server to cancel.
[strongSelf cancelCall];
[strongSelf
maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeResourceExhausted
userInfo:@{
NSLocalizedDescriptionKey :
@"Client does not have enough memory to "
@"hold the server response."
}]];
return;
@synchronized(strongSelf) {
[strongSelf
finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeResourceExhausted
userInfo:@{
NSLocalizedDescriptionKey :
@"Client does not have enough memory to "
@"hold the server response."
}]];
[strongSelf->_wrappedCall cancel];
}
strongSelf->_requestWriter.state = GRXWriterStateFinished;
} else {
@synchronized(strongSelf) {
[strongSelf->_responseWriteable enqueueValue:data
completionHandler:^{
[strongSelf startNextRead];
}];
}
}
[strongWriteable enqueueValue:data
completionHandler:^{
[strongSelf startNextRead];
}];
}];
});
}
@ -684,11 +674,13 @@ const char *kCFStreamVarName = "grpc_cfstream";
initWithMetadata:headers
flags:callSafetyFlags
handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA
if (!_unaryCall) {
[_wrappedCall startBatchWithOperations:@[ op ]];
} else {
[_unaryOpBatch addObject:op];
}
dispatch_async(_callQueue, ^{
if (!self->_unaryCall) {
[self->_wrappedCall startBatchWithOperations:@[ op ]];
} else {
[self->_unaryOpBatch addObject:op];
}
});
}
#pragma mark GRXWriteable implementation
@ -703,9 +695,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
@synchronized(strongSelf->_requestWriter) {
strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
};
@ -721,13 +711,17 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)writeValue:(id)value {
// TODO(jcanizales): Throw/assert if value isn't NSData.
NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");
@synchronized(self) {
if (_state == GRXWriterStateFinished) {
return;
}
}
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
@synchronized(_requestWriter) {
_requestWriter.state = GRXWriterStatePaused;
}
_requestWriter.state = GRXWriterStatePaused;
dispatch_async(_callQueue, ^{
// Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
@ -766,17 +760,20 @@ const char *kCFStreamVarName = "grpc_cfstream";
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
// TODO(jcanizales): Add error handlers for async failures
[_wrappedCall
startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
[_wrappedCall
startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
dispatch_async(_callQueue, ^{
// TODO(jcanizales): Add error handlers for async failures
[self->_wrappedCall
startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
[self->_wrappedCall
startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
});
}
- (void)invokeCall {
__weak GRPCCall *weakSelf = self;
[self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
// Response headers received.
NSLog(@"response received");
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseHeaders = headers;
@ -784,6 +781,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
}
completionHandler:^(NSError *error, NSDictionary *trailers) {
NSLog(@"completion received");
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseTrailers = trailers;
@ -794,112 +792,114 @@ const char *kCFStreamVarName = "grpc_cfstream";
[userInfo addEntriesFromDictionary:error.userInfo];
}
userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
// TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
// called before this one, so an error might end up with trailers but no headers. We
// shouldn't call finishWithError until ater both blocks are called. It is also when
// this is done that we can provide a merged view of response headers and trailers in a
// thread-safe way.
if (strongSelf.responseHeaders) {
userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
}
// Since gRPC core does not guarantee the headers block being called before this block,
// responseHeaders might be nil.
userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
}
[strongSelf maybeFinishWithError:error];
[strongSelf finishWithError:error];
strongSelf->_requestWriter.state = GRXWriterStateFinished;
}
}];
// Now that the RPC has been initiated, request writes can start.
@synchronized(_requestWriter) {
[_requestWriter startWithWriteable:self];
}
}
#pragma mark GRXWriter implementation
// Lock acquired inside startWithWriteable:
- (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
_responseWriteable =
[[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
GRPCPooledChannel *channel =
[[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:_path
completionQueue:[GRPCCompletionQueue completionQueue]
callOptions:_callOptions];
if (wrappedCall == nil) {
[self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeUnavailable
userInfo:@{
NSLocalizedDescriptionKey :
@"Failed to create call or channel."
}]];
return;
}
@synchronized(self) {
_wrappedCall = wrappedCall;
}
if (_state == GRXWriterStateFinished) {
return;
}
_responseWriteable =
[[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
GRPCPooledChannel *channel =
[[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
_wrappedCall = [channel wrappedCallWithPath:_path
completionQueue:[GRPCCompletionQueue completionQueue]
callOptions:_callOptions];
if (_wrappedCall == nil) {
[self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeUnavailable
userInfo:@{
NSLocalizedDescriptionKey :
@"Failed to create call or channel."
}]];
return;
}
[self sendHeaders];
[self invokeCall];
[self sendHeaders];
[self invokeCall];
// Connectivity monitor is not required for CFStream
char *enableCFStream = getenv(kCFStreamVarName);
if (enableCFStream == nil || enableCFStream[0] != '1') {
[GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
// Connectivity monitor is not required for CFStream
char *enableCFStream = getenv(kCFStreamVarName);
if (enableCFStream == nil || enableCFStream[0] != '1') {
[GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
}
}
// Now that the RPC has been initiated, request writes can start.
[_requestWriter startWithWriteable:self];
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
id<GRPCAuthorizationProtocol> tokenProvider = nil;
@synchronized(self) {
_state = GRXWriterStateStarted;
}
// Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
// This makes RPCs in which the call isn't externally retained possible (as long as it is started
// before being autoreleased).
// Care is taken not to retain self strongly in any of the blocks used in this implementation, so
// that the life of the instance is determined by this retain cycle.
_retainSelf = self;
// Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
// This makes RPCs in which the call isn't externally retained possible (as long as it is
// started before being autoreleased). Care is taken not to retain self strongly in any of the
// blocks used in this implementation, so that the life of the instance is determined by this
// retain cycle.
_retainSelf = self;
if (_callOptions == nil) {
GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
if (_serverName.length != 0) {
callOptions.serverAuthority = _serverName;
}
if (_timeout > 0) {
callOptions.timeout = _timeout;
}
uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
if (callFlags != 0) {
if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
_callSafety = GRPCCallSafetyIdempotentRequest;
} else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
_callSafety = GRPCCallSafetyCacheableRequest;
}
}
if (_callOptions == nil) {
GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
if (_serverName.length != 0) {
callOptions.serverAuthority = _serverName;
}
if (_timeout > 0) {
callOptions.timeout = _timeout;
}
uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
if (callFlags != 0) {
if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
_callSafety = GRPCCallSafetyIdempotentRequest;
} else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
_callSafety = GRPCCallSafetyCacheableRequest;
id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
if (tokenProvider != nil) {
callOptions.authTokenProvider = tokenProvider;
}
_callOptions = callOptions;
}
id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
if (tokenProvider != nil) {
callOptions.authTokenProvider = tokenProvider;
}
_callOptions = callOptions;
NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
@"authTokenProvider and oauth2AccessToken cannot be set at the same time");
tokenProvider = _callOptions.authTokenProvider;
}
NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
@"authTokenProvider and oauth2AccessToken cannot be set at the same time");
if (_callOptions.authTokenProvider != nil) {
@synchronized(self) {
self.isWaitingForToken = YES;
}
[_callOptions.authTokenProvider getTokenWithHandler:^(NSString *token) {
@synchronized(self) {
if (self.isWaitingForToken) {
if (token) {
self->_fetchedOauth2AccessToken = [token copy];
if (tokenProvider != nil) {
__weak typeof(self) weakSelf = self;
[tokenProvider getTokenWithHandler:^(NSString *token) {
__strong typeof(self) strongSelf = weakSelf;
if (strongSelf) {
@synchronized(strongSelf) {
if (strongSelf->_state == GRXWriterStateNotStarted) {
if (token) {
strongSelf->_fetchedOauth2AccessToken = [token copy];
}
}
[self startCallWithWriteable:writeable];
self.isWaitingForToken = NO;
}
[strongSelf startCallWithWriteable:writeable];
}
}];
} else {
@ -938,16 +938,21 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)connectivityChanged:(NSNotification *)note {
// Cancel underlying call upon this notification
// Cancel underlying call upon this notification.
// Retain because connectivity manager only keeps weak reference to GRPCCall.
__strong GRPCCall *strongSelf = self;
if (strongSelf) {
[self cancelCall];
[self
maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeUnavailable
userInfo:@{
NSLocalizedDescriptionKey : @"Connectivity lost."
}]];
@synchronized(strongSelf) {
[_wrappedCall cancel];
[strongSelf
finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeUnavailable
userInfo:@{
NSLocalizedDescriptionKey : @"Connectivity lost."
}]];
}
strongSelf->_requestWriter.state = GRXWriterStateFinished;
}
}

@ -36,8 +36,7 @@
* crash. If you want to react to flow control signals to prevent that, instead of using this class
* you can implement an object that conforms to GRXWriter.
*
* Thread-safety:
* The methods of an object of this class should not be called concurrently from different threads.
* Thread-safety: the methods of this class are thread-safe.
*/
@interface GRXBufferedPipe : GRXWriter<GRXWriteable>

@ -51,16 +51,22 @@
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
value = [value copy];
}
__weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
[weakSelf.writeable writeValue:value];
@synchronized(self) {
if (self->_state == GRXWriterStateFinished) {
return;
}
[self.writeable writeValue:value];
}
});
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
__weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^{
[weakSelf finishWithError:errorOrNil];
if (self->_state == GRXWriterStateFinished) {
return;
}
[self finishWithError:errorOrNil];
});
}
@ -100,14 +106,15 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
self.writeable = writeable;
_state = GRXWriterStateStarted;
@synchronized(self) {
self.writeable = writeable;
_state = GRXWriterStateStarted;
}
dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
[self.writeable writesFinishedWithError:errorOrNil];
self.state = GRXWriterStateFinished;
}
- (void)dealloc {

@ -23,10 +23,10 @@
/**
* This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a
* GRXWriteable instance for the main thread, guaranteeing that writesFinishedWithError: is the last
* message sent to it (no matter what messages are sent to the wrapper, in what order, nor from
* which thread). It also guarantees that, if cancelWithError: is called from the main thread (e.g.
* by the app cancelling the writes), no further messages are sent to the writeable except
* GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is
* the last message sent to it (no matter what messages are sent to the wrapper, in what order, nor
* from which thread). It also guarantees that, if cancelWithError: is called (e.g. by the app
* cancelling the writes), no further messages are sent to the writeable except
* writesFinishedWithError:.
*
* TODO(jcanizales): Let the user specify another queue for the writeable callbacks.
@ -43,21 +43,22 @@
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable;
/**
* Enqueues writeValue: to be sent to the writeable in the main thread.
* The passed handler is invoked from the main thread after writeValue: returns.
* Enqueues writeValue: to be sent to the writeable from the designated dispatch queue.
* The passed handler is invoked from designated dispatch queue after writeValue: returns.
*/
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler;
/**
* Enqueues writesFinishedWithError:nil to be sent to the writeable in the main thread. After that
* message is sent to the writeable, all other methods of this object are effectively noops.
* Enqueues writesFinishedWithError:nil to be sent to the writeable in the designated dispatch
* queue. After that message is sent to the writeable, all other methods of this object are
* effectively noops.
*/
- (void)enqueueSuccessfulCompletion;
/**
* If the writeable has not yet received a writesFinishedWithError: message, this will enqueue one
* to be sent to it in the main thread, and cancel all other pending messages to the writeable
* enqueued by this object (both past and future).
* to be sent to it in the designated dispatch queue, and cancel all other pending messages to the
* writeable enqueued by this object (both past and future).
* The error argument cannot be nil.
*/
- (void)cancelWithError:(NSError *)error;

@ -27,8 +27,15 @@
@implementation GRXConcurrentWriteable {
dispatch_queue_t _writeableQueue;
// This ensures that writesFinishedWithError: is only sent once to the writeable.
// This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected
// by _writeableQueue.
BOOL _alreadyFinished;
// This ivar ensures that a cancelWithError: call prevents further values to be sent to
// self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be
// protected by self lock.
BOOL _cancelled;
}
- (instancetype)init {
@ -41,6 +48,8 @@
if (self = [super init]) {
_writeableQueue = queue;
_writeable = writeable;
_alreadyFinished = NO;
_cancelled = NO;
}
return self;
}
@ -51,78 +60,63 @@
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
dispatch_async(_writeableQueue, ^{
// We're racing a possible cancellation performed by another thread. To turn all already-
// enqueued messages into noops, cancellation nillifies the writeable property. If we get it
// before it's nil, we won the race.
id<GRXWriteable> writeable = self.writeable;
if (writeable) {
[writeable writeValue:value];
handler();
if (self->_alreadyFinished) {
return;
}
@synchronized(self) {
if (self->_cancelled) {
return;
}
}
[self.writeable writeValue:value];
handler();
});
}
- (void)enqueueSuccessfulCompletion {
__weak typeof(self) weakSelf = self;
dispatch_async(_writeableQueue, ^{
typeof(self) strongSelf = weakSelf;
if (strongSelf) {
BOOL finished = NO;
@synchronized(strongSelf) {
if (!strongSelf->_alreadyFinished) {
strongSelf->_alreadyFinished = YES;
} else {
finished = YES;
}
}
if (!finished) {
// Cancellation is now impossible. None of the other three blocks can run concurrently with
// this one.
[strongSelf.writeable writesFinishedWithError:nil];
// Skip any possible message to the wrapped writeable enqueued after this one.
strongSelf.writeable = nil;
if (self->_alreadyFinished) {
return;
}
@synchronized(self) {
if (self->_cancelled) {
return;
}
}
[self.writeable writesFinishedWithError:nil];
// Skip any possible message to the wrapped writeable enqueued after this one.
self->_alreadyFinished = YES;
self.writeable = nil;
});
}
- (void)cancelWithError:(NSError *)error {
NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
BOOL finished = NO;
NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion.");
@synchronized(self) {
if (!_alreadyFinished) {
_alreadyFinished = YES;
} else {
finished = YES;
}
self->_cancelled = YES;
}
if (!finished) {
// Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
// nillify writeable because we might be running concurrently with the blocks in
// _writeableQueue, and assignment with ARC isn't atomic.
id<GRXWriteable> writeable = self.writeable;
self.writeable = nil;
dispatch_async(_writeableQueue, ^{
if (self->_alreadyFinished) {
// a cancel or a successful completion is already issued
return;
}
[self.writeable writesFinishedWithError:error];
dispatch_async(_writeableQueue, ^{
[writeable writesFinishedWithError:error];
});
}
// Skip any possible message to the wrapped writeable enqueued after this one.
self->_alreadyFinished = YES;
self.writeable = nil;
});
}
- (void)cancelSilently {
BOOL finished = NO;
@synchronized(self) {
if (!_alreadyFinished) {
_alreadyFinished = YES;
} else {
finished = YES;
dispatch_async(_writeableQueue, ^{
if (self->_alreadyFinished) {
return;
}
}
if (!finished) {
// Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
// nillify writeable because we might be running concurrently with the blocks in
// _writeableQueue, and assignment with ARC isn't atomic.
self.writeable = nil;
}
});
}
@end

@ -25,11 +25,7 @@
* input writer, and for classes that represent objects with input and
* output sequences of values, like an RPC.
*
* Thread-safety:
* All messages sent to this object need to be serialized. When it is started, the writer it wraps
* is started in the same thread. Manual state changes are propagated to the wrapped writer in the
* same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be
* serialized with any message sent to this object.
* Thread-safety: the methods of this class are thread safe.
*/
@interface GRXForwardingWriter : GRXWriter
- (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER;

@ -54,48 +54,65 @@
[writeable writesFinishedWithError:errorOrNil];
}
// This is used to stop the input writer. It nillifies our reference to it
// to release it.
- (void)finishInput {
GRXWriter *writer = _writer;
_writer = nil;
writer.state = GRXWriterStateFinished;
}
#pragma mark GRXWriteable implementation
- (void)writeValue:(id)value {
[_writeable writeValue:value];
@synchronized(self) {
[_writeable writeValue:value];
}
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
_writer = nil;
[self finishOutputWithError:errorOrNil];
@synchronized(self) {
_writer = nil;
[self finishOutputWithError:errorOrNil];
}
}
#pragma mark GRXWriter implementation
- (GRXWriterState)state {
return _writer ? _writer.state : GRXWriterStateFinished;
GRXWriter *copiedWriter;
@synchronized(self) {
copiedWriter = _writer;
}
return copiedWriter ? copiedWriter.state : GRXWriterStateFinished;
}
- (void)setState:(GRXWriterState)state {
GRXWriter *copiedWriter = nil;
if (state == GRXWriterStateFinished) {
_writeable = nil;
[self finishInput];
@synchronized(self) {
_writeable = nil;
copiedWriter = _writer;
_writer = nil;
}
copiedWriter.state = GRXWriterStateFinished;
} else {
_writer.state = state;
@synchronized(self) {
copiedWriter = _writer;
}
copiedWriter.state = state;
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
_writeable = writeable;
[_writer startWithWriteable:self];
GRXWriter *copiedWriter = nil;
@synchronized(self) {
_writeable = writeable;
copiedWriter = _writer;
}
[copiedWriter startWithWriteable:self];
}
- (void)finishWithError:(NSError *)errorOrNil {
[self finishOutputWithError:errorOrNil];
[self finishInput];
GRXWriter *copiedWriter = nil;
@synchronized(self) {
[self finishOutputWithError:errorOrNil];
copiedWriter = _writer;
_writer = nil;
}
copiedWriter.state = GRXWriterStateFinished;
}
@end

@ -23,6 +23,8 @@
/**
* Utility to construct GRXWriter instances from values that are immediately available when
* required.
*
* Thread safety: the methods of this class are thread safe.
*/
@interface GRXImmediateSingleWriter : GRXImmediateWriter

@ -20,7 +20,6 @@
@implementation GRXImmediateSingleWriter {
id _value;
id<GRXWriteable> _writeable;
}
@synthesize state = _state;
@ -38,17 +37,16 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
_state = GRXWriterStateStarted;
_writeable = writeable;
[writeable writeValue:_value];
[self finish];
}
- (void)finish {
_state = GRXWriterStateFinished;
_value = nil;
id<GRXWriteable> writeable = _writeable;
_writeable = nil;
id copiedValue = nil;
@synchronized(self) {
if (_state != GRXWriterStateNotStarted) {
return;
}
copiedValue = _value;
_value = nil;
_state = GRXWriterStateFinished;
}
[writeable writeValue:copiedValue];
[writeable writesFinishedWithError:nil];
}
@ -65,9 +63,11 @@
// the original \a map function returns a new Writer of another type. So we
// need to override this function here.
- (GRXWriter *)map:(id (^)(id))map {
// Since _value is available when creating the object, we can simply
// apply the map and store the output.
_value = map(_value);
@synchronized(self) {
// Since _value is available when creating the object, we can simply
// apply the map and store the output.
_value = map(_value);
}
return self;
}

@ -80,9 +80,9 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
* This property can be used to query the current state of the writer, which determines how it might
* currently use its writeable. Some state transitions can be triggered by setting this property to
* the corresponding value, and that's useful for advanced use cases like pausing an writer. For
* more details, see the documentation of the enum further down.
* more details, see the documentation of the enum further down. The property is thread safe.
*/
@property(nonatomic) GRXWriterState state;
@property GRXWriterState state;
/**
* Transition to the Started state, and start sending messages to the writeable (a reference to it

Loading…
Cancel
Save