Merge pull request #14554 from grpc/revert-14553-revert-14529-polish-connectivity-monitor

Revert "Revert "Refactor connectivity monitor on iOS""
reviewable/pr13290/r34
Muxi Yan 7 years ago committed by GitHub
commit ccd1d55807
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 136
      src/objective-c/GRPCClient/GRPCCall.m
  2. 58
      src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h
  3. 199
      src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m
  4. 25
      src/objective-c/GRPCClient/private/GRPCHost.m

@ -108,6 +108,9 @@ static NSString * const kBearerPrefix = @"Bearer ";
// The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
// queue
dispatch_queue_t _responseQueue;
// Whether the call is finished. If it is, should not call finishWithError again.
BOOL _finished;
}
@synthesize state = _state;
@ -206,6 +209,8 @@ static NSString * const kBearerPrefix = @"Bearer ";
} else {
[_responseWriteable enqueueSuccessfulCompletion];
}
[GRPCConnectivityMonitor unregisterObserver:self];
}
- (void)cancelCall {
@ -214,9 +219,10 @@ static NSString * const kBearerPrefix = @"Bearer ";
}
- (void)cancel {
[self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeCancelled
userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]];
[self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeCancelled
userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]];
if (!self.isWaitingForToken) {
[self cancelCall];
} else {
@ -224,6 +230,19 @@ static NSString * const kBearerPrefix = @"Bearer ";
}
}
- (void)maybeFinishWithError:(NSError *)errorOrNil {
BOOL toFinish = NO;
@synchronized(self) {
if (_finished == NO) {
_finished = YES;
toFinish = YES;
}
}
if (toFinish == YES) {
[self finishWithError:errorOrNil];
}
}
- (void)dealloc {
__block GRPCWrappedCall *wrappedCall = _wrappedCall;
dispatch_async(_callQueue, ^{
@ -250,11 +269,13 @@ static NSString * const kBearerPrefix = @"Bearer ";
if (self.state == GRXWriterStatePaused) {
return;
}
__weak GRPCCall *weakSelf = self;
__weak GRXConcurrentWriteable *weakWriteable = _responseWriteable;
dispatch_async(_callQueue, ^{
[weakSelf startReadWithHandler:^(grpc_byte_buffer *message) {
__weak GRPCCall *weakSelf = self;
__weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable;
[self startReadWithHandler:^(grpc_byte_buffer *message) {
__strong GRPCCall *strongSelf = weakSelf;
__strong GRXConcurrentWriteable *strongWriteable = weakWriteable;
if (message == NULL) {
// No more messages from the server
return;
@ -266,14 +287,14 @@ static NSString * const kBearerPrefix = @"Bearer ";
// don't want to throw, because the app shouldn't crash for a behavior
// that's on the hands of any server to have. Instead we finish and ask
// the server to cancel.
[weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeResourceExhausted
userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]];
[weakSelf cancelCall];
[strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeResourceExhausted
userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]];
[strongSelf cancelCall];
return;
}
[weakWriteable enqueueValue:data completionHandler:^{
[weakSelf startNextRead];
[strongWriteable enqueueValue:data completionHandler:^{
[strongSelf startNextRead];
}];
}];
});
@ -333,12 +354,17 @@ static NSString * const kBearerPrefix = @"Bearer ";
_requestWriter.state = GRXWriterStatePaused;
}
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
[weakSelf writeMessage:value withErrorHandler:^{
[weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeInternal
userInfo:nil]];
__weak GRPCCall *weakSelf = self;
[self writeMessage:value withErrorHandler:^{
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf != nil) {
[strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeInternal
userInfo:nil]];
// Wrapped call must be canceled when error is reported to upper layers
[strongSelf cancelCall];
}
}];
});
}
@ -360,12 +386,15 @@ static NSString * const kBearerPrefix = @"Bearer ";
if (errorOrNil) {
[self cancel];
} else {
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
[weakSelf finishRequestWithErrorHandler:^{
[weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeInternal
userInfo:nil]];
__weak GRPCCall *weakSelf = self;
[self finishRequestWithErrorHandler:^{
__strong GRPCCall *strongSelf = weakSelf;
[strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeInternal
userInfo:nil]];
// Wrapped call must be canceled when error is reported to upper layers
[strongSelf cancelCall];
}];
});
}
@ -387,30 +416,37 @@ static NSString * const kBearerPrefix = @"Bearer ";
}
- (void)invokeCall {
__weak GRPCCall *weakSelf = self;
[self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
// Response headers received.
self.responseHeaders = headers;
[self startNextRead];
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseHeaders = headers;
[strongSelf startNextRead];
}
} completionHandler:^(NSError *error, NSDictionary *trailers) {
self.responseTrailers = trailers;
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseTrailers = trailers;
if (error) {
NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
if (error.userInfo) {
[userInfo addEntriesFromDictionary:error.userInfo];
}
userInfo[kGRPCTrailersKey] = self.responseTrailers;
// TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
// called before this one, so an error might end up with trailers but no headers. We
// shouldn't call finishWithError until ater both blocks are called. It is also when this is
// done that we can provide a merged view of response headers and trailers in a thread-safe
// way.
if (self.responseHeaders) {
userInfo[kGRPCHeadersKey] = self.responseHeaders;
if (error) {
NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
if (error.userInfo) {
[userInfo addEntriesFromDictionary:error.userInfo];
}
userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
// TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
// called before this one, so an error might end up with trailers but no headers. We
// shouldn't call finishWithError until ater both blocks are called. It is also when this is
// done that we can provide a merged view of response headers and trailers in a thread-safe
// way.
if (strongSelf.responseHeaders) {
userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
}
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
}
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
[strongSelf maybeFinishWithError:error];
}
[self finishWithError:error];
}];
// Now that the RPC has been initiated, request writes can start.
@synchronized(_requestWriter) {
@ -439,16 +475,8 @@ static NSString * const kBearerPrefix = @"Bearer ";
// TODO(jcanizales): Check this on init.
[NSException raise:NSInvalidArgumentException format:@"host of %@ is nil", _host];
}
_connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:host];
__weak typeof(self) weakSelf = self;
void (^handler)(void) = ^{
typeof(self) strongSelf = weakSelf;
[strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeUnavailable
userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]];
};
[_connectivityMonitor handleLossWithHandler:handler
wifiStatusChangeHandler:nil];
[GRPCConnectivityMonitor registerObserver:self
selector:@selector(connectivityChanged:)];
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
@ -512,4 +540,12 @@ static NSString * const kBearerPrefix = @"Bearer ";
}
}
- (void)connectivityChanged:(NSNotification *)note {
[self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeUnavailable
userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]];
// Cancel underlying call upon this notification
[self cancelCall];
}
@end

