Allow multiple pending reads

pull/17936/head
Muxi Yan 6 years ago
parent cd2755b72b
commit 0d9ae65c70
  1. 2
      src/objective-c/GRPCClient/GRPCCall.h
  2. 41
      src/objective-c/GRPCClient/GRPCCall.m
  3. 12
      src/objective-c/ProtoRPC/ProtoRPC.h
  4. 6
      src/objective-c/ProtoRPC/ProtoRPC.m
  5. 41
      src/objective-c/tests/APIv2Tests/APIv2Tests.m

@ -265,7 +265,7 @@ extern NSString *const kGRPCTrailersKey;
*/ */
- (void)finish; - (void)finish;
- (void)receiveNextMessage; - (void)receiveNextMessages:(NSUInteger)numberOfMessages;
/** /**
* Get a copy of the original call options. * Get a copy of the original call options.

@ -70,7 +70,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
callOptions:(GRPCCallOptions *)callOptions callOptions:(GRPCCallOptions *)callOptions
writeDone:(void (^)(void))writeDone; writeDone:(void (^)(void))writeDone;
- (void)receiveNextMessage; - (void)receiveNextMessages:(NSUInteger)numberOfMessages;
@end @end
@ -123,7 +123,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
/** Flags whether call has been finished. */ /** Flags whether call has been finished. */
BOOL _finished; BOOL _finished;
/** The number of pending messages receiving requests. */ /** The number of pending messages receiving requests. */
NSUInteger _pendingReceiveNextMessage; NSUInteger _pendingReceiveNextMessages;
} }
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions - (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
@ -212,9 +212,9 @@ const char *kCFStreamVarName = "grpc_cfstream";
if (_callOptions.initialMetadata) { if (_callOptions.initialMetadata) {
[_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata]; [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
} }
if (_pendingReceiveNextMessage) { if (_pendingReceiveNextMessages > 0) {
[_call receiveNextMessage]; [_call receiveNextMessages:_pendingReceiveNextMessages];
_pendingReceiveNextMessage = NO; _pendingReceiveNextMessages = 0;
} }
copiedCall = _call; copiedCall = _call;
} }
@ -399,16 +399,16 @@ const char *kCFStreamVarName = "grpc_cfstream";
} }
} }
- (void)receiveNextMessage { - (void)receiveNextMessages:(NSUInteger)numberOfMessages {
GRPCCall *copiedCall = nil; GRPCCall *copiedCall = nil;
@synchronized(self) { @synchronized(self) {
copiedCall = _call; copiedCall = _call;
if (copiedCall == nil) { if (copiedCall == nil) {
_pendingReceiveNextMessage = YES; _pendingReceiveNextMessages += numberOfMessages;
return; return;
} }
} }
[copiedCall receiveNextMessage]; [copiedCall receiveNextMessages:numberOfMessages];
} }
@end @end
@ -481,8 +481,8 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Indicate a read request to core is pending. // Indicate a read request to core is pending.
BOOL _pendingCoreRead; BOOL _pendingCoreRead;
// Indicate a read message request from user. // Indicate pending read message request from user.
BOOL _pendingReceiveNextMessage; NSUInteger _pendingReceiveNextMessages;
} }
@synthesize state = _state; @synthesize state = _state;
@ -591,7 +591,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
_responseQueue = dispatch_get_main_queue(); _responseQueue = dispatch_get_main_queue();
// do not start a read until initial metadata is received // do not start a read until initial metadata is received
_pendingReceiveNextMessage = NO; _pendingReceiveNextMessages = 0;
_pendingCoreRead = YES; _pendingCoreRead = YES;
} }
return self; return self;
@ -669,11 +669,11 @@ const char *kCFStreamVarName = "grpc_cfstream";
if (_state != GRXWriterStateStarted) { if (_state != GRXWriterStateStarted) {
return; return;
} }
if (_callOptions.enableFlowControl && (_pendingCoreRead || !_pendingReceiveNextMessage)) { if (_callOptions.enableFlowControl && (_pendingCoreRead || _pendingReceiveNextMessages == 0)) {
return; return;
} }
_pendingCoreRead = YES; _pendingCoreRead = YES;
_pendingReceiveNextMessage = NO; _pendingReceiveNextMessages--;
} }
dispatch_async(_callQueue, ^{ dispatch_async(_callQueue, ^{
@ -696,7 +696,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
// that's on the hands of any server to have. Instead we finish and ask // that's on the hands of any server to have. Instead we finish and ask
// the server to cancel. // the server to cancel.
@synchronized(strongSelf) { @synchronized(strongSelf) {
strongSelf->_pendingReceiveNextMessage = NO; strongSelf->_pendingReceiveNextMessages--;
[strongSelf [strongSelf
finishWithError:[NSError errorWithDomain:kGRPCErrorDomain finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeResourceExhausted code:GRPCErrorCodeResourceExhausted
@ -764,13 +764,13 @@ const char *kCFStreamVarName = "grpc_cfstream";
}); });
} }
- (void)receiveNextMessage { - (void)receiveNextMessages:(NSUInteger)numberOfMessages {
if (numberOfMessages == 0) {
return;
}
@synchronized(self) { @synchronized(self) {
// Duplicate invocation of this method. Return _pendingReceiveNextMessages += numberOfMessages;
if (_pendingReceiveNextMessage) {
return;
}
_pendingReceiveNextMessage = YES;
[self maybeStartNextRead]; [self maybeStartNextRead];
} }
} }
@ -793,7 +793,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
} }
} }
}; };
GRPCOpSendMessage *op = GRPCOpSendMessage *op =
[[GRPCOpSendMessage alloc] initWithMessage:message handler:resumingHandler]; [[GRPCOpSendMessage alloc] initWithMessage:message handler:resumingHandler];
if (!_unaryCall) { if (!_unaryCall) {

@ -136,8 +136,20 @@ NS_ASSUME_NONNULL_BEGIN
*/ */
- (void)finish; - (void)finish;
/**
* Tell gRPC to receive the next message. If flow control is enabled, the messages received from the
* server are buffered in gRPC until the user want to receive the next message. If flow control is
* not enabled, messages will be automatically received after the previous one is delivered.
*/
- (void)receiveNextMessage; - (void)receiveNextMessage;
/**
* Tell gRPC to receive the next N message. If flow control is enabled, the messages received from
* the server are buffered in gRPC until the user want to receive the next message. If flow control
* is not enabled, messages will be automatically received after the previous one is delivered.
*/
- (void)receiveNextMessages:(NSUInteger)numberOfMessages;
@end @end
NS_ASSUME_NONNULL_END NS_ASSUME_NONNULL_END

@ -72,6 +72,7 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
- (void)start { - (void)start {
[_call start]; [_call start];
[_call receiveNextMessage];
[_call writeMessage:_message]; [_call writeMessage:_message];
[_call finish]; [_call finish];
} }
@ -198,11 +199,14 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
} }
- (void)receiveNextMessage { - (void)receiveNextMessage {
[self receiveNextMessages:1];
}
- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
GRPCCall2 *copiedCall; GRPCCall2 *copiedCall;
@synchronized(self) { @synchronized(self) {
copiedCall = _call; copiedCall = _call;
} }
[copiedCall receiveNextMessage]; [copiedCall receiveNextMessages:numberOfMessages];
} }
- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata { - (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata {

@ -502,7 +502,7 @@ static const NSTimeInterval kInvertedTimeout = 2;
[self waitForExpectationsWithTimeout:kTestTimeout handler:nil]; [self waitForExpectationsWithTimeout:kTestTimeout handler:nil];
} }
- (void)testWriteFlowControl { - (void)testFlowControlWrite {
__weak XCTestExpectation *expectWriteData = __weak XCTestExpectation *expectWriteData =
[self expectationWithDescription:@"Reported write data"]; [self expectationWithDescription:@"Reported write data"];
@ -531,7 +531,7 @@ static const NSTimeInterval kInvertedTimeout = 2;
callOptions:options]; callOptions:options];
[call start]; [call start];
[call receiveNextMessage]; [call receiveNextMessages:1];
[call writeData:[request data]]; [call writeData:[request data]];
// Wait for 3 seconds and make sure we do not receive the response // Wait for 3 seconds and make sure we do not receive the response
@ -540,7 +540,7 @@ static const NSTimeInterval kInvertedTimeout = 2;
[call finish]; [call finish];
} }
- (void)testReadFlowControl { - (void)testFlowControlRead {
__weak __block XCTestExpectation *expectBlockedMessage = __weak __block XCTestExpectation *expectBlockedMessage =
[self expectationWithDescription:@"Message not delivered without recvNextMessage"]; [self expectationWithDescription:@"Message not delivered without recvNextMessage"];
__weak __block XCTestExpectation *expectPassedMessage = nil; __weak __block XCTestExpectation *expectPassedMessage = nil;
@ -593,29 +593,31 @@ static const NSTimeInterval kInvertedTimeout = 2;
expectPassedClose = [self expectationWithDescription:@"Close delivered after receiveNextMessage"]; expectPassedClose = [self expectationWithDescription:@"Close delivered after receiveNextMessage"];
unblocked = YES; unblocked = YES;
[call receiveNextMessage]; [call receiveNextMessages:1];
[self waitForExpectationsWithTimeout:kTestTimeout handler:nil]; [self waitForExpectationsWithTimeout:kTestTimeout handler:nil];
} }
- (void)testReadFlowControlMultipleMessages { - (void)testFlowControlMultipleMessages {
XCTestExpectation *expectPassedMessage = __weak XCTestExpectation *expectPassedMessage =
[self expectationWithDescription:@"two messages delivered with receiveNextMessage"]; [self expectationWithDescription:@"two messages delivered with receiveNextMessage"];
expectPassedMessage.expectedFulfillmentCount = 2; expectPassedMessage.expectedFulfillmentCount = 2;
XCTestExpectation *expectBlockedMessage = __weak XCTestExpectation *expectBlockedMessage =
[self expectationWithDescription:@"Message 3 not delivered"]; [self expectationWithDescription:@"Message 3 not delivered"];
expectBlockedMessage.inverted = YES; expectBlockedMessage.inverted = YES;
__weak XCTestExpectation *expectWriteTwice =
[self expectationWithDescription:@"Write 2 messages done"];
expectWriteTwice.expectedFulfillmentCount = 2;
RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message]; RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message];
RMTResponseParameters *parameters = [RMTResponseParameters message]; RMTResponseParameters *parameters = [RMTResponseParameters message];
parameters.size = kSimpleDataLength; parameters.size = kSimpleDataLength;
[request.responseParametersArray addObject:parameters]; [request.responseParametersArray addObject:parameters];
[request.responseParametersArray addObject:parameters];
request.payload.body = [NSMutableData dataWithLength:kSimpleDataLength]; request.payload.body = [NSMutableData dataWithLength:kSimpleDataLength];
GRPCRequestOptions *callRequest = GRPCRequestOptions *callRequest =
[[GRPCRequestOptions alloc] initWithHost:(NSString *)kHostAddress [[GRPCRequestOptions alloc] initWithHost:(NSString *)kHostAddress
path:kOutputStreamingCallMethod.HTTPPath path:kFullDuplexCallMethod.HTTPPath
safety:GRPCCallSafetyDefault]; safety:GRPCCallSafetyDefault];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
options.transportType = GRPCTransportTypeInsecure; options.transportType = GRPCTransportTypeInsecure;
@ -628,25 +630,26 @@ static const NSTimeInterval kInvertedTimeout = 2;
messageCallback:^(NSData *message) { messageCallback:^(NSData *message) {
if (messageId <= 1) { if (messageId <= 1) {
[expectPassedMessage fulfill]; [expectPassedMessage fulfill];
if (messageId < 1) {
[call receiveNextMessage];
}
} else { } else {
[expectBlockedMessage fulfill]; [expectBlockedMessage fulfill];
} }
messageId++; messageId++;
} }
closeCallback:nil] closeCallback:nil
writeDataCallback:^{
[expectWriteTwice fulfill];
}]
callOptions:options]; callOptions:options];
[call receiveNextMessage]; [call receiveNextMessages:2];
[call start]; [call start];
[call writeData:[request data]]; [call writeData:[request data]];
[call writeData:[request data]];
[self waitForExpectationsWithTimeout:kInvertedTimeout handler:nil]; [self waitForExpectationsWithTimeout:kInvertedTimeout handler:nil];
} }
- (void)testReadFlowControlReadyBeforeStart { - (void)testFlowControlReadReadyBeforeStart {
__weak XCTestExpectation *expectPassedMessage = __weak XCTestExpectation *expectPassedMessage =
[self expectationWithDescription:@"Message delivered with receiveNextMessage"]; [self expectationWithDescription:@"Message delivered with receiveNextMessage"];
__weak XCTestExpectation *expectPassedClose = __weak XCTestExpectation *expectPassedClose =
@ -678,7 +681,7 @@ static const NSTimeInterval kInvertedTimeout = 2;
}] }]
callOptions:options]; callOptions:options];
[call receiveNextMessage]; [call receiveNextMessages:1];
[call start]; [call start];
[call writeData:[request data]]; [call writeData:[request data]];
[call finish]; [call finish];
@ -686,7 +689,7 @@ static const NSTimeInterval kInvertedTimeout = 2;
[self waitForExpectationsWithTimeout:kInvertedTimeout handler:nil]; [self waitForExpectationsWithTimeout:kInvertedTimeout handler:nil];
} }
- (void)testReadFlowControlReadyAfterStart { - (void)testFlowControlReadReadyAfterStart {
__weak XCTestExpectation *expectPassedMessage = __weak XCTestExpectation *expectPassedMessage =
[self expectationWithDescription:@"Message delivered with receiveNextMessage"]; [self expectationWithDescription:@"Message delivered with receiveNextMessage"];
__weak XCTestExpectation *expectPassedClose = __weak XCTestExpectation *expectPassedClose =
@ -720,14 +723,14 @@ static const NSTimeInterval kInvertedTimeout = 2;
callOptions:options]; callOptions:options];
[call start]; [call start];
[call receiveNextMessage]; [call receiveNextMessages:1];
[call writeData:[request data]]; [call writeData:[request data]];
[call finish]; [call finish];
[self waitForExpectationsWithTimeout:kTestTimeout handler:nil]; [self waitForExpectationsWithTimeout:kTestTimeout handler:nil];
} }
- (void)testReadFlowControlNonBlockingFailure { - (void)testFlowControlReadNonBlockingFailure {
__weak XCTestExpectation *completion = [self expectationWithDescription:@"RPC completed."]; __weak XCTestExpectation *completion = [self expectationWithDescription:@"RPC completed."];
GRPCRequestOptions *requestOptions = GRPCRequestOptions *requestOptions =

Loading…
Cancel
Save