From cd2755b72bd0414f1d91bba64999aa2e7afc38e5 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 26 Mar 2019 14:43:21 -0700 Subject: [PATCH] polish tests and fix issues --- src/objective-c/GRPCClient/GRPCCall.m | 12 ++++ src/objective-c/tests/APIv2Tests/APIv2Tests.m | 71 ++++++++++++++++--- 2 files changed, 73 insertions(+), 10 deletions(-) diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index e70aa5caba0..e9a0f43cdde 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -122,6 +122,8 @@ const char *kCFStreamVarName = "grpc_cfstream"; BOOL _canceled; /** Flags whether call has been finished. */ BOOL _finished; + /** The number of pending messages receiving requests. */ + NSUInteger _pendingReceiveNextMessage; } - (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions @@ -210,6 +212,10 @@ const char *kCFStreamVarName = "grpc_cfstream"; if (_callOptions.initialMetadata) { [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata]; } + if (_pendingReceiveNextMessage) { + [_call receiveNextMessage]; + _pendingReceiveNextMessage = NO; + } copiedCall = _call; } @@ -397,6 +403,10 @@ const char *kCFStreamVarName = "grpc_cfstream"; GRPCCall *copiedCall = nil; @synchronized(self) { copiedCall = _call; + if (copiedCall == nil) { + _pendingReceiveNextMessage = YES; + return; + } } [copiedCall receiveNextMessage]; } @@ -662,6 +672,8 @@ const char *kCFStreamVarName = "grpc_cfstream"; if (_callOptions.enableFlowControl && (_pendingCoreRead || !_pendingReceiveNextMessage)) { return; } + _pendingCoreRead = YES; + _pendingReceiveNextMessage = NO; } dispatch_async(_callQueue, ^{ diff --git a/src/objective-c/tests/APIv2Tests/APIv2Tests.m b/src/objective-c/tests/APIv2Tests/APIv2Tests.m index 6509b79b822..17df4ab933e 100644 --- a/src/objective-c/tests/APIv2Tests/APIv2Tests.m +++ b/src/objective-c/tests/APIv2Tests/APIv2Tests.m @@ -39,11 +39,12 @@ static NSString *const kService = @"TestService"; static GRPCProtoMethod *kInexistentMethod; static GRPCProtoMethod *kEmptyCallMethod; static GRPCProtoMethod *kUnaryCallMethod; +static GRPCProtoMethod *kOutputStreamingCallMethod; static GRPCProtoMethod *kFullDuplexCallMethod; static const int kSimpleDataLength = 100; -static const NSTimeInterval kTestTimeout = 16; +static const NSTimeInterval kTestTimeout = 8; static const NSTimeInterval kInvertedTimeout = 2; // Reveal the _class ivar for testing access @@ -143,6 +144,8 @@ static const NSTimeInterval kInvertedTimeout = 2; [[GRPCProtoMethod alloc] initWithPackage:kPackage service:kService method:@"EmptyCall"]; kUnaryCallMethod = [[GRPCProtoMethod alloc] initWithPackage:kPackage service:kService method:@"UnaryCall"]; + kOutputStreamingCallMethod = + [[GRPCProtoMethod alloc] initWithPackage:kPackage service:kService method:@"StreamingOutputCall"]; kFullDuplexCallMethod = [[GRPCProtoMethod alloc] initWithPackage:kPackage service:kService method:@"FullDuplexCall"]; } @@ -547,10 +550,8 @@ static const NSTimeInterval kInvertedTimeout = 2; expectBlockedMessage.inverted = YES; expectBlockedClose.inverted = YES; - RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message]; - RMTResponseParameters *parameters = [RMTResponseParameters message]; - parameters.size = kSimpleDataLength; - [request.responseParametersArray addObject:parameters]; + RMTSimpleRequest *request = [RMTSimpleRequest message]; + request.responseSize = kSimpleDataLength; request.payload.body = [NSMutableData dataWithLength:kSimpleDataLength]; GRPCRequestOptions *callRequest = @@ -597,15 +598,62 @@ static const NSTimeInterval kInvertedTimeout = 2; [self waitForExpectationsWithTimeout:kTestTimeout handler:nil]; } -- (void)testReadFlowControlReadyBeforeStart { - __weak XCTestExpectation *expectBlockedMessage = - [self expectationWithDescription:@"Message delivered with receiveNextMessage"]; +- (void)testReadFlowControlMultipleMessages { + XCTestExpectation *expectPassedMessage = + [self expectationWithDescription:@"two messages delivered with receiveNextMessage"]; + expectPassedMessage.expectedFulfillmentCount = 2; + XCTestExpectation *expectBlockedMessage = + [self expectationWithDescription:@"Message 3 not delivered"]; expectBlockedMessage.inverted = YES; RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message]; RMTResponseParameters *parameters = [RMTResponseParameters message]; parameters.size = kSimpleDataLength; [request.responseParametersArray addObject:parameters]; + [request.responseParametersArray addObject:parameters]; + request.payload.body = [NSMutableData dataWithLength:kSimpleDataLength]; + + GRPCRequestOptions *callRequest = + [[GRPCRequestOptions alloc] initWithHost:(NSString *)kHostAddress + path:kOutputStreamingCallMethod.HTTPPath + safety:GRPCCallSafetyDefault]; + GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; + options.transportType = GRPCTransportTypeInsecure; + options.enableFlowControl = YES; + __block NSUInteger messageId = 0; + __block GRPCCall2 *call = [[GRPCCall2 alloc] + initWithRequestOptions:callRequest + responseHandler:[[ClientTestsBlockCallbacks alloc] + initWithInitialMetadataCallback:nil + messageCallback:^(NSData *message) { + if (messageId <= 1) { + [expectPassedMessage fulfill]; + if (messageId < 1) { + [call receiveNextMessage]; + } + } else { + [expectBlockedMessage fulfill]; + } + messageId++; + } + closeCallback:nil] + callOptions:options]; + + [call receiveNextMessage]; + [call start]; + [call writeData:[request data]]; + + [self waitForExpectationsWithTimeout:kInvertedTimeout handler:nil]; +} + +- (void)testReadFlowControlReadyBeforeStart { + __weak XCTestExpectation *expectPassedMessage = + [self expectationWithDescription:@"Message delivered with receiveNextMessage"]; + __weak XCTestExpectation *expectPassedClose = + [self expectationWithDescription:@"Close delivered with receiveNextMessage"]; + + RMTSimpleRequest *request = [RMTSimpleRequest message]; + request.responseSize = kSimpleDataLength; request.payload.body = [NSMutableData dataWithLength:kSimpleDataLength]; GRPCRequestOptions *callRequest = @@ -621,10 +669,13 @@ static const NSTimeInterval kInvertedTimeout = 2; responseHandler:[[ClientTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil messageCallback:^(NSData *message) { - [expectBlockedMessage fulfill]; + [expectPassedMessage fulfill]; XCTAssertFalse(closed); } - closeCallback:nil] + closeCallback:^(NSDictionary *ttrailers, NSError *error) { + closed = YES; + [expectPassedClose fulfill]; + }] callOptions:options]; [call receiveNextMessage];