Merge pull request #2499 from jcanizales/move-delegate-wrapper-to-grx

Move GRPCDelegateWrapper to GRXConcurrentWriteable
pull/2427/head^2
Michael Lumish 10 years ago
commit b69d4c05ba
  1. 32
      src/objective-c/GRPCClient/GRPCCall.m
  2. 52
      src/objective-c/RxLibrary/GRXConcurrentWriteable.h
  3. 50
      src/objective-c/RxLibrary/GRXConcurrentWriteable.m

@ -35,10 +35,10 @@
#include <grpc/grpc.h>
#include <grpc/support/time.h>
#import <RxLibrary/GRXConcurrentWriteable.h>
#import "private/GRPCChannel.h"
#import "private/GRPCCompletionQueue.h"
#import "private/GRPCDelegateWrapper.h"
#import "private/GRPCWrappedCall.h"
#import "private/NSData+GRPC.h"
#import "private/NSDictionary+GRPC.h"
@ -78,9 +78,13 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// do. Particularly, in the face of errors, there's no ordering guarantee at
// all. This wrapper over our actual writeable ensures thread-safety and
// correct ordering.
GRPCDelegateWrapper *_responseWriteable;
GRXConcurrentWriteable *_responseWriteable;
GRXWriter *_requestWriter;
// To create a retain cycle when a call is started, up until it finishes. See
// |startWithWriteable:| and |finishWithError:|.
GRPCCall *_self;
NSMutableDictionary *_requestMetadata;
NSMutableDictionary *_responseMetadata;
}
@ -143,8 +147,13 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
#pragma mark Finish
- (void)finishWithError:(NSError *)errorOrNil {
// If the call isn't retained anywhere else, it can be deallocated now.
_self = nil;
// If there were still request messages coming, stop them.
_requestWriter.state = GRXWriterStateFinished;
_requestWriter = nil;
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
} else {
@ -191,7 +200,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
return;
}
__weak GRPCCall *weakSelf = self;
__weak GRPCDelegateWrapper *weakWriteable = _responseWriteable;
__weak GRXConcurrentWriteable *weakWriteable = _responseWriteable;
dispatch_async(_callQueue, ^{
[weakSelf startReadWithHandler:^(grpc_byte_buffer *message) {
@ -216,7 +225,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
[weakSelf cancelCall];
return;
}
[weakWriteable enqueueMessage:data completionHandler:^{
[weakWriteable enqueueValue:data completionHandler:^{
[weakSelf startNextRead];
}];
}];
@ -276,6 +285,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
_requestWriter = nil;
if (errorOrNil) {
[self cancel];
} else {
@ -335,12 +345,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
#pragma mark GRXWriter implementation
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
// The following produces a retain cycle self:_responseWriteable:self, which is only
// broken when writesFinishedWithError: is sent to the wrapped writeable.
// Care is taken not to retain self strongly in any of the blocks used in
// the implementation of GRPCCall, so that the life of the instance is
// determined by this retain cycle.
_responseWriteable = [[GRPCDelegateWrapper alloc] initWithWriteable:writeable writer:self];
// Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
// This makes RPCs in which the call isn't externally retained possible (as long as it is started
// before being autoreleased).
// Care is taken not to retain self strongly in any of the blocks used in this implementation, so
// that the life of the instance is determined by this retain cycle.
_self = self;
_responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
[self sendHeaders:_requestMetadata];
[self invokeCall];
}

@ -33,49 +33,39 @@
#import <Foundation/Foundation.h>
#import <RxLibrary/GRXWriter.h>
#import "GRXWriter.h"
#import "GRXWriteable.h"
@protocol GRXWriteable;
// This is a thread-safe wrapper over a GRXWriteable instance. It lets one
// enqueue calls to a GRXWriteable instance for the main thread, guaranteeing
// that writesFinishedWithError: is the last message sent to it (no matter what
// messages are sent to the wrapper, in what order, nor from which thread). It
// also guarantees that, if cancelWithError: is called from the main thread
// (e.g. by the app cancelling the writes), no further messages are sent to the
// writeable except writesFinishedWithError:.
// This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a
// GRXWriteable instance for the main thread, guaranteeing that writesFinishedWithError: is the last
// message sent to it (no matter what messages are sent to the wrapper, in what order, nor from
// which thread). It also guarantees that, if cancelWithError: is called from the main thread (e.g.
// by the app cancelling the writes), no further messages are sent to the writeable except
// writesFinishedWithError:.
//
// TODO(jcanizales): Let the user specify another queue for the writeable
// callbacks.
// TODO(jcanizales): Rename to GRXWriteableWrapper and move to the Rx library.
@interface GRPCDelegateWrapper : NSObject
// TODO(jcanizales): Let the user specify another queue for the writeable callbacks.
@interface GRXConcurrentWriteable : NSObject
// The GRXWriteable passed is the wrapped writeable.
// Both the GRXWriter instance and the GRXWriteable instance are retained until
// writesFinishedWithError: is sent to the writeable, and released after that.
// This is used to create a retain cycle that keeps both objects alive until the
// writing is explicitly finished.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(GRXWriter *)writer
NS_DESIGNATED_INITIALIZER;
// The GRXWriteable instance is retained until writesFinishedWithError: is sent to it, and released
// after that.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable NS_DESIGNATED_INITIALIZER;
// Enqueues writeValue: to be sent to the writeable in the main thread.
// The passed handler is invoked from the main thread after writeValue: returns.
- (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler;
- (void)enqueueValue:(id)value completionHandler:(void (^)())handler;
// Enqueues writesFinishedWithError:nil to be sent to the writeable in the main
// thread. After that message is sent to the writeable, all other methods of
// this object are effectively noops.
// Enqueues writesFinishedWithError:nil to be sent to the writeable in the main thread. After that
// message is sent to the writeable, all other methods of this object are effectively noops.
- (void)enqueueSuccessfulCompletion;
// If the writeable has not yet received a writesFinishedWithError: message, this
// will enqueue one to be sent to it in the main thread, and cancel all other
// pending messages to the writeable enqueued by this object (both past and
// future).
// If the writeable has not yet received a writesFinishedWithError: message, this will enqueue one
// to be sent to it in the main thread, and cancel all other pending messages to the writeable
// enqueued by this object (both past and future).
// The error argument cannot be nil.
- (void)cancelWithError:(NSError *)error;
// Cancels all pending messages to the writeable enqueued by this object (both
// past and future). Because the writeable won't receive writesFinishedWithError:,
// this also releases the writeable and the writer.
// Cancels all pending messages to the writeable enqueued by this object (both past and future).
// Because the writeable won't receive writesFinishedWithError:, this also releases the writeable.
- (void)cancelSilently;
@end

@ -31,45 +31,42 @@
*
*/
#import "GRPCDelegateWrapper.h"
#import "GRXConcurrentWriteable.h"
#import <RxLibrary/GRXWriteable.h>
@interface GRPCDelegateWrapper ()
// These are atomic so that cancellation can nillify them from any thread.
@interface GRXConcurrentWriteable ()
// This is atomic so that cancellation can nillify it from any thread.
@property(atomic, strong) id<GRXWriteable> writeable;
@property(atomic, strong) GRXWriter *writer;
@end
@implementation GRPCDelegateWrapper {
@implementation GRXConcurrentWriteable {
dispatch_queue_t _writeableQueue;
// This ensures that writesFinishedWithError: is only sent once to the writeable.
dispatch_once_t _alreadyFinished;
}
- (instancetype)init {
return [self initWithWriteable:nil writer:nil];
return [self initWithWriteable:nil];
}
// Designated initializer
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(GRXWriter *)writer {
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
if (self = [super init]) {
_writeableQueue = dispatch_get_main_queue();
_writeable = writeable;
_writer = writer;
}
return self;
}
- (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler {
- (void)enqueueValue:(id)value completionHandler:(void (^)())handler {
dispatch_async(_writeableQueue, ^{
// We're racing a possible cancellation performed by another thread. To turn
// all already-enqueued messages into noops, cancellation nillifies the
// writeable property. If we get it before it's nil, we won
// the race.
// We're racing a possible cancellation performed by another thread. To turn all already-
// enqueued messages into noops, cancellation nillifies the writeable property. If we get it
// before it's nil, we won the race.
id<GRXWriteable> writeable = self.writeable;
if (writeable) {
[writeable writeValue:message];
[writeable writeValue:value];
handler();
}
});
@ -78,13 +75,11 @@
- (void)enqueueSuccessfulCompletion {
dispatch_async(_writeableQueue, ^{
dispatch_once(&_alreadyFinished, ^{
// Cancellation is now impossible. None of the other three blocks can run
// concurrently with this one.
// Cancellation is now impossible. None of the other three blocks can run concurrently with
// this one.
[self.writeable writesFinishedWithError:nil];
// Break the retain cycle with writer, and skip any possible message to the
// wrapped writeable enqueued after this one.
// Skip any possible message to the wrapped writeable enqueued after this one.
self.writeable = nil;
self.writer = nil;
});
});
}
@ -92,29 +87,24 @@
- (void)cancelWithError:(NSError *)error {
NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
dispatch_once(&_alreadyFinished, ^{
// Skip any of the still-enqueued messages to the wrapped writeable. We use
// the atomic setter to nillify writer and writeable because we might be
// running concurrently with the blocks in _writeableQueue, and assignment
// with ARC isn't atomic.
// Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
// nillify writeable because we might be running concurrently with the blocks in
// _writeableQueue, and assignment with ARC isn't atomic.
id<GRXWriteable> writeable = self.writeable;
self.writeable = nil;
dispatch_async(_writeableQueue, ^{
[writeable writesFinishedWithError:error];
// Break the retain cycle with writer.
self.writer = nil;
});
});
}
- (void)cancelSilently {
dispatch_once(&_alreadyFinished, ^{
// Skip any of the still-enqueued messages to the wrapped writeable. We use
// the atomic setter to nillify writer and writeable because we might be
// running concurrently with the blocks in _writeableQueue, and assignment
// with ARC isn't atomic.
// Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
// nillify writeable because we might be running concurrently with the blocks in
// _writeableQueue, and assignment with ARC isn't atomic.
self.writeable = nil;
self.writer = nil;
});
}
@end
Loading…
Cancel
Save