From 142acc965e12b950c4b3d52df22ffaa28b6b62e5 Mon Sep 17 00:00:00 2001 From: Jorge Canizales Date: Fri, 15 May 2015 18:43:34 -0700 Subject: [PATCH 1/4] Adds GRXBufferedPipe --- src/objective-c/RxLibrary/GRXBufferedPipe.h | 53 ++++++++ src/objective-c/RxLibrary/GRXBufferedPipe.m | 142 ++++++++++++++++++++ 2 files changed, 195 insertions(+) create mode 100644 src/objective-c/RxLibrary/GRXBufferedPipe.h create mode 100644 src/objective-c/RxLibrary/GRXBufferedPipe.m diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h new file mode 100644 index 00000000000..02dc6b5fe83 --- /dev/null +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -0,0 +1,53 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#import + +#import "GRXWriteable.h" +#import "GRXWriter.h" + +// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed +// to -startWithWriteable:). +// Once it is started, whatever values are written into it (via -didReceiveValue:) will be +// propagated immediately, unless flow control prevents it. +// If it is throttled and keeps receiving values, as well as if it receives values before being +// started, it will buffer them and propagate them in order as soon as its state becomes +// GRXWriterStateStarted. +// If it receives an error (via -didFinishWithError:), it will drop any buffered values and +// propagate the error immediately. +@interface GRXBufferedPipe : NSObject + +// Convenience constructor. ++ (instancetype)pipe; + +@end diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m new file mode 100644 index 00000000000..e1295cef8ae --- /dev/null +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -0,0 +1,142 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#import "GRXBufferedPipe.h" + +@implementation GRXBufferedPipe { + id _writeable; + NSMutableArray *_queue; + BOOL _inputIsFinished; + NSError *_errorOrNil; +} + +@synthesize state = _state; + ++ (instancetype)pipe { + return [[self alloc] init]; +} + +- (instancetype)init { + if (self = [super init]) { + _queue = [NSMutableArray array]; + _state = GRXWriterStateNotStarted; + } + return self; +} + +- (id)popValue { + id value = _queue[0]; + [_queue removeObjectAtIndex:0]; + return value; +} + +- (void)writeBufferUntilPausedOrStopped { + while (_state == GRXWriterStateStarted && _queue.count > 0) { + [_writeable didReceiveValue:[self popValue]]; + } + if (_inputIsFinished && _queue.count == 0) { + // Our writer finished normally while we were paused or not-started-yet. + [self finishWithError:_errorOrNil]; + } +} + +#pragma mark GRXWriteable implementation + +// Returns whether events can be simply propagated to the other end of the pipe. +- (BOOL)shouldFastForward { + return _state == GRXWriterStateStarted && _queue.count == 0; +} + +- (void)didReceiveValue:(id)value { + if (self.shouldFastForward) { + // Skip the queue. + [_writeable didReceiveValue:value]; + } else { + // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. + // So just buffer the new value. + [_queue addObject:value]; + } +} + +- (void)didFinishWithError:(NSError *)errorOrNil { + _inputIsFinished = YES; + _errorOrNil = errorOrNil; + if (errorOrNil || self.shouldFastForward) { + // No need to write pending values. + [self finishWithError:_errorOrNil]; + } +} + +#pragma mark GRXWriter implementation + +- (void)setState:(GRXWriterState)newState { + // Manual transitions are only allowed from the started or paused states. + if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { + return; + } + + switch (newState) { + case GRXWriterStateFinished: + _state = newState; + _queue = nil; + // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the + // writeable to be messaged anymore. + _writeable = nil; + return; + case GRXWriterStatePaused: + _state = newState; + return; + case GRXWriterStateStarted: + if (_state == GRXWriterStatePaused) { + _state = newState; + [self writeBufferUntilPausedOrStopped]; + } + return; + case GRXWriterStateNotStarted: + return; + } +} + +- (void)startWithWriteable:(id)writeable { + _state = GRXWriterStateStarted; + _writeable = writeable; + [self writeBufferUntilPausedOrStopped]; +} + +- (void)finishWithError:(NSError *)errorOrNil { + id writeable = _writeable; + self.state = GRXWriterStateFinished; + [writeable didFinishWithError:errorOrNil]; +} + +@end From 421f6c9488795a1a125770f258b9cf4feef4f293 Mon Sep 17 00:00:00 2001 From: Jorge Canizales Date: Mon, 18 May 2015 11:51:12 -0700 Subject: [PATCH 2/4] Use BufferedPipe to add PingPong test and fix Cancel test --- .../Sample/SampleTests/RemoteProtoTests.m | 98 ++++++++++++++++--- 1 file changed, 87 insertions(+), 11 deletions(-) diff --git a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m index f4892298341..7cebc0c2a74 100644 --- a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m +++ b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m @@ -36,13 +36,46 @@ #import #import -#import #import +#import +#import #import #import #import #import +// Convenience constructors for the generated proto messages: + +@interface RMTStreamingOutputCallRequest (Constructors) ++ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize + requestedResponseSize:(NSNumber *)responseSize; +@end + +@implementation RMTStreamingOutputCallRequest (Constructors) ++ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize + requestedResponseSize:(NSNumber *)responseSize { + RMTStreamingOutputCallRequest *request = [self message]; + RMTResponseParameters *parameters = [RMTResponseParameters message]; + parameters.size = responseSize.integerValue; + [request.responseParametersArray addObject:parameters]; + request.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue]; + return request; +} +@end + +@interface RMTStreamingOutputCallResponse (Constructors) ++ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize; +@end + +@implementation RMTStreamingOutputCallResponse (Constructors) ++ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize { + RMTStreamingOutputCallResponse * response = [self message]; + response.payload.type = RMTPayloadType_Compressable; + response.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue]; + return response; +} +@end + @interface RemoteProtoTests : XCTestCase @end @@ -70,7 +103,7 @@ [expectation fulfill]; }]; - [self waitForExpectationsWithTimeout:2. handler:nil]; + [self waitForExpectationsWithTimeout:2 handler:nil]; } - (void)testLargeUnaryRPC { @@ -92,7 +125,7 @@ [expectation fulfill]; }]; - [self waitForExpectationsWithTimeout:4. handler:nil]; + [self waitForExpectationsWithTimeout:4 handler:nil]; } - (void)testClientStreamingRPC { @@ -124,7 +157,7 @@ [expectation fulfill]; }]; - [self waitForExpectationsWithTimeout:4. handler:nil]; + [self waitForExpectationsWithTimeout:4 handler:nil]; } - (void)testServerStreamingRPC { @@ -149,10 +182,7 @@ if (response) { XCTAssertLessThan(index, 4, @"More than 4 responses received."); - RMTStreamingOutputCallResponse * expected = [RMTStreamingOutputCallResponse message]; - expected.payload.type = RMTPayloadType_Compressable; - int expectedSize = [expectedSizes[index] unsignedIntegerValue]; - expected.payload.body = [NSMutableData dataWithLength:expectedSize]; + id expected = [RMTStreamingOutputCallResponse messageWithPayloadSize:expectedSizes[index]]; XCTAssertEqualObjects(response, expected); index += 1; } @@ -166,6 +196,49 @@ [self waitForExpectationsWithTimeout:4 handler:nil]; } +- (void)testPingPongRPC { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPong"]; + + NSArray *requests = @[@27182, @8, @1828, @45904]; + NSArray *responses = @[@31415, @9, @2653, @58979]; + + GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init]; + + __block int index = 0; + + id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index] + requestedResponseSize:responses[index]]; + [requestsBuffer didReceiveValue:request]; + + [_service fullDuplexCallWithRequestsWriter:requestsBuffer + handler:^(BOOL done, + RMTStreamingOutputCallResponse *response, + NSError *error) { + XCTAssertNil(error, @"Finished with unexpected error: %@", error); + XCTAssertTrue(done || response, @"Event handler called without an event."); + + if (response) { + XCTAssertLessThan(index, 4, @"More than 4 responses received."); + id expected = [RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]]; + XCTAssertEqualObjects(response, expected); + index += 1; + if (index < 4) { + id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index] + requestedResponseSize:responses[index]]; + [requestsBuffer didReceiveValue:request]; + } else { + [requestsBuffer didFinishWithError:nil]; + } + } + + if (done) { + XCTAssertEqual(index, 4, @"Received %i responses instead of 4.", index); + [expectation fulfill]; + } + }]; + [self waitForExpectationsWithTimeout:2 handler:nil]; +} + - (void)testEmptyStreamRPC { __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyStream"]; [_service fullDuplexCallWithRequestsWriter:[GRXWriter emptyWriter] @@ -176,13 +249,16 @@ XCTAssert(done, @"Unexpected response: %@", response); [expectation fulfill]; }]; - [self waitForExpectationsWithTimeout:4 handler:nil]; + [self waitForExpectationsWithTimeout:2 handler:nil]; } - (void)testCancelAfterBeginRPC { __weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterBegin"]; - // TODO(mlumish): change to writing that blocks instead of writing - ProtoRPC *call = [_service RPCToStreamingInputCallWithRequestsWriter:[GRXWriter emptyWriter] + + // A buffered pipe to which we never write any value acts as a writer that just hangs. + GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init]; + + ProtoRPC *call = [_service RPCToStreamingInputCallWithRequestsWriter:requestsBuffer handler:^(RMTStreamingInputCallResponse *response, NSError *error) { XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED); From 63e4091dbc0902ea0447bb30940f757478abdef7 Mon Sep 17 00:00:00 2001 From: Jorge Canizales Date: Mon, 18 May 2015 17:41:20 -0700 Subject: [PATCH 3/4] Add warning about the perils of buffering without bounds --- src/objective-c/RxLibrary/GRXBufferedPipe.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index 02dc6b5fe83..4147362ba42 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -45,6 +45,12 @@ // GRXWriterStateStarted. // If it receives an error (via -didFinishWithError:), it will drop any buffered values and // propagate the error immediately. +// +// Beware that a pipe of this type can't prevent receiving more values when it is paused (for +// example if used to write data to a congested network connection). Because in such situations the +// pipe will keep buffering all data written to it, your application could run out of memory and +// crash. If you want to react to flow control signals to prevent that, instead of using this class +// you can implement an object that conforms to GRXWriter. @interface GRXBufferedPipe : NSObject // Convenience constructor. From 42e47b462ea4c45496e2367e3bfc168ab433600b Mon Sep 17 00:00:00 2001 From: Jorge Canizales Date: Mon, 18 May 2015 23:38:17 -0700 Subject: [PATCH 4/4] Copy values before buffering them. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit So they aren’t mutated before being written later. --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index e1295cef8ae..3ef470f89ff 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -84,6 +84,10 @@ } else { // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. // So just buffer the new value. + // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. + if ([value respondsToSelector:@selector(copy)]) { + value = [value copy]; + } [_queue addObject:value]; } }