|
|
|
@ -448,7 +448,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; |
|
|
|
|
@synchronized (callFlags) { |
|
|
|
|
@synchronized(callFlags) { |
|
|
|
|
switch (callSafety) { |
|
|
|
|
case GRPCCallSafetyDefault: |
|
|
|
|
callFlags[hostAndPath] = @0; |
|
|
|
@ -468,7 +468,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path { |
|
|
|
|
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; |
|
|
|
|
uint32_t flags = 0; |
|
|
|
|
@synchronized (callFlags) { |
|
|
|
|
@synchronized(callFlags) { |
|
|
|
|
flags = [callFlags[hostAndPath] intValue]; |
|
|
|
|
} |
|
|
|
|
return flags; |
|
|
|
@ -529,7 +529,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue { |
|
|
|
|
@synchronized (self) { |
|
|
|
|
@synchronized(self) { |
|
|
|
|
if (_state != GRXWriterStateNotStarted) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -562,14 +562,14 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
- (void)cancel { |
|
|
|
|
@synchronized (self) { |
|
|
|
|
@synchronized(self) { |
|
|
|
|
if (_state == GRXWriterStateFinished) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
[self finishWithError:[NSError |
|
|
|
|
errorWithDomain:kGRPCErrorDomain |
|
|
|
|
code:GRPCErrorCodeCancelled |
|
|
|
|
userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; |
|
|
|
|
errorWithDomain:kGRPCErrorDomain |
|
|
|
|
code:GRPCErrorCodeCancelled |
|
|
|
|
userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; |
|
|
|
|
[_wrappedCall cancel]; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -636,19 +636,19 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
// don't want to throw, because the app shouldn't crash for a behavior |
|
|
|
|
// that's on the hands of any server to have. Instead we finish and ask |
|
|
|
|
// the server to cancel. |
|
|
|
|
@synchronized (strongSelf) { |
|
|
|
|
@synchronized(strongSelf) { |
|
|
|
|
[strongSelf |
|
|
|
|
finishWithError:[NSError errorWithDomain:kGRPCErrorDomain |
|
|
|
|
code:GRPCErrorCodeResourceExhausted |
|
|
|
|
userInfo:@{ |
|
|
|
|
NSLocalizedDescriptionKey : |
|
|
|
|
@"Client does not have enough memory to " |
|
|
|
|
@"hold the server response." |
|
|
|
|
}]]; |
|
|
|
|
finishWithError:[NSError errorWithDomain:kGRPCErrorDomain |
|
|
|
|
code:GRPCErrorCodeResourceExhausted |
|
|
|
|
userInfo:@{ |
|
|
|
|
NSLocalizedDescriptionKey : |
|
|
|
|
@"Client does not have enough memory to " |
|
|
|
|
@"hold the server response." |
|
|
|
|
}]]; |
|
|
|
|
[strongSelf->_wrappedCall cancel]; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
@synchronized (strongSelf) { |
|
|
|
|
@synchronized(strongSelf) { |
|
|
|
|
[strongSelf->_responseWriteable enqueueValue:data |
|
|
|
|
completionHandler:^{ |
|
|
|
|
[strongSelf startNextRead]; |
|
|
|
@ -689,9 +689,10 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO(jcanizales): Add error handlers for async failures |
|
|
|
|
GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers |
|
|
|
|
flags:callSafetyFlags |
|
|
|
|
handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA |
|
|
|
|
GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] |
|
|
|
|
initWithMetadata:headers |
|
|
|
|
flags:callSafetyFlags |
|
|
|
|
handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA |
|
|
|
|
dispatch_async(_callQueue, ^{ |
|
|
|
|
if (!self->_unaryCall) { |
|
|
|
|
[self->_wrappedCall startBatchWithOperations:@[ op ]]; |
|
|
|
@ -731,7 +732,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
- (void)writeValue:(id)value { |
|
|
|
|
NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData"); |
|
|
|
|
|
|
|
|
|
@synchronized (self) { |
|
|
|
|
@synchronized(self) { |
|
|
|
|
if (_state == GRXWriterStateFinished) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -782,9 +783,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
dispatch_async(_callQueue, ^{ |
|
|
|
|
// TODO(jcanizales): Add error handlers for async failures |
|
|
|
|
[self->_wrappedCall |
|
|
|
|
startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; |
|
|
|
|
startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; |
|
|
|
|
[self->_wrappedCall |
|
|
|
|
startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; |
|
|
|
|
startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -825,16 +826,16 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
|
|
|
|
|
// Lock acquired inside startWithWriteable: |
|
|
|
|
- (void)startCallWithWriteable:(id<GRXWriteable>)writeable { |
|
|
|
|
@synchronized (self) { |
|
|
|
|
@synchronized(self) { |
|
|
|
|
if (_state == GRXWriterStateFinished) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
_responseWriteable = |
|
|
|
|
[[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; |
|
|
|
|
[[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; |
|
|
|
|
|
|
|
|
|
GRPCPooledChannel *channel = |
|
|
|
|
[[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; |
|
|
|
|
[[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; |
|
|
|
|
_wrappedCall = [channel wrappedCallWithPath:_path |
|
|
|
|
completionQueue:[GRPCCompletionQueue completionQueue] |
|
|
|
|
callOptions:_callOptions]; |
|
|
|
@ -843,9 +844,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
[self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain |
|
|
|
|
code:GRPCErrorCodeUnavailable |
|
|
|
|
userInfo:@{ |
|
|
|
|
NSLocalizedDescriptionKey : |
|
|
|
|
@"Failed to create call or channel." |
|
|
|
|
}]]; |
|
|
|
|
NSLocalizedDescriptionKey : |
|
|
|
|
@"Failed to create call or channel." |
|
|
|
|
}]]; |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -869,10 +870,10 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
_state = GRXWriterStateStarted; |
|
|
|
|
|
|
|
|
|
// Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). |
|
|
|
|
// This makes RPCs in which the call isn't externally retained possible (as long as it is started |
|
|
|
|
// before being autoreleased). |
|
|
|
|
// Care is taken not to retain self strongly in any of the blocks used in this implementation, so |
|
|
|
|
// that the life of the instance is determined by this retain cycle. |
|
|
|
|
// This makes RPCs in which the call isn't externally retained possible (as long as it is |
|
|
|
|
// started before being autoreleased). Care is taken not to retain self strongly in any of the |
|
|
|
|
// blocks used in this implementation, so that the life of the instance is determined by this |
|
|
|
|
// retain cycle. |
|
|
|
|
_retainSelf = self; |
|
|
|
|
|
|
|
|
|
if (_callOptions == nil) { |
|
|
|
@ -961,14 +962,14 @@ const char *kCFStreamVarName = "grpc_cfstream"; |
|
|
|
|
// Retain because connectivity manager only keeps weak reference to GRPCCall. |
|
|
|
|
__strong GRPCCall *strongSelf = self; |
|
|
|
|
if (strongSelf) { |
|
|
|
|
@synchronized (strongSelf) { |
|
|
|
|
@synchronized(strongSelf) { |
|
|
|
|
[_wrappedCall cancel]; |
|
|
|
|
[strongSelf |
|
|
|
|
finishWithError:[NSError errorWithDomain:kGRPCErrorDomain |
|
|
|
|
code:GRPCErrorCodeUnavailable |
|
|
|
|
userInfo:@{ |
|
|
|
|
NSLocalizedDescriptionKey : @"Connectivity lost." |
|
|
|
|
}]]; |
|
|
|
|
finishWithError:[NSError errorWithDomain:kGRPCErrorDomain |
|
|
|
|
code:GRPCErrorCodeUnavailable |
|
|
|
|
userInfo:@{ |
|
|
|
|
NSLocalizedDescriptionKey : @"Connectivity lost." |
|
|
|
|
}]]; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|