Revert "Revert "Refactor connectivity monitor on iOS""

pull/14554/head
Muxi Yan 7 years ago committed by GitHub
parent 1294d20b6b
commit 07767950c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 136
      src/objective-c/GRPCClient/GRPCCall.m
  2. 58
      src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h
  3. 199
      src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m
  4. 25
      src/objective-c/GRPCClient/private/GRPCHost.m

@ -108,6 +108,9 @@ static NSString * const kBearerPrefix = @"Bearer ";
// The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
// queue // queue
dispatch_queue_t _responseQueue; dispatch_queue_t _responseQueue;
// Whether the call is finished. If it is, should not call finishWithError again.
BOOL _finished;
} }
@synthesize state = _state; @synthesize state = _state;
@ -206,6 +209,8 @@ static NSString * const kBearerPrefix = @"Bearer ";
} else { } else {
[_responseWriteable enqueueSuccessfulCompletion]; [_responseWriteable enqueueSuccessfulCompletion];
} }
[GRPCConnectivityMonitor unregisterObserver:self];
} }
- (void)cancelCall { - (void)cancelCall {
@ -214,9 +219,10 @@ static NSString * const kBearerPrefix = @"Bearer ";
} }
- (void)cancel { - (void)cancel {
[self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeCancelled code:GRPCErrorCodeCancelled
userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]]; userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]];
if (!self.isWaitingForToken) { if (!self.isWaitingForToken) {
[self cancelCall]; [self cancelCall];
} else { } else {
@ -224,6 +230,19 @@ static NSString * const kBearerPrefix = @"Bearer ";
} }
} }
- (void)maybeFinishWithError:(NSError *)errorOrNil {
BOOL toFinish = NO;
@synchronized(self) {
if (_finished == NO) {
_finished = YES;
toFinish = YES;
}
}
if (toFinish == YES) {
[self finishWithError:errorOrNil];
}
}
- (void)dealloc { - (void)dealloc {
__block GRPCWrappedCall *wrappedCall = _wrappedCall; __block GRPCWrappedCall *wrappedCall = _wrappedCall;
dispatch_async(_callQueue, ^{ dispatch_async(_callQueue, ^{
@ -250,11 +269,13 @@ static NSString * const kBearerPrefix = @"Bearer ";
if (self.state == GRXWriterStatePaused) { if (self.state == GRXWriterStatePaused) {
return; return;
} }
__weak GRPCCall *weakSelf = self;
__weak GRXConcurrentWriteable *weakWriteable = _responseWriteable;
dispatch_async(_callQueue, ^{ dispatch_async(_callQueue, ^{
[weakSelf startReadWithHandler:^(grpc_byte_buffer *message) { __weak GRPCCall *weakSelf = self;
__weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable;
[self startReadWithHandler:^(grpc_byte_buffer *message) {
__strong GRPCCall *strongSelf = weakSelf;
__strong GRXConcurrentWriteable *strongWriteable = weakWriteable;
if (message == NULL) { if (message == NULL) {
// No more messages from the server // No more messages from the server
return; return;
@ -266,14 +287,14 @@ static NSString * const kBearerPrefix = @"Bearer ";
// don't want to throw, because the app shouldn't crash for a behavior // don't want to throw, because the app shouldn't crash for a behavior
// that's on the hands of any server to have. Instead we finish and ask // that's on the hands of any server to have. Instead we finish and ask
// the server to cancel. // the server to cancel.
[weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeResourceExhausted code:GRPCErrorCodeResourceExhausted
userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]]; userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]];
[weakSelf cancelCall]; [strongSelf cancelCall];
return; return;
} }
[weakWriteable enqueueValue:data completionHandler:^{ [strongWriteable enqueueValue:data completionHandler:^{
[weakSelf startNextRead]; [strongSelf startNextRead];
}]; }];
}]; }];
}); });
@ -333,12 +354,17 @@ static NSString * const kBearerPrefix = @"Bearer ";
_requestWriter.state = GRXWriterStatePaused; _requestWriter.state = GRXWriterStatePaused;
} }
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{ dispatch_async(_callQueue, ^{
[weakSelf writeMessage:value withErrorHandler:^{ __weak GRPCCall *weakSelf = self;
[weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain [self writeMessage:value withErrorHandler:^{
code:GRPCErrorCodeInternal __strong GRPCCall *strongSelf = weakSelf;
userInfo:nil]]; if (strongSelf != nil) {
[strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeInternal
userInfo:nil]];
// Wrapped call must be canceled when error is reported to upper layers
[strongSelf cancelCall];
}
}]; }];
}); });
} }
@ -360,12 +386,15 @@ static NSString * const kBearerPrefix = @"Bearer ";
if (errorOrNil) { if (errorOrNil) {
[self cancel]; [self cancel];
} else { } else {
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{ dispatch_async(_callQueue, ^{
[weakSelf finishRequestWithErrorHandler:^{ __weak GRPCCall *weakSelf = self;
[weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain [self finishRequestWithErrorHandler:^{
code:GRPCErrorCodeInternal __strong GRPCCall *strongSelf = weakSelf;
userInfo:nil]]; [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeInternal
userInfo:nil]];
// Wrapped call must be canceled when error is reported to upper layers
[strongSelf cancelCall];
}]; }];
}); });
} }
@ -387,30 +416,37 @@ static NSString * const kBearerPrefix = @"Bearer ";
} }
- (void)invokeCall { - (void)invokeCall {
__weak GRPCCall *weakSelf = self;
[self invokeCallWithHeadersHandler:^(NSDictionary *headers) { [self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
// Response headers received. // Response headers received.
self.responseHeaders = headers; __strong GRPCCall *strongSelf = weakSelf;
[self startNextRead]; if (strongSelf) {
strongSelf.responseHeaders = headers;
[strongSelf startNextRead];
}
} completionHandler:^(NSError *error, NSDictionary *trailers) { } completionHandler:^(NSError *error, NSDictionary *trailers) {
self.responseTrailers = trailers; __strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseTrailers = trailers;
if (error) { if (error) {
NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
if (error.userInfo) { if (error.userInfo) {
[userInfo addEntriesFromDictionary:error.userInfo]; [userInfo addEntriesFromDictionary:error.userInfo];
} }
userInfo[kGRPCTrailersKey] = self.responseTrailers; userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
// TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
// called before this one, so an error might end up with trailers but no headers. We // called before this one, so an error might end up with trailers but no headers. We
// shouldn't call finishWithError until ater both blocks are called. It is also when this is // shouldn't call finishWithError until ater both blocks are called. It is also when this is
// done that we can provide a merged view of response headers and trailers in a thread-safe // done that we can provide a merged view of response headers and trailers in a thread-safe
// way. // way.
if (self.responseHeaders) { if (strongSelf.responseHeaders) {
userInfo[kGRPCHeadersKey] = self.responseHeaders; userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
}
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
} }
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; [strongSelf maybeFinishWithError:error];
} }
[self finishWithError:error];
}]; }];
// Now that the RPC has been initiated, request writes can start. // Now that the RPC has been initiated, request writes can start.
@synchronized(_requestWriter) { @synchronized(_requestWriter) {
@ -439,16 +475,8 @@ static NSString * const kBearerPrefix = @"Bearer ";
// TODO(jcanizales): Check this on init. // TODO(jcanizales): Check this on init.
[NSException raise:NSInvalidArgumentException format:@"host of %@ is nil", _host]; [NSException raise:NSInvalidArgumentException format:@"host of %@ is nil", _host];
} }
_connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:host]; [GRPCConnectivityMonitor registerObserver:self
__weak typeof(self) weakSelf = self; selector:@selector(connectivityChanged:)];
void (^handler)(void) = ^{
typeof(self) strongSelf = weakSelf;
[strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeUnavailable
userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]];
};
[_connectivityMonitor handleLossWithHandler:handler
wifiStatusChangeHandler:nil];
} }
- (void)startWithWriteable:(id<GRXWriteable>)writeable { - (void)startWithWriteable:(id<GRXWriteable>)writeable {
@ -512,4 +540,12 @@ static NSString * const kBearerPrefix = @"Bearer ";
} }
} }
- (void)connectivityChanged:(NSNotification *)note {
[self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeUnavailable
userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]];
// Cancel underlying call upon this notification
[self cancelCall];
}
@end @end

