mirror of https://github.com/grpc/grpc.git
Merge pull request #1647 from jcanizales/buffered-pipe-and-ping-pong-test
Adds GRXBufferedPipe and the PingPong interop test using itpull/1455/merge
commit
9a688dfb06
3 changed files with 292 additions and 11 deletions
@ -0,0 +1,59 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#import <Foundation/Foundation.h> |
||||
|
||||
#import "GRXWriteable.h" |
||||
#import "GRXWriter.h" |
||||
|
||||
// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
|
||||
// to -startWithWriteable:).
|
||||
// Once it is started, whatever values are written into it (via -didReceiveValue:) will be
|
||||
// propagated immediately, unless flow control prevents it.
|
||||
// If it is throttled and keeps receiving values, as well as if it receives values before being
|
||||
// started, it will buffer them and propagate them in order as soon as its state becomes
|
||||
// GRXWriterStateStarted.
|
||||
// If it receives an error (via -didFinishWithError:), it will drop any buffered values and
|
||||
// propagate the error immediately.
|
||||
//
|
||||
// Beware that a pipe of this type can't prevent receiving more values when it is paused (for
|
||||
// example if used to write data to a congested network connection). Because in such situations the
|
||||
// pipe will keep buffering all data written to it, your application could run out of memory and
|
||||
// crash. If you want to react to flow control signals to prevent that, instead of using this class
|
||||
// you can implement an object that conforms to GRXWriter.
|
||||
@interface GRXBufferedPipe : NSObject<GRXWriteable, GRXWriter> |
||||
|
||||
// Convenience constructor.
|
||||
+ (instancetype)pipe; |
||||
|
||||
@end |
@ -0,0 +1,146 @@ |
||||
/* |
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#import "GRXBufferedPipe.h" |
||||
|
||||
@implementation GRXBufferedPipe { |
||||
id<GRXWriteable> _writeable; |
||||
NSMutableArray *_queue; |
||||
BOOL _inputIsFinished; |
||||
NSError *_errorOrNil; |
||||
} |
||||
|
||||
@synthesize state = _state; |
||||
|
||||
+ (instancetype)pipe { |
||||
return [[self alloc] init]; |
||||
} |
||||
|
||||
- (instancetype)init { |
||||
if (self = [super init]) { |
||||
_queue = [NSMutableArray array]; |
||||
_state = GRXWriterStateNotStarted; |
||||
} |
||||
return self; |
||||
} |
||||
|
||||
- (id)popValue { |
||||
id value = _queue[0]; |
||||
[_queue removeObjectAtIndex:0]; |
||||
return value; |
||||
} |
||||
|
||||
- (void)writeBufferUntilPausedOrStopped { |
||||
while (_state == GRXWriterStateStarted && _queue.count > 0) { |
||||
[_writeable didReceiveValue:[self popValue]]; |
||||
} |
||||
if (_inputIsFinished && _queue.count == 0) { |
||||
// Our writer finished normally while we were paused or not-started-yet. |
||||
[self finishWithError:_errorOrNil]; |
||||
} |
||||
} |
||||
|
||||
#pragma mark GRXWriteable implementation |
||||
|
||||
// Returns whether events can be simply propagated to the other end of the pipe. |
||||
- (BOOL)shouldFastForward { |
||||
return _state == GRXWriterStateStarted && _queue.count == 0; |
||||
} |
||||
|
||||
- (void)didReceiveValue:(id)value { |
||||
if (self.shouldFastForward) { |
||||
// Skip the queue. |
||||
[_writeable didReceiveValue:value]; |
||||
} else { |
||||
// Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. |
||||
// So just buffer the new value. |
||||
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. |
||||
if ([value respondsToSelector:@selector(copy)]) { |
||||
value = [value copy]; |
||||
} |
||||
[_queue addObject:value]; |
||||
} |
||||
} |
||||
|
||||
- (void)didFinishWithError:(NSError *)errorOrNil { |
||||
_inputIsFinished = YES; |
||||
_errorOrNil = errorOrNil; |
||||
if (errorOrNil || self.shouldFastForward) { |
||||
// No need to write pending values. |
||||
[self finishWithError:_errorOrNil]; |
||||
} |
||||
} |
||||
|
||||
#pragma mark GRXWriter implementation |
||||
|
||||
- (void)setState:(GRXWriterState)newState { |
||||
// Manual transitions are only allowed from the started or paused states. |
||||
if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { |
||||
return; |
||||
} |
||||
|
||||
switch (newState) { |
||||
case GRXWriterStateFinished: |
||||
_state = newState; |
||||
_queue = nil; |
||||
// Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the |
||||
// writeable to be messaged anymore. |
||||
_writeable = nil; |
||||
return; |
||||
case GRXWriterStatePaused: |
||||
_state = newState; |
||||
return; |
||||
case GRXWriterStateStarted: |
||||
if (_state == GRXWriterStatePaused) { |
||||
_state = newState; |
||||
[self writeBufferUntilPausedOrStopped]; |
||||
} |
||||
return; |
||||
case GRXWriterStateNotStarted: |
||||
return; |
||||
} |
||||
} |
||||
|
||||
- (void)startWithWriteable:(id<GRXWriteable>)writeable { |
||||
_state = GRXWriterStateStarted; |
||||
_writeable = writeable; |
||||
[self writeBufferUntilPausedOrStopped]; |
||||
} |
||||
|
||||
- (void)finishWithError:(NSError *)errorOrNil { |
||||
id<GRXWriteable> writeable = _writeable; |
||||
self.state = GRXWriterStateFinished; |
||||
[writeable didFinishWithError:errorOrNil]; |
||||
} |
||||
|
||||
@end |
Loading…
Reference in new issue