Made startBatch more strongly typed

pull/1363/head
murgatroid99 10 years ago
parent a89cc64947
commit 54e93d4b9a
  1. 35
      src/objective-c/GRPCClient/GRPCCall.m
  2. 49
      src/objective-c/GRPCClient/private/GRPCWrappedCall.h
  3. 337
      src/objective-c/GRPCClient/private/GRPCWrappedCall.m

@ -175,8 +175,8 @@ static void AssertNoErrorInCall(grpc_call_error error) {
// Only called from the call queue.
// The handler will be called from the network queue.
- (void)startReadWithHandler:(GRPCCompletionHandler)handler {
[_wrappedCall startBatchWithOperations:@{@(GRPC_OP_RECV_MESSAGE): @YES} handleCompletion:handler];
- (void)startReadWithHandler:(void(^)(NSData *))handler {
[_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMessage alloc] initWithHandler:handler]]];
}
// Called initially from the network queue once response headers are received,
@ -193,9 +193,8 @@ static void AssertNoErrorInCall(grpc_call_error error) {
__weak GRPCDelegateWrapper *weakWriteable = _responseWriteable;
dispatch_async(_callQueue, ^{
[weakSelf startReadWithHandler:^(NSDictionary *result) {
NSData *data = result[@(GRPC_OP_RECV_MESSAGE)];
if (data == [NSNull null]) {
[weakSelf startReadWithHandler:^(NSData *data) {
if (data == nil) {
return;
}
if (!data) {
@ -224,7 +223,7 @@ static void AssertNoErrorInCall(grpc_call_error error) {
// TODO(jcanizales): Rename to commitHeaders.
- (void)sendHeaders:(NSDictionary *)metadata {
[_wrappedCall startBatchWithOperations:@{@(GRPC_OP_SEND_INITIAL_METADATA): metadata?:@{}} handleCompletion:nil];
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:metadata?:@{} handler:nil]]];
}
#pragma mark GRXWriteable implementation
@ -234,7 +233,7 @@ static void AssertNoErrorInCall(grpc_call_error error) {
- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
__weak GRPCCall *weakSelf = self;
GRPCCompletionHandler resumingHandler = ^(NSDictionary *result) {
void(^resumingHandler)(void) = ^{
// Resume the request writer (even in the case of error).
// TODO(jcanizales): No need to do it in the case of errors anymore?
GRPCCall *strongSelf = weakSelf;
@ -242,7 +241,7 @@ static void AssertNoErrorInCall(grpc_call_error error) {
strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
};
[_wrappedCall startBatchWithOperations:@{@(GRPC_OP_SEND_MESSAGE): message} handleCompletion:resumingHandler errorHandler:errorHandler];
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message handler:resumingHandler]] errorHandler:errorHandler];
}
- (void)didReceiveValue:(id)value {
@ -265,7 +264,7 @@ static void AssertNoErrorInCall(grpc_call_error error) {
// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
[_wrappedCall startBatchWithOperations:@{@(GRPC_OP_SEND_CLOSE_FROM_CLIENT): @YES} handleCompletion:nil errorHandler:errorHandler];
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] initWithHandler:nil]] errorHandler:errorHandler];
}
- (void)didFinishWithError:(NSError *)errorOrNil {
@ -289,27 +288,23 @@ static void AssertNoErrorInCall(grpc_call_error error) {
// after this.
// The first one (metadataHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithMetadataHandler:(GRPCCompletionHandler)metadataHandler
completionHandler:(GRPCCompletionHandler)completionHandler {
[_wrappedCall startBatchWithOperations:@{@(GRPC_OP_RECV_INITIAL_METADATA): @YES} handleCompletion:metadataHandler];
[_wrappedCall startBatchWithOperations:@{@(GRPC_OP_RECV_STATUS_ON_CLIENT): @YES} handleCompletion:completionHandler];
- (void)invokeCallWithMetadataHandler:(void(^)(NSDictionary *))metadataHandler
completionHandler:(void(^)(NSError *))completionHandler {
[_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc] initWithHandler:metadataHandler]]];
[_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc] initWithHandler:completionHandler]]];
}
- (void)invokeCall {
__weak GRPCCall *weakSelf = self;
[self invokeCallWithMetadataHandler:^(NSDictionary *result) {
[self invokeCallWithMetadataHandler:^(NSDictionary *metadata) {
// Response metadata received.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseMetadata = result[@(GRPC_OP_RECV_INITIAL_METADATA)];
strongSelf.responseMetadata = metadata;
[strongSelf startNextRead];
}
} completionHandler:^(NSDictionary *result) {
} completionHandler:^(NSError *error) {
// TODO(jcanizales): Merge HTTP2 trailers into response metadata.
id error = result[@(GRPC_OP_RECV_STATUS_ON_CLIENT)];
if (error == [NSNull null]) {
error = nil;
}
[weakSelf finishWithError:error];
}];
// Now that the RPC has been initiated, request writes can start.

@ -32,17 +32,62 @@
*/
#import <Foundation/Foundation.h>
#include <grpc/grpc.h>
#import "GRPCChannel.h"
typedef void(^GRPCCompletionHandler)(NSDictionary *);
@protocol GRPCOp <NSObject>
- (void)getOp:(grpc_op *)op;
- (void(^)(void))opProcessor;
@end
@interface GRPCOpSendMetadata : NSObject <GRPCOp>
- (instancetype)initWithMetadata:(NSDictionary *)metadata handler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCOpSendMessage : NSObject <GRPCOp>
- (instancetype)initWithMessage:(NSData *)message handler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCOpSendClose : NSObject <GRPCOp>
- (instancetype)initWithHandler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCOpRecvMetadata : NSObject <GRPCOp>
- (instancetype)initWithHandler:(void(^)(NSDictionary *))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCOpRecvMessage : NSObject <GRPCOp>
- (instancetype)initWithHandler:(void(^)(NSData *))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCOpRecvStatus : NSObject <GRPCOp>
- (instancetype)initWithHandler:(void(^)(NSError *))handler NS_DESIGNATED_INITIALIZER;
@end
@interface GRPCWrappedCall : NSObject
- (instancetype)initWithChannel:(GRPCChannel *)channel method:(NSString *)method host:(NSString *)host NS_DESIGNATED_INITIALIZER;
- (void)startBatchWithOperations:(NSDictionary *)ops handleCompletion:(GRPCCompletionHandler)handleCompletion errorHandler:(void(^)())errorHandler;
- (void)startBatchWithOperations:(NSArray *)ops errorHandler:(void(^)())errorHandler;
- (void)startBatchWithOperations:(NSDictionary *)ops handleCompletion:(GRPCCompletionHandler)handleCompletion;
- (void)startBatchWithOperations:(NSArray *)ops;
- (void)cancel;
@end

@ -41,6 +41,230 @@
#import "NSData+GRPC.h"
#import "NSError+GRPC.h"
@implementation GRPCOpSendMetadata{
void(^_handler)(void);
grpc_metadata *_send_metadata;
size_t _count;
}
- (instancetype)init {
return [self initWithMetadata:nil handler:nil];
}
- (instancetype)initWithMetadata:(NSDictionary *)metadata handler:(void (^)(void))handler {
if (self = [super init]) {
if (metadata) {
[metadata grpc_getMetadataArray:&_send_metadata];
_count = metadata.count;
} else {
_send_metadata = NULL;
_count = 0;
}
_handler = handler;
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = _count;
}
- (void (^)(void))opProcessor {
return ^{
gpr_free(_send_metadata);
if (_handler) {
_handler();
}
};
}
@end
@implementation GRPCOpSendMessage{
void(^_handler)(void);
grpc_byte_buffer *_byte_buffer;
}
- (instancetype)init {
return [self initWithMessage:nil handler:nil];
}
- (instancetype)initWithMessage:(NSData *)message handler:(void (^)(void))handler {
if (!message) {
[NSException raise:NSInvalidArgumentException format:@"message cannot be null"];
}
if (self = [super init]) {
_byte_buffer = [message grpc_byteBuffer];
_handler = handler;
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = _byte_buffer;
}
- (void (^)(void))opProcessor {
return ^{
gpr_free(_byte_buffer);
if (_handler) {
_handler();
}
};
}
@end
@implementation GRPCOpSendClose{
void(^_handler)(void);
}
- (instancetype)init {
return [self initWithHandler:nil];
}
- (instancetype)initWithHandler:(void (^)(void))handler {
if (self = [super init]) {
_handler = handler;
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
}
- (void (^)(void))opProcessor {
return ^{
if (_handler) {
_handler();
}
};
}
@end
@implementation GRPCOpRecvMetadata{
void(^_handler)(NSDictionary *);
grpc_metadata_array *_recv_initial_metadata;
}
- (instancetype) init {
return [self initWithHandler:nil];
}
- (instancetype) initWithHandler:(void (^)(NSDictionary *))handler {
if (self = [super init]) {
_handler = handler;
_recv_initial_metadata = gpr_malloc(sizeof(grpc_metadata_array));
grpc_metadata_array_init(_recv_initial_metadata);
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = _recv_initial_metadata;
}
- (void (^)(void))opProcessor {
return ^{
NSDictionary *metadata = [NSDictionary grpc_dictionaryFromMetadata:_recv_initial_metadata->metadata count:_recv_initial_metadata->count];
grpc_metadata_array_destroy(_recv_initial_metadata);
if (_handler) {
_handler(metadata);
}
};
}
@end
@implementation GRPCOpRecvMessage{
void(^_handler)(NSData *);
grpc_byte_buffer **_recv_message;
}
- (instancetype)init {
return [self initWithHandler:nil];
}
- (instancetype)initWithHandler:(void (^)(NSData *))handler {
if (self = [super init]) {
_handler = handler;
_recv_message = gpr_malloc(sizeof(grpc_byte_buffer*));
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = _recv_message;
}
- (void (^)(void))opProcessor {
return ^{
NSData *message = [NSData grpc_dataWithByteBuffer:*_recv_message];
grpc_byte_buffer_destroy(*_recv_message);
gpr_free(_recv_message);
if (_handler) {
_handler(message);
}
};
}
@end
@implementation GRPCOpRecvStatus{
void(^_handler)(NSError *);
grpc_status_code *_code;
char **_details;
size_t *_details_capacity;
grpc_metadata_array *_recv_trailing_metadata;
}
- (instancetype) init {
return [self initWithHandler:nil];
}
- (instancetype) initWithHandler:(void (^)(NSError *))handler {
if (self = [super init]) {
_handler = handler;
_code = gpr_malloc(sizeof(grpc_status_code));
_details = gpr_malloc(sizeof(char*));
_details_capacity = gpr_malloc(sizeof(size_t));
*_details_capacity = 0;
_recv_trailing_metadata = gpr_malloc(sizeof(grpc_metadata_array));
}
return self;
}
- (void)getOp:(grpc_op *)op {
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.status = _code;
op->data.recv_status_on_client.status_details = _details;
op->data.recv_status_on_client.status_details_capacity = _details_capacity;
op->data.recv_status_on_client.trailing_metadata = _recv_trailing_metadata;
}
- (void (^)(void))opProcessor {
return ^{
grpc_status status;
status.status = *_code;
status.details = *_details;
status.metadata = _recv_trailing_metadata;
gpr_free(_code);
gpr_free(_details);
gpr_free(_details_capacity);
if (_handler) {
_handler([NSError grpc_errorFromStatus:&status]);
}
};
}
@end
@implementation GRPCWrappedCall{
grpc_call *_call;
GRPCCompletionQueue *_queue;
@ -70,101 +294,18 @@
return self;
}
- (void)startBatchWithOperations:(NSDictionary *)operations handleCompletion:(GRPCCompletionHandler)handleCompletion {
[self startBatchWithOperations:operations handleCompletion:handleCompletion errorHandler:nil];
- (void)startBatchWithOperations:(NSArray *)operations {
[self startBatchWithOperations:operations errorHandler:nil];
}
- (void)startBatchWithOperations:(NSDictionary *)operations handleCompletion:(GRPCCompletionHandler)handleCompletion errorHandler:(void (^)())errorHandler {
- (void)startBatchWithOperations:(NSArray *)operations errorHandler:(void (^)())errorHandler {
NSMutableArray *opProcessors = [NSMutableArray array];
size_t nops = operations.count;
grpc_op *ops_array = gpr_malloc(nops * sizeof(grpc_op));
size_t index = 0;
NSMutableDictionary * __block opProcessors = [NSMutableDictionary dictionary];
grpc_metadata *send_metadata = NULL;
grpc_metadata_array *recv_initial_metadata;
grpc_metadata_array *recv_trailing_metadata;
grpc_byte_buffer *send_message;
grpc_byte_buffer **recv_message = NULL;
grpc_status_code *status_code;
char **status_details;
size_t *status_details_capacity;
for (id key in operations) {
id (^opBlock)(void);
grpc_op *current = &ops_array[index];
switch ([key intValue]) {
case GRPC_OP_SEND_INITIAL_METADATA:
// TODO(jcanizales): Name the type of current->data.send_initial_metadata in the C library so a pointer to it can be returned from methods.
current->data.send_initial_metadata.count = [operations[key] count];
[operations[key] grpc_getMetadataArray:&send_metadata];
current->data.send_initial_metadata.metadata = send_metadata;
opBlock = ^{
gpr_free(send_metadata);
return @YES;
};
break;
case GRPC_OP_SEND_MESSAGE:
send_message = [operations[key] grpc_byteBuffer];
current->data.send_message = send_message;
opBlock = ^{
grpc_byte_buffer_destroy(send_message);
return @YES;
};
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
opBlock = ^{
return @YES;
};
break;
case GRPC_OP_RECV_INITIAL_METADATA:
recv_initial_metadata = gpr_malloc(sizeof(grpc_metadata_array));
grpc_metadata_array_init(recv_initial_metadata);
current->data.recv_initial_metadata = recv_initial_metadata;
opBlock = ^{
NSDictionary *metadata = [NSDictionary grpc_dictionaryFromMetadata:recv_initial_metadata->metadata count:recv_initial_metadata->count];
grpc_metadata_array_destroy(recv_initial_metadata);
return metadata;
};
break;
case GRPC_OP_RECV_MESSAGE:
recv_message = gpr_malloc(sizeof(grpc_byte_buffer*));
current->data.recv_message = recv_message;
opBlock = ^{
NSData *data = [NSData grpc_dataWithByteBuffer:*recv_message];
grpc_byte_buffer_destroy(*recv_message);
gpr_free(recv_message);
return data;
};
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
status_code = gpr_malloc(sizeof(status_code));
current->data.recv_status_on_client.status = status_code;
status_details = gpr_malloc(sizeof(char*));
*status_details = NULL;
current->data.recv_status_on_client.status_details = status_details;
status_details_capacity = gpr_malloc(sizeof(grpc_status_code));
*status_details_capacity = 0;
current->data.recv_status_on_client.status_details_capacity = status_details_capacity;
recv_trailing_metadata = gpr_malloc(sizeof(grpc_metadata_array));
grpc_metadata_array_init(recv_trailing_metadata);
current->data.recv_status_on_client.trailing_metadata = recv_trailing_metadata;
opBlock = ^{
grpc_status status;
status.status = *status_code;
status.details = *status_details;
status.metadata = recv_trailing_metadata;
gpr_free(status_code);
gpr_free(status_details);
gpr_free(status_details_capacity);
return [NSError grpc_errorFromStatus:&status];
};
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
[NSException raise:NSInvalidArgumentException format:@"Not a server: cannot send status"];
default:
[NSException raise:NSInvalidArgumentException format:@"Unrecognized dictionary key"];
}
current->op = [key intValue];
opProcessors[key] = opBlock;
size_t i = 0;
for (id op in operations) {
[op getOp:&ops_array[i]];
[opProcessors addObject:[op opProcessor]];
}
grpc_call_error error = grpc_call_start_batch(_call, ops_array, nops, (__bridge_retained void *)(^(grpc_op_error error){
if (error != GRPC_OP_OK) {
@ -174,19 +315,11 @@
[NSException raise:@"Operation Exception" format:@"The batch failed with an unknown error"];
}
}
NSMutableDictionary *result = [NSMutableDictionary dictionary];
for (id key in opProcessors) {
id(^block)(void) = opProcessors[key];
id value = block();
if (value == nil) {
value = [NSNull null];
}
result[key] = value;
}
if (handleCompletion) {
handleCompletion(result);
for (void(^processor)(void) in opProcessors) {
processor();
}
}));
if (error != GRPC_CALL_OK) {
[NSException raise:NSInvalidArgumentException format:@"The batch did not start successfully"];
}

Loading…
Cancel
Save