Packet coalescing Objc layer and interop tests

pull/9247/head
Muxi Yan 8 years ago
parent 6ff14349ff
commit a40ccd8580
  1. 3
      src/core/ext/transport/cronet/transport/cronet_api_dummy.c
  2. 46
      src/objective-c/GRPCClient/GRPCCall.m
  3. 50
      src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
  4. 83
      src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
  5. 3
      src/objective-c/RxLibrary/GRXWriter+Immediate.m
  6. 64
      src/objective-c/tests/InteropTests.m
  7. 11
      src/objective-c/tests/InteropTestsRemoteWithCronet/InteropTestsRemoteWithCronet.m
  8. 1
      src/objective-c/tests/Podfile
  9. 1
      src/objective-c/tests/Tests.xcodeproj/project.pbxproj
  10. 115
      third_party/objective_c/Cronet/cronet_c_for_grpc.h

@ -77,9 +77,8 @@ int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
return 0;
}
int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
void cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
GPR_ASSERT(0);
return 0;
}
#endif /* GRPC_COMPILE_WITH_CRONET */

@ -36,6 +36,7 @@
#include <grpc/grpc.h>
#include <grpc/support/time.h>
#import <RxLibrary/GRXConcurrentWriteable.h>
#import <RxLibrary/GRXImmediateSingleWriter.h>
#import "private/GRPCConnectivityMonitor.h"
#import "private/GRPCHost.h"
@ -100,6 +101,10 @@ static NSMutableDictionary *callFlags;
GRPCCall *_retainSelf;
GRPCRequestHeaders *_requestHeaders;
BOOL _unaryCall;
NSMutableArray *_unaryOpBatch;
}
@synthesize state = _state;
@ -157,6 +162,11 @@ static NSMutableDictionary *callFlags;
_requestWriter = requestWriter;
_requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
if ([requestWriter isKindOfClass:[GRXImmediateSingleWriter class]]) {
_unaryCall = true;
_unaryOpBatch = [[NSMutableArray alloc] init];
}
}
return self;
}
@ -165,6 +175,9 @@ static NSMutableDictionary *callFlags;
- (void)finishWithError:(NSError *)errorOrNil {
@synchronized(self) {
if (_state == GRXWriterStateFinished) {
return;
}
_state = GRXWriterStateFinished;
}
@ -254,9 +267,15 @@ static NSMutableDictionary *callFlags;
- (void)sendHeaders:(NSDictionary *)headers {
// TODO(jcanizales): Add error handlers for async failures
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers
flags:[GRPCCall callFlagsForHost:_host path:_path]
handler:nil]]];
if (!_unaryCall) {
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers
flags:[GRPCCall callFlagsForHost:_host path:_path]
handler:nil]]];
} else {
[_unaryOpBatch addObject:[[GRPCOpSendMetadata alloc] initWithMetadata:headers
flags:[GRPCCall callFlagsForHost:_host path:_path]
handler:nil]];
}
}
#pragma mark GRXWriteable implementation
@ -275,9 +294,14 @@ static NSMutableDictionary *callFlags;
}
}
};
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
handler:resumingHandler]]
errorHandler:errorHandler];
if (!_unaryCall) {
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
handler:resumingHandler]]
errorHandler:errorHandler];
} else {
[_unaryOpBatch addObject:[[GRPCOpSendMessage alloc] initWithMessage:message
handler:resumingHandler]];
}
}
- (void)writeValue:(id)value {
@ -302,8 +326,14 @@ static NSMutableDictionary *callFlags;
// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
errorHandler:errorHandler];
if (!_unaryOpBatch) {
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
errorHandler:errorHandler];
} else {
[_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]];
[_wrappedCall startBatchWithOperations:_unaryOpBatch
errorHandler:errorHandler];
}
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {

@ -0,0 +1,50 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#import <Foundation/Foundation.h>
#import "GRXImmediateWriter.h"
/**
* Utility to construct GRXWriter instances from values that are immediately available when
* required.
*/
@interface GRXImmediateSingleWriter : GRXImmediateWriter
/**
* Returns a writer that sends the passed value to its writeable and then finishes (releasing the
* value).
*/
+ (GRXWriter *)writerWithValue:(id)value;
@end

@ -0,0 +1,83 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#import "GRXImmediateSingleWriter.h"
@implementation GRXImmediateSingleWriter {
id _value;
NSError *_errorOrNil;
id<GRXWriteable> _writeable;
}
@synthesize state = _state;
- (instancetype)initWithValue:(id)value error:(NSError *)errorOrNil {
if (self = [super init]) {
_value = value;
_errorOrNil = errorOrNil;
_state = GRXWriterStateNotStarted;
}
return self;
}
+ (GRXWriter *)writerWithValue:(id)value {
return [[self alloc] initWithValue:value error:nil];
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
_state = GRXWriterStateStarted;
_writeable = writeable;
[writeable writeValue:_value];
[self finishWithError:_errorOrNil];
}
- (void)finishWithError:(NSError *)errorOrNil {
_state = GRXWriterStateFinished;
_errorOrNil = nil;
_value = nil;
id<GRXWriteable> writeable = _writeable;
_writeable = nil;
[writeable writesFinishedWithError:errorOrNil];
}
- (void)setState:(GRXWriterState)newState {
// Manual state transition is not allowed
return;
}
- (GRXWriter *)map:(id (^)(id))map {
_value = map(_value);
return self;
}
@end

@ -34,6 +34,7 @@
#import "GRXWriter+Immediate.h"
#import "GRXImmediateWriter.h"
#import "GRXImmediateSingleWriter.h"
@implementation GRXWriter (Immediate)
@ -50,7 +51,7 @@
}
+ (instancetype)writerWithValue:(id)value {
return [GRXImmediateWriter writerWithValue:value];
return [GRXImmediateSingleWriter writerWithValue:value];
}
+ (instancetype)writerWithError:(NSError *)error {

@ -45,6 +45,8 @@
#import <RemoteTest/Test.pbrpc.h>
#import <RxLibrary/GRXBufferedPipe.h>
#import <RxLibrary/GRXWriter+Immediate.h>
#import <grpc/support/log.h>
#import <grpc/grpc.h>
#define TEST_TIMEOUT 32
@ -94,15 +96,6 @@
return 0;
}
+ (void)setUp {
#ifdef GRPC_COMPILE_WITH_CRONET
// Cronet setup
[Cronet setHttp2Enabled:YES];
[Cronet start];
[GRPCCall useCronetWithEngine:[Cronet getGlobalEngine]];
#endif
}
- (void)setUp {
self.continueAfterFailure = NO;
@ -152,6 +145,59 @@
[self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
// TODO (mxyan): Do the same test for chttp2
#ifdef GRPC_COMPILE_WITH_CRONET
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
static bool coalesced_message_and_eos;
static void log_processor(gpr_log_func_args *args) {
unsigned long file_len = strlen(args->file);
const char suffix[] = "call.c";
const int suffix_len = sizeof(suffix) - 1;
const char nops[] = "nops=3";
if (file_len > suffix_len &&
0 == strcmp(suffix, &args->file[file_len - suffix_len]) &&
strstr(args->message, nops)) {
fprintf(stderr, "%s, %s\n", args->file, args->message);
coalesced_message_and_eos = true;
}
}
- (void)testPacketCoalescing {
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
grpc_tracer_set_enabled("all", 1);
gpr_set_log_function(log_processor);
coalesced_message_and_eos = false;
XCTAssertNotNil(self.class.host);
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"LargeUnary"];
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.responseType = RMTPayloadType_Compressable;
request.responseSize = 10;
request.payload.body = [NSMutableData dataWithLength:10];
[_service unaryCallWithRequest:request handler:^(RMTSimpleResponse *response, NSError *error) {
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
expectedResponse.payload.type = RMTPayloadType_Compressable;
expectedResponse.payload.body = [NSMutableData dataWithLength:10];
XCTAssertEqualObjects(response, expectedResponse);
XCTAssert(coalesced_message_and_eos);
[expectation fulfill];
}];
[self waitForExpectationsWithTimeout:16 handler:nil];
}
#endif
#endif
- (void)test4MBResponsesAreAccepted {
XCTAssertNotNil(self.class.host);
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"MaxResponseSize"];

@ -33,6 +33,9 @@
#import <GRPCClient/GRPCCall+Tests.h>
#import <Cronet/Cronet.h>
#import <GRPCClient/GRPCCall+Cronet.h>
#import "InteropTests.h"
static NSString * const kRemoteSSLHost = @"grpc-test.sandbox.googleapis.com";
@ -43,6 +46,14 @@ static NSString * const kRemoteSSLHost = @"grpc-test.sandbox.googleapis.com";
@implementation InteropTestsRemoteWithCronet
+ (void)setUp {
// Cronet setup
[Cronet setHttp2Enabled:YES];
[Cronet start];
[GRPCCall useCronetWithEngine:[Cronet getGlobalEngine]];
[Cronet startNetLogToFile:@"Documents/cronet_netlog.json" logBytes:YES];
}
+ (NSString *)host {
return kRemoteSSLHost;
}

@ -92,6 +92,7 @@ post_install do |installer|
# GPR_UNREACHABLE_CODE causes "Control may reach end of non-void
# function" warning
config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO'
config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1'
end
end

@ -1296,6 +1296,7 @@
"$(inherited)",
"GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS=1",
"GRPC_COMPILE_WITH_CRONET=1",
"GRPC_CRONET_WITH_PACKET_COALESCING=1",
);
INFOPLIST_FILE = InteropTestsRemoteWithCronet/Info.plist;
IPHONEOS_DEPLOYMENT_TARGET = 9.3;

