Do not issue more message when the call is canceled

pull/16190/head
Muxi Yan 6 years ago
parent 3566540f16
commit e13c867826
  1. 61
      src/objective-c/GRPCClient/GRPCCall.m
  2. 43
      src/objective-c/ProtoRPC/ProtoRPC.m

@ -104,6 +104,11 @@ const char *kCFStreamVarName = "grpc_cfstream";
dispatch_queue_t _dispatchQueue;
/** Flags whether call has started. */
BOOL _started;
/**
* Flags that the call has been canceled. When this is true, pending initial metadata and message
* should not be issued to \a _handler. This ivar must be accessed with lock to self.
*/
BOOL _canceled;
}
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
@ -135,6 +140,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
dispatch_set_target_queue(responseHandler.dispatchQueue, _dispatchQueue);
_started = NO;
_canceled = NO;
}
return self;
@ -217,6 +223,9 @@ const char *kCFStreamVarName = "grpc_cfstream";
self->_pipe = nil;
}
if (self->_handler) {
@synchronized(self) {
self->_canceled = YES;
}
id<GRPCResponseHandler> handler = self->_handler;
dispatch_async(handler.dispatchQueue, ^{
if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
@ -252,30 +261,50 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
id<GRPCResponseHandler> handler = _handler;
if ([handler respondsToSelector:@selector(receivedInitialMetadata:)]) {
dispatch_async(handler.dispatchQueue, ^{
[handler receivedInitialMetadata:initialMetadata];
});
if (_handler != nil && initialMetadata != nil) {
id<GRPCResponseHandler> handler = _handler;
if ([handler respondsToSelector:@selector(receivedInitialMetadata:)]) {
dispatch_async(handler.dispatchQueue, ^{
// Do not issue initial metadata if the call is already canceled.
__block BOOL canceled = NO;
@synchronized(self) {
canceled = self->_canceled;
}
if (!canceled) {
[handler receivedInitialMetadata:initialMetadata];
}
});
}
}
}
- (void)issueMessage:(id)message {
id<GRPCResponseHandler> handler = _handler;
if ([handler respondsToSelector:@selector(receivedRawMessage:)]) {
dispatch_async(handler.dispatchQueue, ^{
[handler receivedRawMessage:message];
});
if (_handler != nil && message != nil) {
id<GRPCResponseHandler> handler = _handler;
if ([handler respondsToSelector:@selector(receivedRawMessage:)]) {
dispatch_async(handler.dispatchQueue, ^{
// Do not issue message if the call is already canceled.
__block BOOL canceled = NO;
@synchronized(self) {
canceled = self->_canceled;
}
if (!canceled) {
[handler receivedRawMessage:message];
}
});
}
}
}
- (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
id<GRPCResponseHandler> handler = _handler;
NSDictionary *trailers = _call.responseTrailers;
if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(handler.dispatchQueue, ^{
[handler closedWithTrailingMetadata:trailers error:error];
});
if (_handler != nil) {
id<GRPCResponseHandler> handler = _handler;
NSDictionary *trailers = _call.responseTrailers;
if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(handler.dispatchQueue, ^{
[handler closedWithTrailingMetadata:trailers error:error];
});
}
}
}

@ -66,6 +66,11 @@
GRPCCall2 *_call;
dispatch_queue_t _dispatchQueue;
/**
* Flags that the call has been canceled. When this is true, pending initial metadata and message
* should not be issued to \a _handler. This ivar must be accessed with lock to self.
*/
BOOL _canceled;
}
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
@ -95,6 +100,7 @@
_dispatchQueue = dispatch_queue_create(nil, DISPATCH_QUEUE_SERIAL);
}
dispatch_set_target_queue(handler.dispatchQueue, _dispatchQueue);
_canceled = NO;
[self start];
}
@ -110,12 +116,15 @@
- (void)cancel {
dispatch_async(_dispatchQueue, ^{
if (_call) {
[_call cancel];
_call = nil;
if (self->_call) {
[self->_call cancel];
self->_call = nil;
}
if (_handler) {
id<GRPCProtoResponseHandler> handler = _handler;
if (self->_handler) {
@synchronized(self) {
self->_canceled = YES;
}
id<GRPCProtoResponseHandler> handler = self->_handler;
if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(handler.dispatchQueue, ^{
[handler closedWithTrailingMetadata:nil
@ -127,7 +136,7 @@
}]];
});
}
_handler = nil;
self->_handler = nil;
}
});
}
@ -155,10 +164,17 @@
- (void)receivedInitialMetadata:(NSDictionary *_Nullable)initialMetadata {
if (_handler && initialMetadata != nil) {
id<GRPCProtoResponseHandler> handler = _handler;
__block id<GRPCResponseHandler> handler = _handler;
if ([handler respondsToSelector:@selector(initialMetadata:)]) {
dispatch_async(handler.dispatchQueue, ^{
[handler receivedInitialMetadata:initialMetadata];
// Do not issue initial metadata if the call is already canceled.
__block BOOL canceled = NO;
@synchronized(self) {
canceled = self->_canceled;
}
if (!canceled) {
[handler receivedInitialMetadata:initialMetadata];
}
});
}
}
@ -166,13 +182,20 @@
- (void)receivedRawMessage:(NSData *_Nullable)message {
if (_handler && message != nil) {
id<GRPCProtoResponseHandler> handler = _handler;
__block id<GRPCProtoResponseHandler> handler = _handler;
NSError *error = nil;
GPBMessage *parsed = [_responseClass parseFromData:message error:&error];
if (parsed) {
if ([handler respondsToSelector:@selector(receivedProtoMessage:)]) {
dispatch_async(handler.dispatchQueue, ^{
[handler receivedProtoMessage:parsed];
// Do not issue message if the call is already canceled.
__block BOOL canceled = NO;
@synchronized(self) {
canceled = self->_canceled;
}
if (!canceled) {
[handler receivedProtoMessage:parsed];
}
});
}
} else {

Loading…
Cancel
Save