diff --git a/src/objective-c/ProtoRPC/ProtoRPC.h b/src/objective-c/ProtoRPC/ProtoRPC.h index 92fc0eb815c..25f025313e0 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.h +++ b/src/objective-c/ProtoRPC/ProtoRPC.h @@ -126,33 +126,6 @@ NS_ASSUME_NONNULL_BEGIN */ - (void)cancel; -/** - * Finish the RPC request and half-close the call. The server may still send messages and/or - * trailers to the client. This method should only be used when flow control is enabled. If flow - * control is not enabled, It will be automatically called upon message received. - */ -- (void)finish; - -/** - * Tell gRPC to receive another message. - * - * This method should only be used when flow control is enabled. If flow control is enabled, gRPC - * will only receive additional messages after the user indicates so by using either - * receiveNextMessage: or receiveNextMessages: methods. If flow control is not enabled, messages - * will be automatically received after the previous one is delivered. - */ -- (void)receiveNextMessage; - -/** - * Tell gRPC to receive another N message. - * - * This method should only be used when flow control is enabled. If flow control is enabled, the - * messages received from the server are buffered in gRPC until the user want to receive the next - * message. If flow control is not enabled, messages will be automatically received after the - * previous one is delivered. - */ -- (void)receiveNextMessages:(NSUInteger)numberOfMessages; - @end /** A client-streaming RPC call with Protobuf. */ diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m index 309c86f3d6b..396f58f8835 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.m +++ b/src/objective-c/ProtoRPC/ProtoRPC.m @@ -27,14 +27,6 @@ #import #import -#pragma mark - GRPCStreamingProtoCall - -@interface GRPCStreamingProtoCall () - -@property(nonatomic, readonly) GRPCCallOptions *callOptions; - -@end - @implementation GRPCUnaryResponseHandler { void (^_responseHandler)(id, NSError *); dispatch_queue_t _responseDispatchQueue; @@ -108,35 +100,25 @@ - (void)start { [_call start]; + [_call receiveNextMessage]; [_call writeMessage:_message]; - - if (!_call.callOptions.flowControlEnabled) { - [_call finish]; - } + [_call finish]; } - (void)cancel { [_call cancel]; } -- (void)finish { - [_call finish]; -} - -#pragma mark - GRPCControllableProtoCallFlow +@end -- (void)receiveNextMessage { - [_call receiveNextMessage]; -} -- (void)receiveNextMessages:(NSUInteger)numberOfMessages { - [_call receiveNextMessages:numberOfMessages]; -} +@interface GRPCStreamingProtoCall () @end @implementation GRPCStreamingProtoCall { GRPCRequestOptions *_requestOptions; id _handler; + GRPCCallOptions *_callOptions; Class _responseClass; GRPCCall2 *_call; @@ -244,8 +226,6 @@ [copiedCall finish]; } -#pragma mark - GRPCControllableProtoCallFlow - - (void)receiveNextMessage { [self receiveNextMessages:1]; } @@ -257,8 +237,6 @@ [copiedCall receiveNextMessages:numberOfMessages]; } -#pragma mark - GRPCResponseHandler - - (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata { @synchronized(self) { if (initialMetadata != nil && @@ -336,8 +314,6 @@ } } -#pragma mark - GRPCDispatchable - - (dispatch_queue_t)dispatchQueue { return _dispatchQueue; } diff --git a/src/objective-c/tests/InteropTests/InteropTests.m b/src/objective-c/tests/InteropTests/InteropTests.m index 8a06c30ed26..bae94ef2394 100644 --- a/src/objective-c/tests/InteropTests/InteropTests.m +++ b/src/objective-c/tests/InteropTests/InteropTests.m @@ -701,87 +701,6 @@ static dispatch_once_t initGlobalInterceptorFactory; [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; } -- (void)testUnaryRPCWithV2APIFlowControl { - XCTAssertNotNil([[self class] host]); - __weak XCTestExpectation *expectReceive = - [self expectationWithDescription:@"LargeUnaryWithV2API received message"]; - __weak XCTestExpectation *expectComplete = - [self expectationWithDescription:@"LargeUnaryWithV2API received complete"]; - - const int responseSize = 123; - RMTSimpleRequest *request = [RMTSimpleRequest message]; - request.responseType = RMTPayloadType_Compressable; - request.responseSize = responseSize; - request.payload.body = [NSMutableData dataWithLength:456]; - - GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; - // For backwards compatibility - options.transportType = [[self class] transportType]; - options.transport = [[self class] transport]; - options.PEMRootCertificates = [[self class] PEMRootCertificates]; - options.hostNameOverride = [[self class] hostNameOverride]; - options.flowControlEnabled = YES; - - GRPCUnaryProtoCall *call = [_service - unaryCallWithMessage:request - responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil - messageCallback:^(id message) { - XCTAssertNotNil(message); - if (message) { - RMTSimpleResponse *expectedResponse = - [RMTSimpleResponse message]; - expectedResponse.payload.type = RMTPayloadType_Compressable; - expectedResponse.payload.body = - [NSMutableData dataWithLength:responseSize]; - XCTAssertEqualObjects(message, expectedResponse); - [expectReceive fulfill]; - [call finish]; - } - } - closeCallback:^(NSDictionary *trailingMetadata, NSError *error) { - XCTAssertNil(error, @"Unexpected error: %@", error); - [expectComplete fulfill]; - }] - callOptions:options]; - [call start]; - [call receiveNextMessage]; - [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; -} - -- (void)testUnaryRPCWithV2APIFlowControlNotReceivingMessage { - XCTAssertNotNil([[self class] host]); - - __weak XCTestExpectation *expectTimeout = - [self expectationWithDescription: - @"testUnaryRPCWithV2APIFlowControlNotReceivingMessage received timeout"]; - - RMTSimpleRequest *request = [RMTSimpleRequest message]; - request.responseType = RMTPayloadType_Compressable; - request.responseSize = 123; - request.payload.body = [NSMutableData dataWithLength:456]; - - GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; - // For backwards compatibility - options.transportType = [[self class] transportType]; - options.transport = [[self class] transport]; - options.PEMRootCertificates = [[self class] PEMRootCertificates]; - options.hostNameOverride = [[self class] hostNameOverride]; - options.flowControlEnabled = YES; - - GRPCUnaryProtoCall *call = [_service - unaryCallWithMessage:request - responseHandler:[[InteropTestsBlockCallbacks alloc] - initWithInitialMetadataCallback:nil - messageCallback:^(id message) { - XCTFail("Not expected to receive message"); - } - closeCallback:nil] - callOptions:options]; - [call start]; - XCTWaiterResult result = [XCTWaiter waitForExpectations:@[ expectTimeout ] timeout:5]; - XCTAssertEqual(XCTWaiterResultTimedOut, result, @"Unexpected waiter result %@", @(result)); -} - - (void)testConcurrentRPCsWithErrorsWithV2API { NSMutableArray *completeExpectations = [NSMutableArray array]; NSMutableArray *calls = [NSMutableArray array]; @@ -1159,72 +1078,9 @@ static dispatch_once_t initGlobalInterceptorFactory; [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil]; } -- (void)testPingPongUnaryRPCWithFlowControl { +- (void)testPingPongRPCWithFlowControl { XCTAssertNotNil([[self class] host]); - __weak XCTestExpectation *expectation = - [self expectationWithDescription:@"UnaryPingPongWithV2API"]; - - NSNumber *requestSize = @321; - NSArray *responseSizes = @[ @123, @234 ]; - - RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message]; - request.payload.body = [NSMutableData dataWithLength:requestSize.unsignedIntegerValue]; - for (NSNumber *responseSize in responseSizes) { - RMTResponseParameters *parameters = [RMTResponseParameters message]; - parameters.size = responseSize.intValue; - [request.responseParametersArray addObject:parameters]; - } - - GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; - // For backwards compatibility - options.transportType = [[self class] transportType]; - options.transport = [[self class] transport]; - options.PEMRootCertificates = [[self class] PEMRootCertificates]; - options.hostNameOverride = [[self class] hostNameOverride]; - options.flowControlEnabled = YES; - - __block GRPCUnaryProtoCall *call = nil; - __block int receivedMessageCount = 0; - - id messageHandler = ^(id message) { - NSLog(@"received message %@", @(receivedMessageCount)); - XCTAssertLessThan(receivedMessageCount, responseSizes.count, - "More than expected messages received"); - id expected = - [RMTStreamingOutputCallResponse messageWithPayloadSize:responseSizes[receivedMessageCount]]; - XCTAssertEqualObjects(message, expected); - - receivedMessageCount += 1; - if (receivedMessageCount < responseSizes.count) { - [call receiveNextMessage]; - } else { - [call finish]; - } - }; - - id closeHandler = ^(NSDictionary *trailingMetadata, NSError *error) { - XCTAssertNil(error, @"Finished with unexpected error: %@", error); - [expectation fulfill]; - }; - - InteropTestsBlockCallbacks *handler = - [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil - messageCallback:messageHandler - closeCallback:closeHandler - writeMessageCallback:nil]; - - call = [_service streamingOutputCallWithMessage:request - responseHandler:handler - callOptions:options]; - [call start]; - [call receiveNextMessage]; - [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil]; -} - -- (void)testPingPongStreamingRPCWithFlowControl { - XCTAssertNotNil([[self class] host]); - __weak XCTestExpectation *expectation = - [self expectationWithDescription:@"StreamingPingPongWithV2API"]; + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"]; NSArray *requests = @[ @27182, @8, @1828, @45904 ]; NSArray *responses = @[ @31415, @9, @2653, @58979 ];