|
|
|
@ -62,7 +62,7 @@ |
|
|
|
|
|
|
|
|
|
- (void)writeBufferUntilPausedOrStopped { |
|
|
|
|
while (_state == GRXWriterStateStarted && _queue.count > 0) { |
|
|
|
|
[_writeable didReceiveValue:[self popValue]]; |
|
|
|
|
[_writeable writeValue:[self popValue]]; |
|
|
|
|
} |
|
|
|
|
if (_inputIsFinished && _queue.count == 0) { |
|
|
|
|
// Our writer finished normally while we were paused or not-started-yet. |
|
|
|
@ -77,10 +77,10 @@ |
|
|
|
|
return _state == GRXWriterStateStarted && _queue.count == 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
- (void)didReceiveValue:(id)value { |
|
|
|
|
- (void)writeValue:(id)value { |
|
|
|
|
if (self.shouldFastForward) { |
|
|
|
|
// Skip the queue. |
|
|
|
|
[_writeable didReceiveValue:value]; |
|
|
|
|
[_writeable writeValue:value]; |
|
|
|
|
} else { |
|
|
|
|
// Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. |
|
|
|
|
// So just buffer the new value. |
|
|
|
|