From 0d9ae65c70fd0e0ecabdd800ac73a38ee603c890 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 26 Mar 2019 15:45:29 -0700 Subject: [PATCH] Allow multiple pending reads --- src/objective-c/GRPCClient/GRPCCall.h | 2 +- src/objective-c/GRPCClient/GRPCCall.m | 41 +++++++++---------- src/objective-c/ProtoRPC/ProtoRPC.h | 12 ++++++ src/objective-c/ProtoRPC/ProtoRPC.m | 6 ++- src/objective-c/tests/APIv2Tests/APIv2Tests.m | 41 ++++++++++--------- 5 files changed, 60 insertions(+), 42 deletions(-) diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h index aab6ecc4445..decafd5c58e 100644 --- a/src/objective-c/GRPCClient/GRPCCall.h +++ b/src/objective-c/GRPCClient/GRPCCall.h @@ -265,7 +265,7 @@ extern NSString *const kGRPCTrailersKey; */ - (void)finish; -- (void)receiveNextMessage; +- (void)receiveNextMessages:(NSUInteger)numberOfMessages; /** * Get a copy of the original call options. diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index e9a0f43cdde..e2625a63e19 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -70,7 +70,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; callOptions:(GRPCCallOptions *)callOptions writeDone:(void (^)(void))writeDone; -- (void)receiveNextMessage; +- (void)receiveNextMessages:(NSUInteger)numberOfMessages; @end @@ -123,7 +123,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; /** Flags whether call has been finished. */ BOOL _finished; /** The number of pending messages receiving requests. */ - NSUInteger _pendingReceiveNextMessage; + NSUInteger _pendingReceiveNextMessages; } - (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions @@ -212,9 +212,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; if (_callOptions.initialMetadata) { [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata]; } - if (_pendingReceiveNextMessage) { - [_call receiveNextMessage]; - _pendingReceiveNextMessage = NO; + if (_pendingReceiveNextMessages > 0) { + [_call receiveNextMessages:_pendingReceiveNextMessages]; + _pendingReceiveNextMessages = 0; } copiedCall = _call; } @@ -399,16 +399,16 @@ const char *kCFStreamVarName = "grpc_cfstream"; } } -- (void)receiveNextMessage { +- (void)receiveNextMessages:(NSUInteger)numberOfMessages { GRPCCall *copiedCall = nil; @synchronized(self) { copiedCall = _call; if (copiedCall == nil) { - _pendingReceiveNextMessage = YES; + _pendingReceiveNextMessages += numberOfMessages; return; } } - [copiedCall receiveNextMessage]; + [copiedCall receiveNextMessages:numberOfMessages]; } @end @@ -481,8 +481,8 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Indicate a read request to core is pending. BOOL _pendingCoreRead; - // Indicate a read message request from user. - BOOL _pendingReceiveNextMessage; + // Indicate pending read message request from user. + NSUInteger _pendingReceiveNextMessages; } @synthesize state = _state; @@ -591,7 +591,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; _responseQueue = dispatch_get_main_queue(); // do not start a read until initial metadata is received - _pendingReceiveNextMessage = NO; + _pendingReceiveNextMessages = 0; _pendingCoreRead = YES; } return self; @@ -669,11 +669,11 @@ const char *kCFStreamVarName = "grpc_cfstream"; if (_state != GRXWriterStateStarted) { return; } - if (_callOptions.enableFlowControl && (_pendingCoreRead || !_pendingReceiveNextMessage)) { + if (_callOptions.enableFlowControl && (_pendingCoreRead || _pendingReceiveNextMessages == 0)) { return; } _pendingCoreRead = YES; - _pendingReceiveNextMessage = NO; + _pendingReceiveNextMessages--; } dispatch_async(_callQueue, ^{ @@ -696,7 +696,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. @synchronized(strongSelf) { - strongSelf->_pendingReceiveNextMessage = NO; + strongSelf->_pendingReceiveNextMessages--; [strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeResourceExhausted @@ -764,13 +764,13 @@ const char *kCFStreamVarName = "grpc_cfstream"; }); } -- (void)receiveNextMessage { +- (void)receiveNextMessages:(NSUInteger)numberOfMessages { + if (numberOfMessages == 0) { + return; + } @synchronized(self) { - // Duplicate invocation of this method. Return - if (_pendingReceiveNextMessage) { - return; - } - _pendingReceiveNextMessage = YES; + _pendingReceiveNextMessages += numberOfMessages; + [self maybeStartNextRead]; } } @@ -793,7 +793,6 @@ const char *kCFStreamVarName = "grpc_cfstream"; } } }; - GRPCOpSendMessage *op = [[GRPCOpSendMessage alloc] initWithMessage:message handler:resumingHandler]; if (!_unaryCall) { diff --git a/src/objective-c/ProtoRPC/ProtoRPC.h b/src/objective-c/ProtoRPC/ProtoRPC.h index 19294763fc1..68dec445c71 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.h +++ b/src/objective-c/ProtoRPC/ProtoRPC.h @@ -136,8 +136,20 @@ NS_ASSUME_NONNULL_BEGIN */ - (void)finish; +/** + * Tell gRPC to receive the next message. If flow control is enabled, the messages received from the + * server are buffered in gRPC until the user want to receive the next message. If flow control is + * not enabled, messages will be automatically received after the previous one is delivered. + */ - (void)receiveNextMessage; +/** + * Tell gRPC to receive the next N message. If flow control is enabled, the messages received from + * the server are buffered in gRPC until the user want to receive the next message. If flow control + * is not enabled, messages will be automatically received after the previous one is delivered. + */ +- (void)receiveNextMessages:(NSUInteger)numberOfMessages; + @end NS_ASSUME_NONNULL_END diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m index e7a56b0f285..80988be318b 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.m +++ b/src/objective-c/ProtoRPC/ProtoRPC.m @@ -72,6 +72,7 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing - (void)start { [_call start]; + [_call receiveNextMessage]; [_call writeMessage:_message]; [_call finish]; } @@ -198,11 +199,14 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing } - (void)receiveNextMessage { + [self receiveNextMessages:1]; +} +- (void)receiveNextMessages:(NSUInteger)numberOfMessages { GRPCCall2 *copiedCall; @synchronized(self) { copiedCall = _call; } - [copiedCall receiveNextMessage]; + [copiedCall receiveNextMessages:numberOfMessages]; } - (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata { diff --git a/src/objective-c/tests/APIv2Tests/APIv2Tests.m b/src/objective-c/tests/APIv2Tests/APIv2Tests.m index 17df4ab933e..0966b4e58f9 100644 --- a/src/objective-c/tests/APIv2Tests/APIv2Tests.m +++ b/src/objective-c/tests/APIv2Tests/APIv2Tests.m @@ -502,7 +502,7 @@ static const NSTimeInterval kInvertedTimeout = 2; [self waitForExpectationsWithTimeout:kTestTimeout handler:nil]; } -- (void)testWriteFlowControl { +- (void)testFlowControlWrite { __weak XCTestExpectation *expectWriteData = [self expectationWithDescription:@"Reported write data"]; @@ -531,7 +531,7 @@ static const NSTimeInterval kInvertedTimeout = 2; callOptions:options]; [call start]; - [call receiveNextMessage]; + [call receiveNextMessages:1]; [call writeData:[request data]]; // Wait for 3 seconds and make sure we do not receive the response @@ -540,7 +540,7 @@ static const NSTimeInterval kInvertedTimeout = 2; [call finish]; } -- (void)testReadFlowControl { +- (void)testFlowControlRead { __weak __block XCTestExpectation *expectBlockedMessage = [self expectationWithDescription:@"Message not delivered without recvNextMessage"]; __weak __block XCTestExpectation *expectPassedMessage = nil; @@ -593,29 +593,31 @@ static const NSTimeInterval kInvertedTimeout = 2; expectPassedClose = [self expectationWithDescription:@"Close delivered after receiveNextMessage"]; unblocked = YES; - [call receiveNextMessage]; + [call receiveNextMessages:1]; [self waitForExpectationsWithTimeout:kTestTimeout handler:nil]; } -- (void)testReadFlowControlMultipleMessages { - XCTestExpectation *expectPassedMessage = +- (void)testFlowControlMultipleMessages { + __weak XCTestExpectation *expectPassedMessage = [self expectationWithDescription:@"two messages delivered with receiveNextMessage"]; expectPassedMessage.expectedFulfillmentCount = 2; - XCTestExpectation *expectBlockedMessage = + __weak XCTestExpectation *expectBlockedMessage = [self expectationWithDescription:@"Message 3 not delivered"]; expectBlockedMessage.inverted = YES; + __weak XCTestExpectation *expectWriteTwice = + [self expectationWithDescription:@"Write 2 messages done"]; + expectWriteTwice.expectedFulfillmentCount = 2; RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message]; RMTResponseParameters *parameters = [RMTResponseParameters message]; parameters.size = kSimpleDataLength; [request.responseParametersArray addObject:parameters]; - [request.responseParametersArray addObject:parameters]; request.payload.body = [NSMutableData dataWithLength:kSimpleDataLength]; GRPCRequestOptions *callRequest = [[GRPCRequestOptions alloc] initWithHost:(NSString *)kHostAddress - path:kOutputStreamingCallMethod.HTTPPath + path:kFullDuplexCallMethod.HTTPPath safety:GRPCCallSafetyDefault]; GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; options.transportType = GRPCTransportTypeInsecure; @@ -628,25 +630,26 @@ static const NSTimeInterval kInvertedTimeout = 2; messageCallback:^(NSData *message) { if (messageId <= 1) { [expectPassedMessage fulfill]; - if (messageId < 1) { - [call receiveNextMessage]; - } } else { [expectBlockedMessage fulfill]; } messageId++; } - closeCallback:nil] + closeCallback:nil + writeDataCallback:^{ + [expectWriteTwice fulfill]; + }] callOptions:options]; - [call receiveNextMessage]; + [call receiveNextMessages:2]; [call start]; [call writeData:[request data]]; + [call writeData:[request data]]; [self waitForExpectationsWithTimeout:kInvertedTimeout handler:nil]; } -- (void)testReadFlowControlReadyBeforeStart { +- (void)testFlowControlReadReadyBeforeStart { __weak XCTestExpectation *expectPassedMessage = [self expectationWithDescription:@"Message delivered with receiveNextMessage"]; __weak XCTestExpectation *expectPassedClose = @@ -678,7 +681,7 @@ static const NSTimeInterval kInvertedTimeout = 2; }] callOptions:options]; - [call receiveNextMessage]; + [call receiveNextMessages:1]; [call start]; [call writeData:[request data]]; [call finish]; @@ -686,7 +689,7 @@ static const NSTimeInterval kInvertedTimeout = 2; [self waitForExpectationsWithTimeout:kInvertedTimeout handler:nil]; } -- (void)testReadFlowControlReadyAfterStart { +- (void)testFlowControlReadReadyAfterStart { __weak XCTestExpectation *expectPassedMessage = [self expectationWithDescription:@"Message delivered with receiveNextMessage"]; __weak XCTestExpectation *expectPassedClose = @@ -720,14 +723,14 @@ static const NSTimeInterval kInvertedTimeout = 2; callOptions:options]; [call start]; - [call receiveNextMessage]; + [call receiveNextMessages:1]; [call writeData:[request data]]; [call finish]; [self waitForExpectationsWithTimeout:kTestTimeout handler:nil]; } -- (void)testReadFlowControlNonBlockingFailure { +- (void)testFlowControlReadNonBlockingFailure { __weak XCTestExpectation *completion = [self expectationWithDescription:@"RPC completed."]; GRPCRequestOptions *requestOptions =