@ -5,6 +5,8 @@
#ifndef COMPONENTS_CRONET_IOS_CRONET_C_FOR_GRPC_H_
#define COMPONENTS_CRONET_IOS_CRONET_C_FOR_GRPC_H_
#define CRONET_EXPORT __attribute__((visibility("default")))
#ifdef __cplusplus
extern "C" {
#endif
@ -15,12 +17,10 @@ extern "C" {
/* Opaque object representing Cronet Engine. Created and configured outside
* of this API to facilitate sharing with other components */
typedef struct cronet_engine { void* obj; } cronet_engine;
void cronet_engine_add_quic_hint(cronet_engine* engine,
const char* host,
int port,
int alternate_port);
typedef struct cronet_engine {
void* obj;
void* annotation;
} cronet_engine;
/* Cronet Bidirectional Stream API */
@ -45,11 +45,12 @@ typedef struct cronet_bidirectional_stream_header_array {
/* Set of callbacks used to receive callbacks from bidirectional stream. */
typedef struct cronet_bidirectional_stream_callback {
/* Invoked when request headers are sent. Indicates that stream has initiated
* the request. Consumer may call cronet_bidirectional_stream_write() to start
* writing data.
/* Invoked when the stream is ready for reading and writing.
* Consumer may call cronet_bidirectional_stream_read() to start reading data.
* Consumer may call cronet_bidirectional_stream_write() to start writing
* data.
*/
void (*on_request_headers_sent)(cronet_bidirectional_stream* stream);
void (*on_stream_ready)(cronet_bidirectional_stream* stream);
/* Invoked when initial response headers are received.
* Consumer must call cronet_bidirectional_stream_read() to start reading.
@ -67,20 +68,19 @@ typedef struct cronet_bidirectional_stream_callback {
* It may be invoked after on_response_trailers_received()}, if there was
* pending read data before trailers were received.
*
* If count is 0, it means the remote side has signaled that it will send no
* more data; future calls to cronet_bidirectional_stream_read() will result
* in the on_data_read() callback or on_succeded() callback if
* If |bytes_read| is 0, it means the remote side has signaled that it will
* send no more data; future calls to cronet_bidirectional_stream_read()
* will result in the on_data_read() callback or on_succeded() callback if
* cronet_bidirectional_stream_write() was invoked with end_of_stream set to
* true.
*/
void (*on_read_completed)(cronet_bidirectional_stream* stream,
char* data,
int count);
int bytes_read);
/**
* Invoked when all data passed to cronet_bidirectional_stream_write() is
* sent.
* To continue writing, call cronet_bidirectional_stream_write().
* sent. To continue writing, call cronet_bidirectional_stream_write().
*/
void (*on_write_completed)(cronet_bidirectional_stream* stream,
const char* data);
@ -117,7 +117,7 @@ typedef struct cronet_bidirectional_stream_callback {
void (*on_canceled)(cronet_bidirectional_stream* stream);
} cronet_bidirectional_stream_callback;
/* Create a new stream object that uses |engine| and |callback|. All stream
/* Creates a new stream object that uses |engine| and |callback|. All stream
* tasks are performed asynchronously on the |engine| network thread. |callback|
* methods are invoked synchronously on the |engine| network thread, but must
* not run tasks on the current thread to prevent blocking networking operations
@ -129,6 +129,7 @@ typedef struct cronet_bidirectional_stream_callback {
*
* Both |calback| and |engine| must remain valid until stream is destroyed.
*/
CRONET_EXPORT
cronet_bidirectional_stream* cronet_bidirectional_stream_create(
cronet_engine* engine,
void* annotation,
@ -136,15 +137,40 @@ cronet_bidirectional_stream* cronet_bidirectional_stream_create(
/* TBD: The following methods return int. Should it be a custom type? */
/* Destroy stream object. Destroy could be called from any thread, including
/* Destroys stream object. Destroy could be called from any thread, including
* network thread, but is posted, so |stream| is valid until calling task is
* complete.
*/
CRONET_EXPORT
int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream);
/* Start the stream by sending request to |url| using |method| and |headers|. If
* |end_of_stream| is true, then no data is expected to be written.
/**
* Disables or enables auto flush. By default, data is flushed after
* every cronet_bidirectional_stream_write(). If the auto flush is disabled,
* the client should explicitly call cronet_bidirectional_stream_flush to flush
* the data.
*/
CRONET_EXPORT void cronet_bidirectional_stream_disable_auto_flush(
cronet_bidirectional_stream* stream,
bool disable_auto_flush);
/**
* Delays sending request headers until cronet_bidirectional_stream_flush()
* is called. This flag is currently only respected when QUIC is negotiated.
* When true, QUIC will send request header frame along with data frame(s)
* as a single packet when possible.
*/
CRONET_EXPORT
void cronet_bidirectional_stream_delay_request_headers_until_flush(
cronet_bidirectional_stream* stream,
bool delay_headers_until_flush);
/* Starts the stream by sending request to |url| using |method| and |headers|.
* If |end_of_stream| is true, then no data is expected to be written. The
* |method| is HTTP verb, with PUT having a special meaning to mark idempotent
* request, which could use QUIC 0-RTT.
*/
CRONET_EXPORT
int cronet_bidirectional_stream_start(
cronet_bidirectional_stream* stream,
const char* url,
@ -153,46 +179,61 @@ int cronet_bidirectional_stream_start(
const cronet_bidirectional_stream_header_array* headers,
bool end_of_stream);
/* Read response data into |buffer| of |capacity| length. Must only be called at
* most once in response to each invocation of the
* on_response_headers_received() and on_read_completed() methods of the
* cronet_bidirectional_stream_callback.
* Each call will result in an invocation of one of the callback's
* on_read_completed method if data is read, its on_succeeded() method if
* the stream is closed, or its on_failed() method if there's an error.
/* Reads response data into |buffer| of |capacity| length. Must only be called
* at most once in response to each invocation of the
* on_stream_ready()/on_response_headers_received() and on_read_completed()
* methods of the cronet_bidirectional_stream_callback.
* Each call will result in an invocation of the callback's
* on_read_completed() method if data is read, or its on_failed() method if
* there's an error. The callback's on_succeeded() method is also invoked if
* there is no more data to read and |end_of_stream| was previously sent.
*/
CRONET_EXPORT
int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream,
char* buffer,
int capacity);
/* Read response data into |buffer| of |capacity| length. Must only be called at
* most once in response to each invocation of the
* on_response_headers_received() and on_read_completed() methods of the
* cronet_bidirectional_stream_callback.
* Each call will result in an invocation of one of the callback's
* on_read_completed method if data is read, its on_succeeded() method if
* the stream is closed, or its on_failed() method if there's an error.
/* Writes request data from |buffer| of |buffer_length| length. If auto flush is
* disabled, data will be sent only after cronet_bidirectional_stream_flush() is
* called.
* Each call will result in an invocation the callback's on_write_completed()
* method if data is sent, or its on_failed() method if there's an error.
* The callback's on_succeeded() method is also invoked if |end_of_stream| is
* set and all response data has been read.
*/
CRONET_EXPORT
int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
const char* buffer,
int count,
int buffer_length,
bool end_of_stream);
/**
* Flushes pending writes. This method should not be called before invocation of
* on_stream_ready() method of the cronet_bidirectional_stream_callback.
* For each previously called cronet_bidirectional_stream_write()
* a corresponding on_write_completed() callback will be invoked when the buffer
* is sent.
*/
CRONET_EXPORT
void cronet_bidirectional_stream_flush(cronet_bidirectional_stream* stream);
/* Cancels the stream. Can be called at any time after
* cronet_bidirectional_stream_start(). The on_canceled() method of
* cronet_bidirectional_stream_callback will be invoked when cancelation
* is complete and no further callback methods will be invoked. If the
* stream has completed or has not started, calling
* cronet_bidirectional_stream_cancel() has no effect and on_canceled() will not
* be invoked. At most one callback method may be invoked after
* be invoked. At most one callback method may be invoked after
* cronet_bidirectional_stream_cancel() has completed.
*/
int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream);
CRONET_EXPORT
void cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream);
/* Returns true if the |stream| was successfully started and is now done
* (succeeded, canceled, or failed).
* Returns false if the |stream| stream is not yet started or is in progress.
*/
CRONET_EXPORT
bool cronet_bidirectional_stream_is_done(cronet_bidirectional_stream* stream);
#ifdef __cplusplus

Loading…
Cancel
Save