@ -19,44 +19,30 @@
#import <Foundation/Foundation.h>
#import <SystemConfiguration/SystemConfiguration.h>
@interface GRPCReachabilityFlags : NSObject
+ (nonnull instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags;
/**
* One accessor method to query each of the different flags. Example:
@property(nonatomic, readonly) BOOL isCell;
*/
#define GRPC_XMACRO_ITEM(methodName, FlagName) \
@property(nonatomic, readonly) BOOL methodName;
#include "GRPCReachabilityFlagNames.xmacro.h"
#undef GRPC_XMACRO_ITEM
@property(nonatomic, readonly) BOOL isHostReachable;
@end
typedef NS_ENUM(NSInteger, GRPCConnectivityStatus) {
GRPCConnectivityUnknown = 0,
GRPCConnectivityNoNetwork = 1,
GRPCConnectivityCellular = 2,
GRPCConnectivityWiFi = 3,
};
extern NSString * _Nonnull kGRPCConnectivityNotification;
// This interface monitors OS reachability interface for any network status
// change. Parties interested in these events should register themselves as
// observer.
@interface GRPCConnectivityMonitor : NSObject
+ (nullable instancetype)monitorWithHost:(nonnull NSString *)hostName;
- (nonnull instancetype)init NS_UNAVAILABLE;
/**
* Queue on which callbacks will be dispatched. Default is the main queue. Set it before calling
* handleLossWithHandler:.
*/
// TODO(jcanizales): Default to a serial background queue instead.
@property(nonatomic, strong, null_resettable) dispatch_queue_t queue;
/**
* Calls handler every time the connectivity to this instance's host is lost. If this instance is
* released before that happens, the handler won't be called.
* Only one handler is active at a time, so if this method is called again before the previous
* handler has been called, it might never be called at all (or yes, if it has already been queued).
*/
- (void)handleLossWithHandler:(nullable void (^)(void))lossHandler
wifiStatusChangeHandler:(nullable void (^)(void))wifiStatusChangeHandler;
// Register an object as observer of network status change. \a observer
// must have a notification method with one parameter of type
// (NSNotification *) and should pass it to parameter \a selector. The
// parameter of this notification method is not used for now.
+ (void)registerObserver:(_Nonnull id)observer
selector:(_Nonnull SEL)selector;
// Ungegister an object from observers of network status change.
+ (void)unregisterObserver:(_Nonnull id)observer;
@end

@ -18,175 +18,74 @@
#import "GRPCConnectivityMonitor.h"
#pragma mark Flags
#include <netinet/in.h>
@implementation GRPCReachabilityFlags {
SCNetworkReachabilityFlags _flags;
}
NSString *kGRPCConnectivityNotification = @"kGRPCConnectivityNotification";
+ (instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags {
return [[self alloc] initWithFlags:flags];
}
static SCNetworkReachabilityRef reachability;
static GRPCConnectivityStatus currentStatus;
- (instancetype)initWithFlags:(SCNetworkReachabilityFlags)flags {
if ((self = [super init])) {
_flags = flags;
// Aggregate information in flags into network status.
GRPCConnectivityStatus CalculateConnectivityStatus(SCNetworkReachabilityFlags flags) {
GRPCConnectivityStatus result = GRPCConnectivityUnknown;
if (((flags & kSCNetworkReachabilityFlagsReachable) == 0) ||
((flags & kSCNetworkReachabilityFlagsConnectionRequired) != 0)) {
return GRPCConnectivityNoNetwork;
}
return self;
}
/*
* One accessor method implementation per flag. Example:
- (BOOL)isCell { \
return !!(_flags & kSCNetworkReachabilityFlagsIsWWAN); \
}
*/
#define GRPC_XMACRO_ITEM(methodName, FlagName) \
- (BOOL)methodName { \
return !!(_flags & kSCNetworkReachabilityFlags ## FlagName); \
}
#include "GRPCReachabilityFlagNames.xmacro.h"
#undef GRPC_XMACRO_ITEM
- (BOOL)isHostReachable {
// Note: connectionOnDemand means it'll be reachable only if using the CFSocketStream API or APIs
// on top of it.
// connectionRequired means we can't tell until a connection is attempted (e.g. for VPN on
// demand).
return self.reachable && !self.interventionRequired && !self.connectionOnDemand;
}
- (NSString *)description {
NSMutableArray *activeOptions = [NSMutableArray arrayWithCapacity:9];
/*
* For each flag, add its name to the array if it's ON. Example:
if (self.isCell) {
[activeOptions addObject:@"isCell"];
result = GRPCConnectivityWiFi;
#if TARGET_OS_IPHONE
if (flags & kSCNetworkReachabilityFlagsIsWWAN) {
return result = GRPCConnectivityCellular;
}
*/
#define GRPC_XMACRO_ITEM(methodName, FlagName) \
if (self.methodName) { \
[activeOptions addObject:@ #methodName]; \
}
#include "GRPCReachabilityFlagNames.xmacro.h"
#undef GRPC_XMACRO_ITEM
return activeOptions.count == 0 ? @"(none)" : [activeOptions componentsJoinedByString:@", "];
}
- (BOOL)isEqual:(id)object {
return [object isKindOfClass:[GRPCReachabilityFlags class]] &&
_flags == ((GRPCReachabilityFlags *)object)->_flags;
}
- (NSUInteger)hash {
return _flags;
}
@end
#pragma mark Connectivity Monitor
// Assumes the third argument is a block that accepts a GRPCReachabilityFlags object, and passes the
// received ones to it.
static void PassFlagsToContextInfoBlock(SCNetworkReachabilityRef target,
SCNetworkReachabilityFlags flags,
void *info) {
#pragma unused (target)
// This can be called many times with the same info. The info is retained by SCNetworkReachability
// while this function is being executed.
void (^handler)(GRPCReachabilityFlags *) = (__bridge void (^)(GRPCReachabilityFlags *))info;
handler([[GRPCReachabilityFlags alloc] initWithFlags:flags]);
#endif
return result;
}
@implementation GRPCConnectivityMonitor {
SCNetworkReachabilityRef _reachabilityRef;
GRPCReachabilityFlags *_previousReachabilityFlags;
}
static void ReachabilityCallback(
SCNetworkReachabilityRef target, SCNetworkReachabilityFlags flags, void* info) {
GRPCConnectivityStatus newStatus = CalculateConnectivityStatus(flags);
- (nullable instancetype)initWithReachability:(nullable SCNetworkReachabilityRef)reachability {
if (!reachability) {
return nil;
if (newStatus != currentStatus) {
[[NSNotificationCenter defaultCenter] postNotificationName:kGRPCConnectivityNotification
object:nil];
currentStatus = newStatus;
}
if ((self = [super init])) {
_reachabilityRef = CFRetain(reachability);
_queue = dispatch_get_main_queue();
_previousReachabilityFlags = nil;
}
return self;
}
+ (nullable instancetype)monitorWithHost:(nonnull NSString *)host {
const char *hostName = host.UTF8String;
if (!hostName) {
[NSException raise:NSInvalidArgumentException
format:@"host.UTF8String returns NULL for %@", host];
}
SCNetworkReachabilityRef reachability =
SCNetworkReachabilityCreateWithName(NULL, hostName);
@implementation GRPCConnectivityMonitor
GRPCConnectivityMonitor *returnValue = [[self alloc] initWithReachability:reachability];
if (reachability) {
CFRelease(reachability);
}
return returnValue;
}
+ (void)initialize {
if (self == [GRPCConnectivityMonitor self]) {
struct sockaddr_in addr = {0};
addr.sin_len = sizeof(addr);
addr.sin_family = AF_INET;
reachability = SCNetworkReachabilityCreateWithAddress(NULL, (struct sockaddr *)&addr);
currentStatus = GRPCConnectivityUnknown;
- (void)handleLossWithHandler:(nullable void (^)(void))lossHandler
wifiStatusChangeHandler:(nullable void (^)(void))wifiStatusChangeHandler {
__weak typeof(self) weakSelf = self;
[self startListeningWithHandler:^(GRPCReachabilityFlags *flags) {
typeof(self) strongSelf = weakSelf;
if (strongSelf) {
if (lossHandler && !flags.reachable) {
lossHandler();
#if TARGET_OS_IPHONE
} else if (wifiStatusChangeHandler &&
strongSelf->_previousReachabilityFlags &&
(flags.isWWAN ^
strongSelf->_previousReachabilityFlags.isWWAN)) {
wifiStatusChangeHandler();
#endif
}
strongSelf->_previousReachabilityFlags = flags;
SCNetworkConnectionFlags flags;
if (SCNetworkReachabilityGetFlags(reachability, &flags)) {
currentStatus = CalculateConnectivityStatus(flags);
}
}];
}
- (void)startListeningWithHandler:(void (^)(GRPCReachabilityFlags *))handler {
// Copy to ensure the handler block is in the heap (and so can't be deallocated when this method
// returns).
void (^copiedHandler)(GRPCReachabilityFlags *) = [handler copy];
SCNetworkReachabilityContext context = {
.version = 0,
.info = (__bridge void *)copiedHandler,
.retain = CFRetain,
.release = CFRelease,
};
// The following will retain context.info, and release it when the callback is set to NULL.
SCNetworkReachabilitySetCallback(_reachabilityRef, PassFlagsToContextInfoBlock, &context);
SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, _queue);
}
- (void)stopListening {
// This releases the block on context.info.
SCNetworkReachabilitySetCallback(_reachabilityRef, NULL, NULL);
SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, NULL);
SCNetworkReachabilityContext context = {0, (__bridge void *)(self), NULL, NULL, NULL};
if (!SCNetworkReachabilitySetCallback(reachability, ReachabilityCallback, &context) ||
!SCNetworkReachabilityScheduleWithRunLoop(
reachability, CFRunLoopGetMain(), kCFRunLoopCommonModes)) {
NSLog(@"gRPC connectivity monitor fail to set");
}
}
}
- (void)setQueue:(dispatch_queue_t)queue {
_queue = queue ?: dispatch_get_main_queue();
+ (void)registerObserver:(_Nonnull id)observer
selector:(SEL)selector {
[[NSNotificationCenter defaultCenter] addObserver:observer
selector:selector
name:kGRPCConnectivityNotification
object:nil];
}
- (void)dealloc {
if (_reachabilityRef) {
[self stopListening];
CFRelease(_reachabilityRef);
}
+ (void)unregisterObserver:(_Nonnull id)observer {
[[NSNotificationCenter defaultCenter] removeObserver:observer];
}
@end

@ -37,12 +37,6 @@ NS_ASSUME_NONNULL_BEGIN
static NSMutableDictionary *kHostCache;
// This connectivity monitor flushes the host cache when connectivity status
// changes or when connection switch between Wifi and Cellular data, so that a
// new call will use a new channel. Otherwise, a new call will still use the
// cached channel which is no longer available and will cause gRPC to hang.
static GRPCConnectivityMonitor *connectivityMonitor = nil;
@implementation GRPCHost {
// TODO(mlumish): Investigate whether caching channels with strong links is a good idea.
GRPCChannel *_channel;
@ -90,17 +84,7 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil;
kHostCache[address] = self;
_compressAlgorithm = GRPC_COMPRESS_NONE;
}
// Keep a single monitor to flush the cache if the connectivity status changes
// Thread safety guarded by @synchronized(kHostCache)
if (!connectivityMonitor) {
connectivityMonitor =
[GRPCConnectivityMonitor monitorWithHost:hostURL.host];
void (^handler)(void) = ^{
[GRPCHost flushChannelCache];
};
[connectivityMonitor handleLossWithHandler:handler
wifiStatusChangeHandler:handler];
}
[GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChange:)];
}
return self;
}
@ -281,6 +265,13 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil;
}
}
// Flushes the host cache when connectivity status changes or when connection switch between Wifi
// and Cellular data, so that a new call will use a new channel. Otherwise, a new call will still
// use the cached channel which is no longer available and will cause gRPC to hang.
- (void)connectivityChange:(NSNotification *)note {
[GRPCHost flushChannelCache];
}
@end
NS_ASSUME_NONNULL_END

Loading…
Cancel
Save