Revert "Adding flow control support API for GRPCUnaryProtoCall (#26969)" (#27047)

This reverts commit 52fece38e6.
pull/27109/head
Denny C. Dai 3 years ago committed by GitHub
parent 533f02642d
commit d17884b7d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      src/objective-c/ProtoRPC/ProtoRPC.h
  2. 34
      src/objective-c/ProtoRPC/ProtoRPC.m
  3. 148
      src/objective-c/tests/InteropTests/InteropTests.m

@ -126,33 +126,6 @@ NS_ASSUME_NONNULL_BEGIN
*/
- (void)cancel;
/**
* Finish the RPC request and half-close the call. The server may still send messages and/or
* trailers to the client. This method should only be used when flow control is enabled. If flow
* control is not enabled, It will be automatically called upon message received.
*/
- (void)finish;
/**
* Tell gRPC to receive another message.
*
* This method should only be used when flow control is enabled. If flow control is enabled, gRPC
* will only receive additional messages after the user indicates so by using either
* receiveNextMessage: or receiveNextMessages: methods. If flow control is not enabled, messages
* will be automatically received after the previous one is delivered.
*/
- (void)receiveNextMessage;
/**
* Tell gRPC to receive another N message.
*
* This method should only be used when flow control is enabled. If flow control is enabled, the
* messages received from the server are buffered in gRPC until the user want to receive the next
* message. If flow control is not enabled, messages will be automatically received after the
* previous one is delivered.
*/
- (void)receiveNextMessages:(NSUInteger)numberOfMessages;
@end
/** A client-streaming RPC call with Protobuf. */

@ -27,14 +27,6 @@
#import <RxLibrary/GRXWriteable.h>
#import <RxLibrary/GRXWriter+Transformations.h>
#pragma mark - GRPCStreamingProtoCall
@interface GRPCStreamingProtoCall () <GRPCResponseHandler>
@property(nonatomic, readonly) GRPCCallOptions *callOptions;
@end
@implementation GRPCUnaryResponseHandler {
void (^_responseHandler)(id, NSError *);
dispatch_queue_t _responseDispatchQueue;
@ -108,35 +100,25 @@
- (void)start {
[_call start];
[_call receiveNextMessage];
[_call writeMessage:_message];
if (!_call.callOptions.flowControlEnabled) {
[_call finish];
}
[_call finish];
}
- (void)cancel {
[_call cancel];
}
- (void)finish {
[_call finish];
}
#pragma mark - GRPCControllableProtoCallFlow
@end
- (void)receiveNextMessage {
[_call receiveNextMessage];
}
- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
[_call receiveNextMessages:numberOfMessages];
}
@interface GRPCStreamingProtoCall () <GRPCResponseHandler>
@end
@implementation GRPCStreamingProtoCall {
GRPCRequestOptions *_requestOptions;
id<GRPCProtoResponseHandler> _handler;
GRPCCallOptions *_callOptions;
Class _responseClass;
GRPCCall2 *_call;
@ -244,8 +226,6 @@
[copiedCall finish];
}
#pragma mark - GRPCControllableProtoCallFlow
- (void)receiveNextMessage {
[self receiveNextMessages:1];
}
@ -257,8 +237,6 @@
[copiedCall receiveNextMessages:numberOfMessages];
}
#pragma mark - GRPCResponseHandler
- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata {
@synchronized(self) {
if (initialMetadata != nil &&
@ -336,8 +314,6 @@
}
}
#pragma mark - GRPCDispatchable
- (dispatch_queue_t)dispatchQueue {
return _dispatchQueue;
}

@ -701,87 +701,6 @@ static dispatch_once_t initGlobalInterceptorFactory;
[self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
- (void)testUnaryRPCWithV2APIFlowControl {
XCTAssertNotNil([[self class] host]);
__weak XCTestExpectation *expectReceive =
[self expectationWithDescription:@"LargeUnaryWithV2API received message"];
__weak XCTestExpectation *expectComplete =
[self expectationWithDescription:@"LargeUnaryWithV2API received complete"];
const int responseSize = 123;
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.responseType = RMTPayloadType_Compressable;
request.responseSize = responseSize;
request.payload.body = [NSMutableData dataWithLength:456];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
options.flowControlEnabled = YES;
GRPCUnaryProtoCall *call = [_service
unaryCallWithMessage:request
responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
XCTAssertNotNil(message);
if (message) {
RMTSimpleResponse *expectedResponse =
[RMTSimpleResponse message];
expectedResponse.payload.type = RMTPayloadType_Compressable;
expectedResponse.payload.body =
[NSMutableData dataWithLength:responseSize];
XCTAssertEqualObjects(message, expectedResponse);
[expectReceive fulfill];
[call finish];
}
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
XCTAssertNil(error, @"Unexpected error: %@", error);
[expectComplete fulfill];
}]
callOptions:options];
[call start];
[call receiveNextMessage];
[self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
- (void)testUnaryRPCWithV2APIFlowControlNotReceivingMessage {
XCTAssertNotNil([[self class] host]);
__weak XCTestExpectation *expectTimeout =
[self expectationWithDescription:
@"testUnaryRPCWithV2APIFlowControlNotReceivingMessage received timeout"];
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.responseType = RMTPayloadType_Compressable;
request.responseSize = 123;
request.payload.body = [NSMutableData dataWithLength:456];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
options.flowControlEnabled = YES;
GRPCUnaryProtoCall *call = [_service
unaryCallWithMessage:request
responseHandler:[[InteropTestsBlockCallbacks alloc]
initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
XCTFail("Not expected to receive message");
}
closeCallback:nil]
callOptions:options];
[call start];
XCTWaiterResult result = [XCTWaiter waitForExpectations:@[ expectTimeout ] timeout:5];
XCTAssertEqual(XCTWaiterResultTimedOut, result, @"Unexpected waiter result %@", @(result));
}
- (void)testConcurrentRPCsWithErrorsWithV2API {
NSMutableArray *completeExpectations = [NSMutableArray array];
NSMutableArray *calls = [NSMutableArray array];
@ -1159,72 +1078,9 @@ static dispatch_once_t initGlobalInterceptorFactory;
[self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
}
- (void)testPingPongUnaryRPCWithFlowControl {
- (void)testPingPongRPCWithFlowControl {
XCTAssertNotNil([[self class] host]);
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"UnaryPingPongWithV2API"];
NSNumber *requestSize = @321;
NSArray *responseSizes = @[ @123, @234 ];
RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message];
request.payload.body = [NSMutableData dataWithLength:requestSize.unsignedIntegerValue];
for (NSNumber *responseSize in responseSizes) {
RMTResponseParameters *parameters = [RMTResponseParameters message];
parameters.size = responseSize.intValue;
[request.responseParametersArray addObject:parameters];
}
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
options.flowControlEnabled = YES;
__block GRPCUnaryProtoCall *call = nil;
__block int receivedMessageCount = 0;
id messageHandler = ^(id message) {
NSLog(@"received message %@", @(receivedMessageCount));
XCTAssertLessThan(receivedMessageCount, responseSizes.count,
"More than expected messages received");
id expected =
[RMTStreamingOutputCallResponse messageWithPayloadSize:responseSizes[receivedMessageCount]];
XCTAssertEqualObjects(message, expected);
receivedMessageCount += 1;
if (receivedMessageCount < responseSizes.count) {
[call receiveNextMessage];
} else {
[call finish];
}
};
id closeHandler = ^(NSDictionary *trailingMetadata, NSError *error) {
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
[expectation fulfill];
};
InteropTestsBlockCallbacks *handler =
[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:messageHandler
closeCallback:closeHandler
writeMessageCallback:nil];
call = [_service streamingOutputCallWithMessage:request
responseHandler:handler
callOptions:options];
[call start];
[call receiveNextMessage];
[self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
}
- (void)testPingPongStreamingRPCWithFlowControl {
XCTAssertNotNil([[self class] host]);
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"StreamingPingPongWithV2API"];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"];
NSArray *requests = @[ @27182, @8, @1828, @45904 ];
NSArray *responses = @[ @31415, @9, @2653, @58979 ];

Loading…
Cancel
Save