Merge pull request #2861 from jcanizales/fix-race-condition

Fix race condition in GRPCCall
pull/2867/head
Jorge Canizales 9 years ago
commit 7a75936001
  1. 41
      src/objective-c/GRPCClient/GRPCCall.m
  2. 9
      src/objective-c/RxLibrary/GRXBufferedPipe.h
  3. 10
      src/objective-c/RxLibrary/GRXForwardingWriter.h
  4. 6
      src/objective-c/RxLibrary/GRXForwardingWriter.m
  5. 13
      src/objective-c/RxLibrary/GRXImmediateWriter.h
  6. 91
      src/objective-c/RxLibrary/GRXWriter.h

@ -74,11 +74,20 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// all. This wrapper over our actual writeable ensures thread-safety and
// correct ordering.
GRXConcurrentWriteable *_responseWriteable;
// The network thread wants the requestWriter to resume (when the server is ready for more input),
// or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
// it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
// We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
// writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
// pause the writer immediately on writeValue:, so we need our locking to be recursive.
GRXWriter *_requestWriter;
// To create a retain cycle when a call is started, up until it finishes. See
// |startWithWriteable:| and |finishWithError:|.
GRPCCall *_self;
// |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
// reference to the call object if all they're interested in is the handler being executed when
// the response arrives.
GRPCCall *_retainSelf;
NSMutableDictionary *_requestMetadata;
NSMutableDictionary *_responseMetadata;
@ -136,11 +145,12 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
- (void)finishWithError:(NSError *)errorOrNil {
// If the call isn't retained anywhere else, it can be deallocated now.
_self = nil;
_retainSelf = nil;
// If there were still request messages coming, stop them.
_requestWriter.state = GRXWriterStateFinished;
_requestWriter = nil;
@synchronized(_requestWriter) {
_requestWriter.state = GRXWriterStateFinished;
}
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
@ -240,12 +250,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf->_requestWriter.state = GRXWriterStateStarted;
@synchronized(strongSelf->_requestWriter) {
strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
}
};
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc]
initWithMessage:message
handler:resumingHandler]] errorHandler:errorHandler];
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
handler:resumingHandler]]
errorHandler:errorHandler];
}
- (void)writeValue:(id)value {
@ -253,7 +265,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
_requestWriter.state = GRXWriterStatePaused;
@synchronized(_requestWriter) {
_requestWriter.state = GRXWriterStatePaused;
}
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
@ -273,7 +287,6 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
_requestWriter = nil;
if (errorOrNil) {
[self cancel];
} else {
@ -327,7 +340,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
}];
// Now that the RPC has been initiated, request writes can start.
[_requestWriter startWithWriteable:self];
@synchronized(_requestWriter) {
[_requestWriter startWithWriteable:self];
}
}
#pragma mark GRXWriter implementation
@ -338,7 +353,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// before being autoreleased).
// Care is taken not to retain self strongly in any of the blocks used in this implementation, so
// that the life of the instance is determined by this retain cycle.
_self = self;
_retainSelf = self;
_responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
[self sendHeaders:_requestMetadata];

@ -36,13 +36,11 @@
#import "GRXWriteable.h"
#import "GRXWriter.h"
// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
// to -startWithWriteable:).
// A buffered pipe is a Writer that also acts as a Writeable.
// Once it is started, whatever values are written into it (via -writeValue:) will be propagated
// immediately, unless flow control prevents it.
// If it is throttled and keeps receiving values, as well as if it receives values before being
// started, it will buffer them and propagate them in order as soon as its state becomes
// GRXWriterStateStarted.
// started, it will buffer them and propagate them in order as soon as its state becomes Started.
// If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
// propagate the error immediately.
//
@ -51,6 +49,9 @@
// pipe will keep buffering all data written to it, your application could run out of memory and
// crash. If you want to react to flow control signals to prevent that, instead of using this class
// you can implement an object that conforms to GRXWriter.
//
// Thread-safety:
// The methods of an object of this class should not be called concurrently from different threads.
@interface GRXBufferedPipe : GRXWriter<GRXWriteable>
// Convenience constructor.

