Revert "Do not issue more message when the call is canceled"

This reverts commit e13c867826.
pull/16190/head
Muxi Yan 7 years ago
parent 76ddfcb6cb
commit e39c146f0f
  1. 29
      src/objective-c/GRPCClient/GRPCCall.m
  2. 39
      src/objective-c/ProtoRPC/ProtoRPC.m

@ -104,11 +104,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
dispatch_queue_t _dispatchQueue; dispatch_queue_t _dispatchQueue;
/** Flags whether call has started. */ /** Flags whether call has started. */
BOOL _started; BOOL _started;
/**
* Flags that the call has been canceled. When this is true, pending initial metadata and message
* should not be issued to \a _handler. This ivar must be accessed with lock to self.
*/
BOOL _canceled;
} }
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions - (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
@ -140,7 +135,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
} }
dispatch_set_target_queue(responseHandler.dispatchQueue, _dispatchQueue); dispatch_set_target_queue(responseHandler.dispatchQueue, _dispatchQueue);
_started = NO; _started = NO;
_canceled = NO;
} }
return self; return self;
@ -223,9 +217,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
self->_pipe = nil; self->_pipe = nil;
} }
if (self->_handler) { if (self->_handler) {
@synchronized(self) {
self->_canceled = YES;
}
id<GRPCResponseHandler> handler = self->_handler; id<GRPCResponseHandler> handler = self->_handler;
dispatch_async(handler.dispatchQueue, ^{ dispatch_async(handler.dispatchQueue, ^{
if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
@ -261,43 +252,24 @@ const char *kCFStreamVarName = "grpc_cfstream";
} }
- (void)issueInitialMetadata:(NSDictionary *)initialMetadata { - (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
if (_handler != nil && initialMetadata != nil) {
id<GRPCResponseHandler> handler = _handler; id<GRPCResponseHandler> handler = _handler;
if ([handler respondsToSelector:@selector(receivedInitialMetadata:)]) { if ([handler respondsToSelector:@selector(receivedInitialMetadata:)]) {
dispatch_async(handler.dispatchQueue, ^{ dispatch_async(handler.dispatchQueue, ^{
// Do not issue initial metadata if the call is already canceled.
__block BOOL canceled = NO;
@synchronized(self) {
canceled = self->_canceled;
}
if (!canceled) {
[handler receivedInitialMetadata:initialMetadata]; [handler receivedInitialMetadata:initialMetadata];
}
}); });
} }
} }
}
- (void)issueMessage:(id)message { - (void)issueMessage:(id)message {
if (_handler != nil && message != nil) {
id<GRPCResponseHandler> handler = _handler; id<GRPCResponseHandler> handler = _handler;
if ([handler respondsToSelector:@selector(receivedRawMessage:)]) { if ([handler respondsToSelector:@selector(receivedRawMessage:)]) {
dispatch_async(handler.dispatchQueue, ^{ dispatch_async(handler.dispatchQueue, ^{
// Do not issue message if the call is already canceled.
__block BOOL canceled = NO;
@synchronized(self) {
canceled = self->_canceled;
}
if (!canceled) {
[handler receivedRawMessage:message]; [handler receivedRawMessage:message];
}
}); });
} }
} }
}
- (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error { - (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
if (_handler != nil) {
id<GRPCResponseHandler> handler = _handler; id<GRPCResponseHandler> handler = _handler;
NSDictionary *trailers = _call.responseTrailers; NSDictionary *trailers = _call.responseTrailers;
if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
@ -306,7 +278,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
}); });
} }
} }
}
@end @end

@ -84,11 +84,6 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
GRPCCall2 *_call; GRPCCall2 *_call;
dispatch_queue_t _dispatchQueue; dispatch_queue_t _dispatchQueue;
/**
* Flags that the call has been canceled. When this is true, pending initial metadata and message
* should not be issued to \a _handler. This ivar must be accessed with lock to self.
*/
BOOL _canceled;
} }
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions - (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
@ -118,7 +113,6 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
_dispatchQueue = dispatch_queue_create(nil, DISPATCH_QUEUE_SERIAL); _dispatchQueue = dispatch_queue_create(nil, DISPATCH_QUEUE_SERIAL);
} }
dispatch_set_target_queue(handler.dispatchQueue, _dispatchQueue); dispatch_set_target_queue(handler.dispatchQueue, _dispatchQueue);
_canceled = NO;
[self start]; [self start];
} }
@ -134,15 +128,12 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
- (void)cancel { - (void)cancel {
dispatch_async(_dispatchQueue, ^{ dispatch_async(_dispatchQueue, ^{
if (self->_call) { if (_call) {
[self->_call cancel]; [_call cancel];
self->_call = nil; _call = nil;
}
if (self->_handler) {
@synchronized(self) {
self->_canceled = YES;
} }
id<GRPCProtoResponseHandler> handler = self->_handler; if (_handler) {
id<GRPCProtoResponseHandler> handler = _handler;
if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(handler.dispatchQueue, ^{ dispatch_async(handler.dispatchQueue, ^{
[handler closedWithTrailingMetadata:nil [handler closedWithTrailingMetadata:nil
@ -154,7 +145,7 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
}]]; }]];
}); });
} }
self->_handler = nil; _handler = nil;
} }
}); });
} }
@ -182,17 +173,10 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
- (void)receivedInitialMetadata:(NSDictionary *_Nullable)initialMetadata { - (void)receivedInitialMetadata:(NSDictionary *_Nullable)initialMetadata {
if (_handler && initialMetadata != nil) { if (_handler && initialMetadata != nil) {
__block id<GRPCResponseHandler> handler = _handler; id<GRPCProtoResponseHandler> handler = _handler;
if ([handler respondsToSelector:@selector(initialMetadata:)]) { if ([handler respondsToSelector:@selector(initialMetadata:)]) {
dispatch_async(handler.dispatchQueue, ^{ dispatch_async(handler.dispatchQueue, ^{
// Do not issue initial metadata if the call is already canceled.
__block BOOL canceled = NO;
@synchronized(self) {
canceled = self->_canceled;
}
if (!canceled) {
[handler receivedInitialMetadata:initialMetadata]; [handler receivedInitialMetadata:initialMetadata];
}
}); });
} }
} }
@ -200,20 +184,13 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
- (void)receivedRawMessage:(NSData *_Nullable)message { - (void)receivedRawMessage:(NSData *_Nullable)message {
if (_handler && message != nil) { if (_handler && message != nil) {
__block id<GRPCProtoResponseHandler> handler = _handler; id<GRPCProtoResponseHandler> handler = _handler;
NSError *error = nil; NSError *error = nil;
GPBMessage *parsed = [_responseClass parseFromData:message error:&error]; GPBMessage *parsed = [_responseClass parseFromData:message error:&error];
if (parsed) { if (parsed) {
if ([handler respondsToSelector:@selector(receivedProtoMessage:)]) { if ([handler respondsToSelector:@selector(receivedProtoMessage:)]) {
dispatch_async(handler.dispatchQueue, ^{ dispatch_async(handler.dispatchQueue, ^{
// Do not issue message if the call is already canceled.
__block BOOL canceled = NO;
@synchronized(self) {
canceled = self->_canceled;
}
if (!canceled) {
[handler receivedProtoMessage:parsed]; [handler receivedProtoMessage:parsed];
}
}); });
} }
} else { } else {

Loading…
Cancel
Save