@ -19,44 +19,30 @@
#import <Foundation/Foundation.h> #import <Foundation/Foundation.h>
#import <SystemConfiguration/SystemConfiguration.h> #import <SystemConfiguration/SystemConfiguration.h>
@interface GRPCReachabilityFlags : NSObject typedef NS_ENUM(NSInteger, GRPCConnectivityStatus) {
GRPCConnectivityUnknown = 0,
+ (nonnull instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags; GRPCConnectivityNoNetwork = 1,
GRPCConnectivityCellular = 2,
/** GRPCConnectivityWiFi = 3,
* One accessor method to query each of the different flags. Example: };
@property(nonatomic, readonly) BOOL isCell; extern NSString * _Nonnull kGRPCConnectivityNotification;
*/ // This interface monitors OS reachability interface for any network status
#define GRPC_XMACRO_ITEM(methodName, FlagName) \ // change. Parties interested in these events should register themselves as
@property(nonatomic, readonly) BOOL methodName; // observer.
#include "GRPCReachabilityFlagNames.xmacro.h"
#undef GRPC_XMACRO_ITEM
@property(nonatomic, readonly) BOOL isHostReachable;
@end
@interface GRPCConnectivityMonitor : NSObject @interface GRPCConnectivityMonitor : NSObject
+ (nullable instancetype)monitorWithHost:(nonnull NSString *)hostName;
- (nonnull instancetype)init NS_UNAVAILABLE; - (nonnull instancetype)init NS_UNAVAILABLE;
/** // Register an object as observer of network status change. \a observer
* Queue on which callbacks will be dispatched. Default is the main queue. Set it before calling // must have a notification method with one parameter of type
* handleLossWithHandler:. // (NSNotification *) and should pass it to parameter \a selector. The
*/ // parameter of this notification method is not used for now.
// TODO(jcanizales): Default to a serial background queue instead. + (void)registerObserver:(_Nonnull id)observer
@property(nonatomic, strong, null_resettable) dispatch_queue_t queue; selector:(_Nonnull SEL)selector;
/** // Ungegister an object from observers of network status change.
* Calls handler every time the connectivity to this instance's host is lost. If this instance is + (void)unregisterObserver:(_Nonnull id)observer;
* released before that happens, the handler won't be called.
* Only one handler is active at a time, so if this method is called again before the previous
* handler has been called, it might never be called at all (or yes, if it has already been queued).
*/
- (void)handleLossWithHandler:(nullable void (^)(void))lossHandler
wifiStatusChangeHandler:(nullable void (^)(void))wifiStatusChangeHandler;
@end @end

@ -18,175 +18,74 @@
#import "GRPCConnectivityMonitor.h" #import "GRPCConnectivityMonitor.h"
#pragma mark Flags #include <netinet/in.h>
@implementation GRPCReachabilityFlags { NSString *kGRPCConnectivityNotification = @"kGRPCConnectivityNotification";
SCNetworkReachabilityFlags _flags;
}
+ (instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags { static SCNetworkReachabilityRef reachability;
return [[self alloc] initWithFlags:flags]; static GRPCConnectivityStatus currentStatus;
}
- (instancetype)initWithFlags:(SCNetworkReachabilityFlags)flags { // Aggregate information in flags into network status.
if ((self = [super init])) { GRPCConnectivityStatus CalculateConnectivityStatus(SCNetworkReachabilityFlags flags) {
_flags = flags; GRPCConnectivityStatus result = GRPCConnectivityUnknown;
if (((flags & kSCNetworkReachabilityFlagsReachable) == 0) ||
((flags & kSCNetworkReachabilityFlagsConnectionRequired) != 0)) {
return GRPCConnectivityNoNetwork;
} }
return self; result = GRPCConnectivityWiFi;
} #if TARGET_OS_IPHONE
if (flags & kSCNetworkReachabilityFlagsIsWWAN) {
/* return result = GRPCConnectivityCellular;
* One accessor method implementation per flag. Example:
- (BOOL)isCell { \
return !!(_flags & kSCNetworkReachabilityFlagsIsWWAN); \
}
*/
#define GRPC_XMACRO_ITEM(methodName, FlagName) \
- (BOOL)methodName { \
return !!(_flags & kSCNetworkReachabilityFlags ## FlagName); \
}
#include "GRPCReachabilityFlagNames.xmacro.h"
#undef GRPC_XMACRO_ITEM
- (BOOL)isHostReachable {
// Note: connectionOnDemand means it'll be reachable only if using the CFSocketStream API or APIs
// on top of it.
// connectionRequired means we can't tell until a connection is attempted (e.g. for VPN on
// demand).
return self.reachable && !self.interventionRequired && !self.connectionOnDemand;
}
- (NSString *)description {
NSMutableArray *activeOptions = [NSMutableArray arrayWithCapacity:9];
/*
* For each flag, add its name to the array if it's ON. Example:
if (self.isCell) {
[activeOptions addObject:@"isCell"];
} }
#endif
*/ return result;
#define GRPC_XMACRO_ITEM(methodName, FlagName) \
if (self.methodName) { \
[activeOptions addObject:@ #methodName]; \
}
#include "GRPCReachabilityFlagNames.xmacro.h"
#undef GRPC_XMACRO_ITEM
return activeOptions.count == 0 ? @"(none)" : [activeOptions componentsJoinedByString:@", "];
}
- (BOOL)isEqual:(id)object {
return [object isKindOfClass:[GRPCReachabilityFlags class]] &&
_flags == ((GRPCReachabilityFlags *)object)->_flags;
}
- (NSUInteger)hash {
return _flags;
}
@end
#pragma mark Connectivity Monitor
// Assumes the third argument is a block that accepts a GRPCReachabilityFlags object, and passes the
// received ones to it.
static void PassFlagsToContextInfoBlock(SCNetworkReachabilityRef target,
SCNetworkReachabilityFlags flags,
void *info) {
#pragma unused (target)
// This can be called many times with the same info. The info is retained by SCNetworkReachability
// while this function is being executed.
void (^handler)(GRPCReachabilityFlags *) = (__bridge void (^)(GRPCReachabilityFlags *))info;
handler([[GRPCReachabilityFlags alloc] initWithFlags:flags]);
} }
@implementation GRPCConnectivityMonitor { static void ReachabilityCallback(
SCNetworkReachabilityRef _reachabilityRef; SCNetworkReachabilityRef target, SCNetworkReachabilityFlags flags, void* info) {
GRPCReachabilityFlags *_previousReachabilityFlags; GRPCConnectivityStatus newStatus = CalculateConnectivityStatus(flags);
}
- (nullable instancetype)initWithReachability:(nullable SCNetworkReachabilityRef)reachability { if (newStatus != currentStatus) {
if (!reachability) { [[NSNotificationCenter defaultCenter] postNotificationName:kGRPCConnectivityNotification
return nil; object:nil];
currentStatus = newStatus;
} }
if ((self = [super init])) {
_reachabilityRef = CFRetain(reachability);
_queue = dispatch_get_main_queue();
_previousReachabilityFlags = nil;
}
return self;
} }
+ (nullable instancetype)monitorWithHost:(nonnull NSString *)host { @implementation GRPCConnectivityMonitor
const char *hostName = host.UTF8String;
if (!hostName) {
[NSException raise:NSInvalidArgumentException
format:@"host.UTF8String returns NULL for %@", host];
}
SCNetworkReachabilityRef reachability =
SCNetworkReachabilityCreateWithName(NULL, hostName);
GRPCConnectivityMonitor *returnValue = [[self alloc] initWithReachability:reachability]; + (void)initialize {
if (reachability) { if (self == [GRPCConnectivityMonitor self]) {
CFRelease(reachability); struct sockaddr_in addr = {0};
} addr.sin_len = sizeof(addr);
return returnValue; addr.sin_family = AF_INET;
} reachability = SCNetworkReachabilityCreateWithAddress(NULL, (struct sockaddr *)&addr);
currentStatus = GRPCConnectivityUnknown;
- (void)handleLossWithHandler:(nullable void (^)(void))lossHandler SCNetworkConnectionFlags flags;
wifiStatusChangeHandler:(nullable void (^)(void))wifiStatusChangeHandler { if (SCNetworkReachabilityGetFlags(reachability, &flags)) {
__weak typeof(self) weakSelf = self; currentStatus = CalculateConnectivityStatus(flags);
[self startListeningWithHandler:^(GRPCReachabilityFlags *flags) {
typeof(self) strongSelf = weakSelf;
if (strongSelf) {
if (lossHandler && !flags.reachable) {
lossHandler();
#if TARGET_OS_IPHONE
} else if (wifiStatusChangeHandler &&
strongSelf->_previousReachabilityFlags &&
(flags.isWWAN ^
strongSelf->_previousReachabilityFlags.isWWAN)) {
wifiStatusChangeHandler();
#endif
}
strongSelf->_previousReachabilityFlags = flags;
} }
}];
}
- (void)startListeningWithHandler:(void (^)(GRPCReachabilityFlags *))handler { SCNetworkReachabilityContext context = {0, (__bridge void *)(self), NULL, NULL, NULL};
// Copy to ensure the handler block is in the heap (and so can't be deallocated when this method if (!SCNetworkReachabilitySetCallback(reachability, ReachabilityCallback, &context) ||
// returns). !SCNetworkReachabilityScheduleWithRunLoop(
void (^copiedHandler)(GRPCReachabilityFlags *) = [handler copy]; reachability, CFRunLoopGetMain(), kCFRunLoopCommonModes)) {
SCNetworkReachabilityContext context = { NSLog(@"gRPC connectivity monitor fail to set");
.version = 0, }
.info = (__bridge void *)copiedHandler, }
.retain = CFRetain,
.release = CFRelease,
};
// The following will retain context.info, and release it when the callback is set to NULL.
SCNetworkReachabilitySetCallback(_reachabilityRef, PassFlagsToContextInfoBlock, &context);
SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, _queue);
}
- (void)stopListening {
// This releases the block on context.info.
SCNetworkReachabilitySetCallback(_reachabilityRef, NULL, NULL);
SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, NULL);
} }
- (void)setQueue:(dispatch_queue_t)queue { + (void)registerObserver:(_Nonnull id)observer
_queue = queue ?: dispatch_get_main_queue(); selector:(SEL)selector {
[[NSNotificationCenter defaultCenter] addObserver:observer
selector:selector
name:kGRPCConnectivityNotification
object:nil];
} }
- (void)dealloc { + (void)unregisterObserver:(_Nonnull id)observer {
if (_reachabilityRef) { [[NSNotificationCenter defaultCenter] removeObserver:observer];
[self stopListening];
CFRelease(_reachabilityRef);
}
} }
@end @end

@ -37,12 +37,6 @@ NS_ASSUME_NONNULL_BEGIN
static NSMutableDictionary *kHostCache; static NSMutableDictionary *kHostCache;
// This connectivity monitor flushes the host cache when connectivity status
// changes or when connection switch between Wifi and Cellular data, so that a
// new call will use a new channel. Otherwise, a new call will still use the
// cached channel which is no longer available and will cause gRPC to hang.
static GRPCConnectivityMonitor *connectivityMonitor = nil;
@implementation GRPCHost { @implementation GRPCHost {
// TODO(mlumish): Investigate whether caching channels with strong links is a good idea. // TODO(mlumish): Investigate whether caching channels with strong links is a good idea.
GRPCChannel *_channel; GRPCChannel *_channel;
@ -90,17 +84,7 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil;
kHostCache[address] = self; kHostCache[address] = self;
_compressAlgorithm = GRPC_COMPRESS_NONE; _compressAlgorithm = GRPC_COMPRESS_NONE;
} }
// Keep a single monitor to flush the cache if the connectivity status changes [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChange:)];
// Thread safety guarded by @synchronized(kHostCache)
if (!connectivityMonitor) {
connectivityMonitor =
[GRPCConnectivityMonitor monitorWithHost:hostURL.host];
void (^handler)(void) = ^{
[GRPCHost flushChannelCache];
};
[connectivityMonitor handleLossWithHandler:handler
wifiStatusChangeHandler:handler];
}
} }
return self; return self;
} }
@ -281,6 +265,13 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil;
} }
} }
// Flushes the host cache when connectivity status changes or when connection switch between Wifi
// and Cellular data, so that a new call will use a new channel. Otherwise, a new call will still
// use the cached channel which is no longer available and will cause gRPC to hang.
- (void)connectivityChange:(NSNotification *)note {
[GRPCHost flushChannelCache];
}
@end @end
NS_ASSUME_NONNULL_END NS_ASSUME_NONNULL_END

Loading…
Cancel
Save