@ -33,11 +33,17 @@
#import "GRXWriter.h"
// A "proxy" class that simply forwards values, completion, and errors from its
// input writer to its writeable.
// A "proxy" class that simply forwards values, completion, and errors from its input writer to its
// writeable.
// It is useful as a superclass for pipes that act as a transformation of their
// input writer, and for classes that represent objects with input and
// output sequences of values, like an RPC.
//
// Thread-safety:
// All messages sent to this object need to be serialized. When it is started, the writer it wraps
// is started in the same thread. Manual state changes are propagated to the wrapped writer in the
// same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be
// serialized with any message sent to this object.
@interface GRXForwardingWriter : GRXWriter
- (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER;
@end

@ -48,7 +48,11 @@
// Designated initializer
- (instancetype)initWithWriter:(GRXWriter *)writer {
if (!writer) {
[NSException raise:NSInvalidArgumentException format:@"writer can't be nil."];
return nil;
}
if (writer.state != GRXWriterStateNotStarted) {
[NSException raise:NSInvalidArgumentException
format:@"The writer argument must not have already started."];
}
if ((self = [super init])) {
_writer = writer;

@ -36,10 +36,17 @@
#import "GRXWriter.h"
// Utility to construct GRXWriter instances from values that are immediately available when
// required. The returned writers all support pausing and early termination.
// required.
//
// Unless the writeable callback pauses them or stops them early, these writers will do all their
// interactions with the writeable before the start method returns.
// Thread-safety:
//
// An object of this class shouldn't be messaged concurrently by more than one thread. It will start
// messaging the writeable before |startWithWriteable:| returns, in the same thread. That is the
// only place where the writer can be paused or stopped prematurely.
//
// If a paused writer of this class is resumed, it will start messaging the writeable, in the same
// thread, before |setState:| returns. Because the object can't be legally accessed concurrently,
// that's the only place where it can be paused again (or stopped).
@interface GRXImmediateWriter : GRXWriter
// Returns a writer that pulls values from the passed NSEnumerator instance and pushes them to

@ -35,84 +35,73 @@
#import "GRXWriteable.h"
// States of a writer.
typedef NS_ENUM(NSInteger, GRXWriterState) {
// The writer has not yet been given a writeable to which it can push its
// values. To have an writer transition to the Started state, send it a
// startWithWriteable: message.
// The writer has not yet been given a writeable to which it can push its values. To have a writer
// transition to the Started state, send it a startWithWriteable: message.
//
// An writer's state cannot be manually set to this value.
// A writer's state cannot be manually set to this value.
GRXWriterStateNotStarted,
// The writer might push values to the writeable at any moment.
GRXWriterStateStarted,
// The writer is temporarily paused, and won't send any more values to the
// writeable unless its state is set back to Started. The writer might still
// transition to the Finished state at any moment, and is allowed to send
// writesFinishedWithError: to its writeable.
//
// Not all implementations of writer have to support pausing, and thus
// trying to set an writer's state to this value might have no effect.
// The writer is temporarily paused, and won't send any more values to the writeable unless its
// state is set back to Started. The writer might still transition to the Finished state at any
// moment, and is allowed to send writesFinishedWithError: to its writeable.
GRXWriterStatePaused,
// The writer has released its writeable and won't interact with it anymore.
//
// One seldomly wants to set an writer's state to this value, as its
// writeable isn't notified with a writesFinishedWithError: message. Instead, sending
// finishWithError: to the writer will make it notify the writeable and then
// transition to this state.
// One seldomly wants to set a writer's state to this value, as its writeable isn't notified with
// a writesFinishedWithError: message. Instead, sending finishWithError: to the writer will make
// it notify the writeable and then transition to this state.
GRXWriterStateFinished
};
// An object that conforms to this protocol can produce, on demand, a sequence
// of values. The sequence may be produced asynchronously, and it may consist of
// any number of elements, including none or an infinite number.
// An GRXWriter object can produce, on demand, a sequence of values. The sequence may be produced
// asynchronously, and it may consist of any number of elements, including none or an infinite
// number.
//
// GRXWriter is the active dual of NSEnumerator. The difference between them is thus whether the
// object plays an active or passive role during usage: A user of NSEnumerator pulls values off it,
// and passes the values to a writeable. A user of GRXWriter, though, just gives it a writeable, and
// the GRXWriter instance pushes values to the writeable. This makes this protocol suitable to
// represent a sequence of future values, as well as collections with internal iteration.
//
// GRXWriter is the active dual of NSEnumerator. The difference between them
// is thus whether the object plays an active or passive role during usage: A
// user of NSEnumerator pulls values off it, and passes the values to a writeable.
// A user of GRXWriter, though, just gives it a writeable, and the
// GRXWriter instance pushes values to the writeable. This makes this protocol
// suitable to represent a sequence of future values, as well as collections
// with internal iteration.
// An instance of GRXWriter can start producing values after a writeable is passed to it. It can
// also be commanded to finish the sequence immediately (with an optional error). Finally, it can be
// asked to pause, and resumed later. All GRXWriter objects support pausing and early termination.
//
// An instance of GRXWriter can start producing values after a writeable is
// passed to it. It can also be commanded to finish the sequence immediately
// (with an optional error). Finally, it can be asked to pause, but the
// conforming instance is not required to oblige.
// Thread-safety:
//
// Unless otherwise indicated by a conforming class, no messages should be sent
// concurrently to a GRXWriter. I.e., conforming classes aren't required to
// be thread-safe.
// State transitions take immediate effect if the object is used from a single thread. Subclasses
// might offer stronger guarantees.
//
// Unless otherwise indicated by a conforming subclass, no messages should be sent concurrently to a
// GRXWriter. I.e., conforming classes aren't required to be thread-safe.
@interface GRXWriter : NSObject
// This property can be used to query the current state of the writer, which
// determines how it might currently use its writeable. Some state transitions can
// be triggered by setting this property to the corresponding value, and that's
// useful for advanced use cases like pausing an writer. For more details,
// see the documentation of the enum.
// This property can be used to query the current state of the writer, which determines how it might
// currently use its writeable. Some state transitions can be triggered by setting this property to
// the corresponding value, and that's useful for advanced use cases like pausing an writer. For
// more details, see the documentation of the enum further down.
@property(nonatomic) GRXWriterState state;
// Start sending messages to the writeable. Messages may be sent before the method
// returns, or they may be sent later in the future. See GRXWriteable.h for the
// different messages a writeable can receive.
// Transition to the Started state, and start sending messages to the writeable (a reference to it
// is retained). Messages to the writeable may be sent before the method returns, or they may be
// sent later in the future. See GRXWriteable.h for the different messages a writeable can receive.
//
// If this writer draws its values from an external source (e.g. from the
// filesystem or from a server), calling this method will commonly trigger side
// effects (like network connections).
// If this writer draws its values from an external source (e.g. from the filesystem or from a
// server), calling this method will commonly trigger side effects (like network connections).
//
// This method might only be called on writers in the NotStarted state.
- (void)startWithWriteable:(id<GRXWriteable>)writeable;
// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send
// any more messages to it.
//
// This method might only be called on writers in the Started or Paused
// state.
// Send writesFinishedWithError:errorOrNil to the writeable. Then release the reference to it and
// transition to the Finished state.
//
// TODO(jcanizales): Consider adding some guarantee about the immediacy of that
// stopping. I know I've relied on it in part of the code that uses this, but
// can't remember the details in the presence of concurrency.
// This method might only be called on writers in the Started or Paused state.
- (void)finishWithError:(NSError *)errorOrNil;
@end

Loading…
Cancel
Save