Comments on BufferedPipe

pull/11070/head
Muxi Yan 8 years ago
parent 033db460ba
commit d6545bb3df
  1. 12
      src/objective-c/RxLibrary/GRXBufferedPipe.m

@ -33,10 +33,13 @@
#import "GRXBufferedPipe.h" #import "GRXBufferedPipe.h"
@interface GRXBufferedPipe ()
@property(atomic) NSError *errorOrNil;
@end
@implementation GRXBufferedPipe { @implementation GRXBufferedPipe {
id<GRXWriteable> _writeable; id<GRXWriteable> _writeable;
BOOL _inputIsFinished; BOOL _inputIsFinished;
NSError *_errorOrNil;
dispatch_queue_t _writeQueue; dispatch_queue_t _writeQueue;
} }
@ -90,7 +93,7 @@
dispatch_async(_writeQueue, ^{ dispatch_async(_writeQueue, ^{
GRXBufferedPipe *strongSelf = weakSelf; GRXBufferedPipe *strongSelf = weakSelf;
if (strongSelf) { if (strongSelf) {
[strongSelf finishWithError:_errorOrNil]; [strongSelf finishWithError:nil];
} }
}); });
} }
@ -123,7 +126,7 @@
return; return;
case GRXWriterStateStarted: case GRXWriterStateStarted:
if (_state == GRXWriterStatePaused) { if (_state == GRXWriterStatePaused) {
_state = newState; _state = newState;
dispatch_resume(_writeQueue); dispatch_resume(_writeQueue);
} }
return; return;
@ -134,9 +137,6 @@
} }
- (void)startWithWriteable:(id<GRXWriteable>)writeable { - (void)startWithWriteable:(id<GRXWriteable>)writeable {
if (_state != GRXWriterStateNotStarted) {
return;
}
_state = GRXWriterStateStarted; _state = GRXWriterStateStarted;
_writeable = writeable; _writeable = writeable;
dispatch_resume(_writeQueue); dispatch_resume(_writeQueue);

Loading…
Cancel
Save