Eliminate race in GRPCCall’s operation of the requests writer

pull/2861/head
Jorge Canizales 10 years ago
parent 67ce098ccf
commit 238ad7819f
  1. 35
      src/objective-c/GRPCClient/GRPCCall.m

@ -74,6 +74,13 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// all. This wrapper over our actual writeable ensures thread-safety and
// correct ordering.
GRXConcurrentWriteable *_responseWriteable;
// The network thread wants the requestWriter to resume (when the server is ready for more input),
// or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
// it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
// We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
// writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
// pause the writer immediately on writeValue:, so we need our locking to be recursive.
GRXWriter *_requestWriter;
// To create a retain cycle when a call is started, up until it finishes. See
@ -139,8 +146,10 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
_self = nil;
// If there were still request messages coming, stop them.
_requestWriter.state = GRXWriterStateFinished;
_requestWriter = nil;
@synchronized(_requestWriter) {
_requestWriter.state = GRXWriterStateFinished;
_requestWriter = nil;
}
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
@ -240,12 +249,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf->_requestWriter.state = GRXWriterStateStarted;
@synchronized(_requestWriter) {
strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
}
};
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc]
initWithMessage:message
handler:resumingHandler]] errorHandler:errorHandler];
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
handler:resumingHandler]]
errorHandler:errorHandler];
}
- (void)writeValue:(id)value {
@ -253,7 +264,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
_requestWriter.state = GRXWriterStatePaused;
@synchronized(_requestWriter) {
_requestWriter.state = GRXWriterStatePaused;
}
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
@ -273,7 +286,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
_requestWriter = nil;
@synchronized(_requestWriter) {
_requestWriter = nil;
}
if (errorOrNil) {
[self cancel];
} else {
@ -327,7 +342,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
}];
// Now that the RPC has been initiated, request writes can start.
[_requestWriter startWithWriteable:self];
@synchronized(_requestWriter) {
[_requestWriter startWithWriteable:self];
}
}
#pragma mark GRXWriter implementation

Loading…
Cancel
Save