|
|
|
@ -27,8 +27,15 @@ |
|
|
|
|
|
|
|
|
|
@implementation GRXConcurrentWriteable { |
|
|
|
|
dispatch_queue_t _writeableQueue; |
|
|
|
|
// This ensures that writesFinishedWithError: is only sent once to the writeable. |
|
|
|
|
|
|
|
|
|
// This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected |
|
|
|
|
// by _writeableQueue. |
|
|
|
|
BOOL _alreadyFinished; |
|
|
|
|
|
|
|
|
|
// This ivar ensures that a cancelWithError: call prevents further values to be sent to |
|
|
|
|
// self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be |
|
|
|
|
// protected by self lock. |
|
|
|
|
BOOL _cancelled; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
- (instancetype)init { |
|
|
|
@ -42,6 +49,7 @@ |
|
|
|
|
_writeableQueue = queue; |
|
|
|
|
_writeable = writeable; |
|
|
|
|
_alreadyFinished = NO; |
|
|
|
|
_cancelled = NO; |
|
|
|
|
} |
|
|
|
|
return self; |
|
|
|
|
} |
|
|
|
@ -56,6 +64,12 @@ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@synchronized (self) { |
|
|
|
|
if (self->_cancelled) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
[self.writeable writeValue:value]; |
|
|
|
|
handler(); |
|
|
|
|
}); |
|
|
|
@ -63,13 +77,18 @@ |
|
|
|
|
|
|
|
|
|
- (void)enqueueSuccessfulCompletion { |
|
|
|
|
dispatch_async(_writeableQueue, ^{ |
|
|
|
|
@synchronized(self) { |
|
|
|
|
if (self->_alreadyFinished) { |
|
|
|
|
if (self->_alreadyFinished) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
@synchronized (self) { |
|
|
|
|
if (self->_cancelled) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
[self.writeable writesFinishedWithError:nil]; |
|
|
|
|
|
|
|
|
|
// Skip any possible message to the wrapped writeable enqueued after this one. |
|
|
|
|
self->_alreadyFinished = YES; |
|
|
|
|
self.writeable = nil; |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
@ -77,14 +96,17 @@ |
|
|
|
|
- (void)cancelWithError:(NSError *)error { |
|
|
|
|
NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); |
|
|
|
|
@synchronized(self) { |
|
|
|
|
self->_cancelled = YES; |
|
|
|
|
} |
|
|
|
|
dispatch_async(_writeableQueue, ^{ |
|
|
|
|
if (self->_alreadyFinished) { |
|
|
|
|
// a cancel or a successful completion is already issued |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
dispatch_async(_writeableQueue, ^{ |
|
|
|
|
// If enqueueSuccessfulCompletion is already issued, self.writeable is nil and the following |
|
|
|
|
// line is no-op. |
|
|
|
|
[self.writeable writesFinishedWithError:error]; |
|
|
|
|
|
|
|
|
|
// Skip any possible message to the wrapped writeable enqueued after this one. |
|
|
|
|
self->_alreadyFinished = YES; |
|
|
|
|
self.writeable = nil; |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|