Merge branch 'enable-epoll1' of https://github.com/ctiller/grpc into ctiller-enable-epoll1

reviewable/pr11816/r3
Sree Kuchibhotla 7 years ago
commit 48d90c2272
  1. 2
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
  2. 4
      src/objective-c/RxLibrary/GRXBufferedPipe.h
  3. 112
      src/objective-c/RxLibrary/GRXBufferedPipe.m
  4. 26
      src/objective-c/tests/RxLibraryUnitTests.m

@ -158,6 +158,7 @@ static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
sd->user_data = NULL;
}
}
gpr_free(subchannel_list->subchannels);
@ -578,6 +579,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
sd->user_data = NULL;
}
if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */

@ -27,8 +27,8 @@
* immediately, unless flow control prevents it.
* If it is throttled and keeps receiving values, as well as if it receives values before being
* started, it will buffer them and propagate them in order as soon as its state becomes Started.
* If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
* propagate the error immediately.
* If it receives an end of stream (via -writesFinishedWithError:), it will buffer the EOS after the
* last buffered value and issue it to the writeable after all buffered values are issued.
*
* Beware that a pipe of this type can't prevent receiving more values when it is paused (for
* example if used to write data to a congested network connection). Because in such situations the

@ -18,11 +18,13 @@
#import "GRXBufferedPipe.h"
@interface GRXBufferedPipe ()
@property(atomic) id<GRXWriteable> writeable;
@end
@implementation GRXBufferedPipe {
id<GRXWriteable> _writeable;
NSMutableArray *_queue;
BOOL _inputIsFinished;
NSError *_errorOrNil;
dispatch_queue_t _writeQueue;
}
@synthesize state = _state;
@ -33,99 +35,79 @@
- (instancetype)init {
if (self = [super init]) {
_queue = [NSMutableArray array];
_state = GRXWriterStateNotStarted;
_writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
dispatch_suspend(_writeQueue);
}
return self;
}
- (id)popValue {
id value = _queue[0];
[_queue removeObjectAtIndex:0];
return value;
}
- (void)writeBufferUntilPausedOrStopped {
while (_state == GRXWriterStateStarted && _queue.count > 0) {
[_writeable writeValue:[self popValue]];
}
if (_inputIsFinished && _queue.count == 0) {
// Our writer finished normally while we were paused or not-started-yet.
[self finishWithError:_errorOrNil];
}
}
#pragma mark GRXWriteable implementation
// Returns whether events can be simply propagated to the other end of the pipe.
- (BOOL)shouldFastForward {
return _state == GRXWriterStateStarted && _queue.count == 0;
}
- (void)writeValue:(id)value {
if (self.shouldFastForward) {
// Skip the queue.
[_writeable writeValue:value];
} else {
if ([value respondsToSelector:@selector(copy)]) {
// Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
// So just buffer the new value.
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
if ([value respondsToSelector:@selector(copy)]) {
value = [value copy];
}
[_queue addObject:value];
value = [value copy];
}
__weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
[weakSelf.writeable writeValue:value];
});
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
_inputIsFinished = YES;
_errorOrNil = errorOrNil;
if (errorOrNil || self.shouldFastForward) {
// No need to write pending values.
[self finishWithError:_errorOrNil];
}
__weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^{
[weakSelf finishWithError:errorOrNil];
});
}
#pragma mark GRXWriter implementation
- (void)setState:(GRXWriterState)newState {
// Manual transitions are only allowed from the started or paused states.
if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
}
switch (newState) {
case GRXWriterStateFinished:
_state = newState;
_queue = nil;
// Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
// writeable to be messaged anymore.
_writeable = nil;
return;
case GRXWriterStatePaused:
_state = newState;
@synchronized (self) {
// Manual transitions are only allowed from the started or paused states.
if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
case GRXWriterStateStarted:
if (_state == GRXWriterStatePaused) {
}
switch (newState) {
case GRXWriterStateFinished:
self.writeable = nil;
if (_state == GRXWriterStatePaused) {
dispatch_resume(_writeQueue);
}
_state = newState;
[self writeBufferUntilPausedOrStopped];
}
return;
case GRXWriterStateNotStarted:
return;
return;
case GRXWriterStatePaused:
if (_state == GRXWriterStateStarted) {
_state = newState;
dispatch_suspend(_writeQueue);
}
return;
case GRXWriterStateStarted:
if (_state == GRXWriterStatePaused) {
_state = newState;
dispatch_resume(_writeQueue);
}
return;
case GRXWriterStateNotStarted:
return;
}
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
self.writeable = writeable;
_state = GRXWriterStateStarted;
_writeable = writeable;
[self writeBufferUntilPausedOrStopped];
dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
id<GRXWriteable> writeable = _writeable;
[self.writeable writesFinishedWithError:errorOrNil];
self.state = GRXWriterStateFinished;
[writeable writesFinishedWithError:errorOrNil];
}
@end

@ -23,6 +23,8 @@
#import <RxLibrary/GRXWriteable.h>
#import <RxLibrary/GRXWriter.h>
#define TEST_TIMEOUT 1
// A mock of a GRXSingleValueHandler block that can be queried for how many times it was called and
// what were the last values passed to it.
//
@ -140,26 +142,38 @@
#pragma mark BufferedPipe
- (void)testBufferedPipePropagatesValue {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
handler.block(value, errorOrNil);
[expectation fulfill];
}];
id anyValue = @7;
// If:
GRXBufferedPipe *pipe = [GRXBufferedPipe pipe];
[pipe startWithWriteable:writeable];
[pipe writeValue:anyValue];
[pipe writesFinishedWithError:nil];
// Then:
[self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, anyValue);
XCTAssertEqualObjects(handler.errorOrNil, nil);
}
- (void)testBufferedPipePropagatesError {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
handler.block(value, errorOrNil);
[expectation fulfill];
}];
NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil];
// If:
@ -168,15 +182,20 @@
[pipe writesFinishedWithError:anyError];
// Then:
[self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, nil);
XCTAssertEqualObjects(handler.errorOrNil, anyError);
}
- (void)testBufferedPipeFinishWriteWhilePaused {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
handler.block(value, errorOrNil);
[expectation fulfill];
}];
id anyValue = @7;
// If:
@ -188,6 +207,7 @@
[pipe startWithWriteable:writeable];
// Then:
[self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, anyValue);
XCTAssertEqualObjects(handler.errorOrNil, nil);

Loading…
Cancel
Save