diff --git a/src/objective-c/GRPCClient/GRPCCall+Interceptor.h b/src/objective-c/GRPCClient/GRPCCall+Interceptor.h new file mode 100644 index 00000000000..e6da2a7bda4 --- /dev/null +++ b/src/objective-c/GRPCClient/GRPCCall+Interceptor.h @@ -0,0 +1,38 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// The global interceptor feature is experimental and might be modified or removed at any time. + +#import "GRPCCall.h" + +@protocol GRPCInterceptorFactory; + +@interface GRPCCall2 (Interceptor) + +/** + * Register a global interceptor's factory in the current process. Only one interceptor can be + * registered in a process. If another one attempts to be registered, an exception will be raised. + */ ++ (void)registerGlobalInterceptor:(nonnull id)interceptorFactory; + +/** + * Get the global interceptor's factory. + */ ++ (nullable id)globalInterceptorFactory; + +@end diff --git a/src/objective-c/GRPCClient/GRPCCall+Interceptor.m b/src/objective-c/GRPCClient/GRPCCall+Interceptor.m new file mode 100644 index 00000000000..0d9f1f6b9dc --- /dev/null +++ b/src/objective-c/GRPCClient/GRPCCall+Interceptor.m @@ -0,0 +1,61 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#import "GRPCCall+Interceptor.h" +#import "GRPCInterceptor.h" + +static id globalInterceptorFactory = nil; +static NSLock *globalInterceptorLock = nil; +static dispatch_once_t onceToken; + +@implementation GRPCCall2 (Interceptor) + ++ (void)registerGlobalInterceptor:(id)interceptorFactory { + if (interceptorFactory == nil) { + return; + } + dispatch_once(&onceToken, ^{ + globalInterceptorLock = [[NSLock alloc] init]; + }); + [globalInterceptorLock lock]; + if (globalInterceptorFactory != nil) { + [globalInterceptorLock unlock]; + [NSException raise:NSInternalInconsistencyException + format: + @"Global interceptor is already registered. Only one global interceptor can be " + @"registered in a process."]; + return; + } + + globalInterceptorFactory = interceptorFactory; + [globalInterceptorLock unlock]; +} + ++ (id)globalInterceptorFactory { + dispatch_once(&onceToken, ^{ + globalInterceptorLock = [[NSLock alloc] init]; + }); + id factory; + [globalInterceptorLock lock]; + factory = globalInterceptorFactory; + [globalInterceptorLock unlock]; + + return factory; +} + +@end diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index b5dfade0c39..45b03e4b684 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -17,6 +17,7 @@ */ #import "GRPCCall.h" +#import "GRPCCall+Interceptor.h" #import "GRPCCall+OAuth2.h" #import "GRPCCallOptions.h" #import "GRPCInterceptor.h" @@ -141,33 +142,52 @@ const char *kCFStreamVarName = "grpc_cfstream"; _responseHandler = responseHandler; // Initialize the interceptor chain + + // First initialize the internal call GRPCCall2Internal *internalCall = [[GRPCCall2Internal alloc] init]; id nextInterceptor = internalCall; GRPCInterceptorManager *nextManager = nil; - NSArray *interceptorFactories = _actualCallOptions.interceptorFactories; - if (interceptorFactories.count == 0) { - [internalCall setResponseHandler:_responseHandler]; - } else { - for (int i = (int)interceptorFactories.count - 1; i >= 0; i--) { - GRPCInterceptorManager *manager = - [[GRPCInterceptorManager alloc] initWithNextInterceptor:nextInterceptor]; - GRPCInterceptor *interceptor = - [interceptorFactories[i] createInterceptorWithManager:manager]; - NSAssert(interceptor != nil, @"Failed to create interceptor"); - if (interceptor == nil) { - return nil; - } - if (i == (int)interceptorFactories.count - 1) { - [internalCall setResponseHandler:interceptor]; - } else { - [nextManager setPreviousInterceptor:interceptor]; - } + + // Then initialize the global interceptor, if applicable + id globalInterceptorFactory = [GRPCCall2 globalInterceptorFactory]; + if (globalInterceptorFactory) { + GRPCInterceptorManager *manager = + [[GRPCInterceptorManager alloc] initWithNextInterceptor:nextInterceptor]; + GRPCInterceptor *interceptor = + [globalInterceptorFactory createInterceptorWithManager:manager]; + if (interceptor != nil) { + [internalCall setResponseHandler:interceptor]; nextInterceptor = interceptor; nextManager = manager; } + } + // Finally initialize the interceptors in the chain + NSArray *interceptorFactories = _actualCallOptions.interceptorFactories; + for (int i = (int)interceptorFactories.count - 1; i >= 0; i--) { + GRPCInterceptorManager *manager = + [[GRPCInterceptorManager alloc] initWithNextInterceptor:nextInterceptor]; + GRPCInterceptor *interceptor = [interceptorFactories[i] createInterceptorWithManager:manager]; + NSAssert(interceptor != nil, @"Failed to create interceptor from factory: %@", + interceptorFactories[i]); + if (interceptor == nil) { + NSLog(@"Failed to create interceptor from factory: %@", interceptorFactories[i]); + continue; + } + if (nextManager == nil) { + [internalCall setResponseHandler:interceptor]; + } else { + [nextManager setPreviousInterceptor:interceptor]; + } + nextInterceptor = interceptor; + nextManager = manager; + } + if (nextManager == nil) { + [internalCall setResponseHandler:_responseHandler]; + } else { [nextManager setPreviousInterceptor:_responseHandler]; } + _firstInterceptor = nextInterceptor; } diff --git a/src/objective-c/tests/InteropTests/InteropTests.m b/src/objective-c/tests/InteropTests/InteropTests.m index 197ab7f3da5..3260e918f2f 100644 --- a/src/objective-c/tests/InteropTests/InteropTests.m +++ b/src/objective-c/tests/InteropTests/InteropTests.m @@ -25,6 +25,7 @@ #endif #import #import +#import #import #import #import @@ -120,7 +121,7 @@ initWithRequestDispatchQueue:(dispatch_queue_t)requestDispatchQueue @end -@interface HookIntercetpor : GRPCInterceptor +@interface HookInterceptor : GRPCInterceptor - (instancetype) initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager @@ -143,6 +144,7 @@ initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager @end @implementation HookInterceptorFactory { + @protected void (^_startHook)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions, GRPCInterceptorManager *manager); void (^_writeDataHook)(id data, GRPCInterceptorManager *manager); @@ -189,7 +191,7 @@ initWithRequestDispatchQueue:(dispatch_queue_t)requestDispatchQueue } - (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager { - return [[HookIntercetpor alloc] initWithInterceptorManager:interceptorManager + return [[HookInterceptor alloc] initWithInterceptorManager:interceptorManager requestDispatchQueue:_requestDispatchQueue responseDispatchQueue:_responseDispatchQueue startHook:_startHook @@ -204,7 +206,7 @@ initWithRequestDispatchQueue:(dispatch_queue_t)requestDispatchQueue @end -@implementation HookIntercetpor { +@implementation HookInterceptor { void (^_startHook)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions, GRPCInterceptorManager *manager); void (^_writeDataHook)(id data, GRPCInterceptorManager *manager); @@ -314,6 +316,90 @@ initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager @end +@interface GlobalInterceptorFactory : HookInterceptorFactory + +@property BOOL enabled; + +- (instancetype)initWithRequestDispatchQueue:(dispatch_queue_t)requestDispatchQueue + responseDispatchQueue:(dispatch_queue_t)responseDispatchQueue; + +- (void)setStartHook:(void (^)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions, + GRPCInterceptorManager *manager))startHook + writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook + finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook + receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages, + GRPCInterceptorManager *manager))receiveNextMessagesHook + responseHeaderHook:(void (^)(NSDictionary *initialMetadata, + GRPCInterceptorManager *manager))responseHeaderHook + responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook + responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error, + GRPCInterceptorManager *manager))responseCloseHook + didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook; + +@end + +@implementation GlobalInterceptorFactory + +- (instancetype)initWithRequestDispatchQueue:(dispatch_queue_t)requestDispatchQueue + responseDispatchQueue:(dispatch_queue_t)responseDispatchQueue { + _enabled = NO; + return [super initWithRequestDispatchQueue:requestDispatchQueue + responseDispatchQueue:responseDispatchQueue + startHook:nil + writeDataHook:nil + finishHook:nil + receiveNextMessagesHook:nil + responseHeaderHook:nil + responseDataHook:nil + responseCloseHook:nil + didWriteDataHook:nil]; +} + +- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager { + if (_enabled) { + return [[HookInterceptor alloc] initWithInterceptorManager:interceptorManager + requestDispatchQueue:_requestDispatchQueue + responseDispatchQueue:_responseDispatchQueue + startHook:_startHook + writeDataHook:_writeDataHook + finishHook:_finishHook + receiveNextMessagesHook:_receiveNextMessagesHook + responseHeaderHook:_responseHeaderHook + responseDataHook:_responseDataHook + responseCloseHook:_responseCloseHook + didWriteDataHook:_didWriteDataHook]; + } else { + return nil; + } +} + +- (void)setStartHook:(void (^)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions, + GRPCInterceptorManager *manager))startHook + writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook + finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook + receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages, + GRPCInterceptorManager *manager))receiveNextMessagesHook + responseHeaderHook:(void (^)(NSDictionary *initialMetadata, + GRPCInterceptorManager *manager))responseHeaderHook + responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook + responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error, + GRPCInterceptorManager *manager))responseCloseHook + didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook { + _startHook = startHook; + _writeDataHook = writeDataHook; + _finishHook = finishHook; + _receiveNextMessagesHook = receiveNextMessagesHook; + _responseHeaderHook = responseHeaderHook; + _responseDataHook = responseDataHook; + _responseCloseHook = responseCloseHook; + _didWriteDataHook = didWriteDataHook; +} + +@end + +static GlobalInterceptorFactory *globalInterceptorFactory = nil; +static dispatch_once_t initGlobalInterceptorFactory; + #pragma mark Tests @implementation InteropTests { @@ -357,6 +443,14 @@ initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager #ifdef GRPC_CFSTREAM setenv(kCFStreamVarName, "1", 1); #endif + + dispatch_once(&initGlobalInterceptorFactory, ^{ + dispatch_queue_t globalInterceptorQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + globalInterceptorFactory = + [[GlobalInterceptorFactory alloc] initWithRequestDispatchQueue:globalInterceptorQueue + responseDispatchQueue:globalInterceptorQueue]; + [GRPCCall2 registerGlobalInterceptor:globalInterceptorFactory]; + }); } - (void)setUp { @@ -1229,7 +1323,8 @@ initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager - (void)testDefaultInterceptor { XCTAssertNotNil([[self class] host]); - __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"]; + __weak XCTestExpectation *expectation = + [self expectationWithDescription:@"testDefaultInterceptor"]; NSArray *requests = @[ @27182, @8, @1828, @45904 ]; NSArray *responses = @[ @31415, @9, @2653, @58979 ]; @@ -1282,7 +1377,8 @@ initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager - (void)testLoggingInterceptor { XCTAssertNotNil([[self class] host]); - __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"]; + __weak XCTestExpectation *expectation = + [self expectationWithDescription:@"testLoggingInterceptor"]; __block NSUInteger startCount = 0; __block NSUInteger writeDataCount = 0; @@ -1405,7 +1501,10 @@ initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager - (void)testHijackingInterceptor { NSUInteger kCancelAfterWrites = 2; XCTAssertNotNil([[self class] host]); - __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"]; + __weak XCTestExpectation *expectUserCallComplete = + [self expectationWithDescription:@"User call completed."]; + __weak XCTestExpectation *expectCallInternalComplete = + [self expectationWithDescription:@"Internal gRPC call completed."]; NSArray *responses = @[ @1, @2, @3, @4 ]; __block int index = 0; @@ -1462,6 +1561,7 @@ initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager XCTAssertNil(trailingMetadata); XCTAssertNotNil(error); XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED); + [expectCallInternalComplete fulfill]; } didWriteDataHook:nil]; @@ -1503,7 +1603,7 @@ initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager XCTAssertEqual(index, 4, @"Received %i responses instead of 4.", index); - [expectation fulfill]; + [expectUserCallComplete fulfill]; }] callOptions:options]; [call start]; @@ -1519,4 +1619,305 @@ initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager XCTAssertEqual(responseCloseCount, 1); } +- (void)testGlobalInterceptor { + XCTAssertNotNil([[self class] host]); + __weak XCTestExpectation *expectation = + [self expectationWithDescription:@"testGlobalInterceptor"]; + + __block NSUInteger startCount = 0; + __block NSUInteger writeDataCount = 0; + __block NSUInteger finishCount = 0; + __block NSUInteger receiveNextMessageCount = 0; + __block NSUInteger responseHeaderCount = 0; + __block NSUInteger responseDataCount = 0; + __block NSUInteger responseCloseCount = 0; + __block NSUInteger didWriteDataCount = 0; + [globalInterceptorFactory setStartHook:^(GRPCRequestOptions *requestOptions, + GRPCCallOptions *callOptions, + GRPCInterceptorManager *manager) { + startCount++; + XCTAssertEqualObjects(requestOptions.host, [[self class] host]); + XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall"); + XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault); + [manager startNextInterceptorWithRequest:[requestOptions copy] callOptions:[callOptions copy]]; + } + writeDataHook:^(id data, GRPCInterceptorManager *manager) { + writeDataCount++; + [manager writeNextInterceptorWithData:data]; + } + finishHook:^(GRPCInterceptorManager *manager) { + finishCount++; + [manager finishNextInterceptor]; + } + receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) { + receiveNextMessageCount++; + [manager receiveNextInterceptorMessages:numberOfMessages]; + } + responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) { + responseHeaderCount++; + [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata]; + } + responseDataHook:^(id data, GRPCInterceptorManager *manager) { + responseDataCount++; + [manager forwardPreviousInterceptorWithData:data]; + } + responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error, + GRPCInterceptorManager *manager) { + responseCloseCount++; + [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata error:error]; + } + didWriteDataHook:^(GRPCInterceptorManager *manager) { + didWriteDataCount++; + [manager forwardPreviousInterceptorDidWriteData]; + }]; + + NSArray *requests = @[ @1, @2, @3, @4 ]; + NSArray *responses = @[ @1, @2, @3, @4 ]; + + __block int index = 0; + + id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index] + requestedResponseSize:responses[index]]; + GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; + options.transportType = [[self class] transportType]; + options.PEMRootCertificates = [[self class] PEMRootCertificates]; + options.hostNameOverride = [[self class] hostNameOverride]; + options.flowControlEnabled = YES; + globalInterceptorFactory.enabled = YES; + + __block BOOL canWriteData = NO; + __block GRPCStreamingProtoCall *call = [_service + fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc] + initWithInitialMetadataCallback:nil + messageCallback:^(id message) { + XCTAssertLessThan(index, 4, + @"More than 4 responses received."); + index += 1; + if (index < 4) { + id request = [RMTStreamingOutputCallRequest + messageWithPayloadSize:requests[index] + requestedResponseSize:responses[index]]; + XCTAssertTrue(canWriteData); + canWriteData = NO; + [call writeMessage:request]; + [call receiveNextMessage]; + } else { + [call finish]; + } + } + closeCallback:^(NSDictionary *trailingMetadata, + NSError *error) { + XCTAssertNil(error, + @"Finished with unexpected error: %@", + error); + [expectation fulfill]; + } + writeMessageCallback:^{ + canWriteData = YES; + }] + callOptions:options]; + [call start]; + [call receiveNextMessage]; + [call writeMessage:request]; + + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; + XCTAssertEqual(startCount, 1); + XCTAssertEqual(writeDataCount, 4); + XCTAssertEqual(finishCount, 1); + XCTAssertEqual(receiveNextMessageCount, 4); + XCTAssertEqual(responseHeaderCount, 1); + XCTAssertEqual(responseDataCount, 4); + XCTAssertEqual(responseCloseCount, 1); + XCTAssertEqual(didWriteDataCount, 4); + globalInterceptorFactory.enabled = NO; +} + +- (void)testConflictingGlobalInterceptors { + id factory = [[HookInterceptorFactory alloc] + initWithRequestDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL) + responseDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL) + startHook:nil + writeDataHook:nil + finishHook:nil + receiveNextMessagesHook:nil + responseHeaderHook:nil + responseDataHook:nil + responseCloseHook:nil + didWriteDataHook:nil]; + @try { + [GRPCCall2 registerGlobalInterceptor:factory]; + XCTFail(@"Did not receive an exception when registering global interceptor the second time"); + } @catch (NSException *exception) { + // Do nothing; test passes + } +} + +- (void)testInterceptorAndGlobalInterceptor { + XCTAssertNotNil([[self class] host]); + __weak XCTestExpectation *expectation = + [self expectationWithDescription:@"testInterceptorAndGlobalInterceptor"]; + + __block NSUInteger startCount = 0; + __block NSUInteger writeDataCount = 0; + __block NSUInteger finishCount = 0; + __block NSUInteger receiveNextMessageCount = 0; + __block NSUInteger responseHeaderCount = 0; + __block NSUInteger responseDataCount = 0; + __block NSUInteger responseCloseCount = 0; + __block NSUInteger didWriteDataCount = 0; + + id factory = [[HookInterceptorFactory alloc] + initWithRequestDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL) + responseDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL) + startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions, + GRPCInterceptorManager *manager) { + startCount++; + XCTAssertEqualObjects(requestOptions.host, [[self class] host]); + XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall"); + XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault); + [manager startNextInterceptorWithRequest:[requestOptions copy] + callOptions:[callOptions copy]]; + } + writeDataHook:^(id data, GRPCInterceptorManager *manager) { + writeDataCount++; + [manager writeNextInterceptorWithData:data]; + } + finishHook:^(GRPCInterceptorManager *manager) { + finishCount++; + [manager finishNextInterceptor]; + } + receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) { + receiveNextMessageCount++; + [manager receiveNextInterceptorMessages:numberOfMessages]; + } + responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) { + responseHeaderCount++; + [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata]; + } + responseDataHook:^(id data, GRPCInterceptorManager *manager) { + responseDataCount++; + [manager forwardPreviousInterceptorWithData:data]; + } + responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error, + GRPCInterceptorManager *manager) { + responseCloseCount++; + [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata error:error]; + } + didWriteDataHook:^(GRPCInterceptorManager *manager) { + didWriteDataCount++; + [manager forwardPreviousInterceptorDidWriteData]; + }]; + + __block NSUInteger globalStartCount = 0; + __block NSUInteger globalWriteDataCount = 0; + __block NSUInteger globalFinishCount = 0; + __block NSUInteger globalReceiveNextMessageCount = 0; + __block NSUInteger globalResponseHeaderCount = 0; + __block NSUInteger globalResponseDataCount = 0; + __block NSUInteger globalResponseCloseCount = 0; + __block NSUInteger globalDidWriteDataCount = 0; + + [globalInterceptorFactory setStartHook:^(GRPCRequestOptions *requestOptions, + GRPCCallOptions *callOptions, + GRPCInterceptorManager *manager) { + globalStartCount++; + XCTAssertEqualObjects(requestOptions.host, [[self class] host]); + XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall"); + XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault); + [manager startNextInterceptorWithRequest:[requestOptions copy] callOptions:[callOptions copy]]; + } + writeDataHook:^(id data, GRPCInterceptorManager *manager) { + globalWriteDataCount++; + [manager writeNextInterceptorWithData:data]; + } + finishHook:^(GRPCInterceptorManager *manager) { + globalFinishCount++; + [manager finishNextInterceptor]; + } + receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) { + globalReceiveNextMessageCount++; + [manager receiveNextInterceptorMessages:numberOfMessages]; + } + responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) { + globalResponseHeaderCount++; + [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata]; + } + responseDataHook:^(id data, GRPCInterceptorManager *manager) { + globalResponseDataCount++; + [manager forwardPreviousInterceptorWithData:data]; + } + responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error, + GRPCInterceptorManager *manager) { + globalResponseCloseCount++; + [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata error:error]; + } + didWriteDataHook:^(GRPCInterceptorManager *manager) { + globalDidWriteDataCount++; + [manager forwardPreviousInterceptorDidWriteData]; + }]; + + NSArray *requests = @[ @1, @2, @3, @4 ]; + NSArray *responses = @[ @1, @2, @3, @4 ]; + + __block int index = 0; + + id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index] + requestedResponseSize:responses[index]]; + GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; + options.transportType = [[self class] transportType]; + options.PEMRootCertificates = [[self class] PEMRootCertificates]; + options.hostNameOverride = [[self class] hostNameOverride]; + options.flowControlEnabled = YES; + options.interceptorFactories = @[ factory ]; + globalInterceptorFactory.enabled = YES; + + __block BOOL canWriteData = NO; + __block GRPCStreamingProtoCall *call = [_service + fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc] + initWithInitialMetadataCallback:nil + messageCallback:^(id message) { + index += 1; + if (index < 4) { + id request = [RMTStreamingOutputCallRequest + messageWithPayloadSize:requests[index] + requestedResponseSize:responses[index]]; + canWriteData = NO; + [call writeMessage:request]; + [call receiveNextMessage]; + } else { + [call finish]; + } + } + closeCallback:^(NSDictionary *trailingMetadata, + NSError *error) { + [expectation fulfill]; + } + writeMessageCallback:^{ + canWriteData = YES; + }] + callOptions:options]; + [call start]; + [call receiveNextMessage]; + [call writeMessage:request]; + + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; + XCTAssertEqual(startCount, 1); + XCTAssertEqual(writeDataCount, 4); + XCTAssertEqual(finishCount, 1); + XCTAssertEqual(receiveNextMessageCount, 4); + XCTAssertEqual(responseHeaderCount, 1); + XCTAssertEqual(responseDataCount, 4); + XCTAssertEqual(responseCloseCount, 1); + XCTAssertEqual(didWriteDataCount, 4); + XCTAssertEqual(globalStartCount, 1); + XCTAssertEqual(globalWriteDataCount, 4); + XCTAssertEqual(globalFinishCount, 1); + XCTAssertEqual(globalReceiveNextMessageCount, 4); + XCTAssertEqual(globalResponseHeaderCount, 1); + XCTAssertEqual(globalResponseDataCount, 4); + XCTAssertEqual(globalResponseCloseCount, 1); + XCTAssertEqual(globalDidWriteDataCount, 4); + globalInterceptorFactory.enabled = NO; +} + @end