Fix CFStreamEndpointTests (#28812)

pull/28786/head
Esun Kim 3 years ago committed by GitHub
parent d5fc37f706
commit 84101427d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 102
      test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm

@ -36,6 +36,9 @@
#include "src/core/lib/resource_quota/api.h"
#include "test/core/util/test_config.h"
#include <chrono>
#include <future>
static const int kConnectTimeout = 5;
static const int kWriteTimeout = 5;
static const int kReadTimeout = 5;
@ -44,14 +47,15 @@ static const int kBufferSize = 10000;
static const int kRunLoopTimeout = 1;
static void set_atm(void *arg, grpc_error_handle error) {
gpr_atm *p = static_cast<gpr_atm *>(arg);
gpr_atm_full_cas(p, -1, reinterpret_cast<gpr_atm>(error));
static void set_error_handle_promise(void *arg, grpc_error_handle error) {
std::promise<grpc_error_handle> *p = static_cast<std::promise<grpc_error_handle> *>(arg);
p->set_value(error);
}
static void init_event_closure(grpc_closure *closure, gpr_atm *atm) {
*atm = -1;
GRPC_CLOSURE_INIT(closure, set_atm, static_cast<void *>(atm), grpc_schedule_on_exec_ctx);
static void init_event_closure(grpc_closure *closure,
std::promise<grpc_error_handle> *error_handle) {
GRPC_CLOSURE_INIT(closure, set_error_handle_promise, static_cast<void *>(error_handle),
grpc_schedule_on_exec_ctx);
}
static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
@ -80,16 +84,9 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
int svr_fd_;
}
- (BOOL)waitForEvent:(gpr_atm *)event timeout:(int)timeout {
- (BOOL)waitForEvent:(std::future<grpc_error_handle> *)event timeout:(int)timeout {
grpc_core::ExecCtx::Get()->Flush();
NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:kConnectTimeout];
while (gpr_atm_acq_load(event) == -1 && [deadline timeIntervalSinceNow] > 0) {
NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:kRunLoopTimeout];
[[NSRunLoop mainRunLoop] runMode:NSDefaultRunLoopMode beforeDate:deadline];
}
return (gpr_atm_acq_load(event) != -1);
return event->wait_for(std::chrono::seconds(timeout)) != std::future_status::timeout;
}
+ (void)setUp {
@ -111,7 +108,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr.addr);
int svr_fd;
int r;
gpr_atm connected = -1;
std::promise<grpc_error_handle> connected_promise;
grpc_closure done;
gpr_log(GPR_DEBUG, "test_succeeds");
@ -126,7 +123,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
/* connect to it */
XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr.len), 0);
init_event_closure(&done, &connected);
init_event_closure(&done, &connected_promise);
const grpc_channel_args *args =
grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
nullptr);
@ -144,8 +141,9 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
svr_fd_ = r;
/* wait for the connection callback to finish */
XCTAssertEqual([self waitForEvent:&connected timeout:kConnectTimeout], YES);
XCTAssertEqual(reinterpret_cast<grpc_error_handle>(connected), GRPC_ERROR_NONE);
std::future<grpc_error_handle> connected_future = connected_promise.get_future();
XCTAssertEqual([self waitForEvent:&connected_future timeout:kConnectTimeout], YES);
XCTAssertEqual(connected_future.get(), GRPC_ERROR_NONE);
}
- (void)tearDown {
@ -157,11 +155,10 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
- (void)testReadWrite {
grpc_core::ExecCtx exec_ctx;
gpr_atm read;
grpc_closure read_done;
grpc_slice_buffer read_slices;
grpc_slice_buffer read_one_slice;
gpr_atm write;
std::promise<grpc_error_handle> write_promise;
grpc_closure write_done;
grpc_slice_buffer write_slices;
@ -173,11 +170,12 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
grpc_slice_buffer_init(&write_slices);
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
grpc_slice_buffer_add(&write_slices, slice);
init_event_closure(&write_done, &write);
init_event_closure(&write_done, &write_promise);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
XCTAssertEqual(reinterpret_cast<grpc_error_handle>(write), GRPC_ERROR_NONE);
std::future<grpc_error_handle> write_future = write_promise.get_future();
XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE);
while (recv_size < kBufferSize) {
ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
@ -193,10 +191,12 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
grpc_slice_buffer_init(&read_slices);
grpc_slice_buffer_init(&read_one_slice);
while (read_slices.length < kBufferSize) {
init_event_closure(&read_done, &read);
std::promise<grpc_error_handle> read_promise;
init_event_closure(&read_done, &read_promise);
grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false);
XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
XCTAssertEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
std::future<grpc_error_handle> read_future = read_promise.get_future();
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
XCTAssertEqual(read_future.get(), GRPC_ERROR_NONE);
grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
}
@ -211,10 +211,10 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
- (void)testShutdownBeforeRead {
grpc_core::ExecCtx exec_ctx;
gpr_atm read;
std::promise<grpc_error_handle> read_promise;
grpc_closure read_done;
grpc_slice_buffer read_slices;
gpr_atm write;
std::promise<grpc_error_handle> write_promise;
grpc_closure write_done;
grpc_slice_buffer write_slices;
@ -224,17 +224,18 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
size_t recv_size = 0;
grpc_slice_buffer_init(&read_slices);
init_event_closure(&read_done, &read);
init_event_closure(&read_done, &read_promise);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
grpc_slice_buffer_init(&write_slices);
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
grpc_slice_buffer_add(&write_slices, slice);
init_event_closure(&write_done, &write);
init_event_closure(&write_done, &write_promise);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
XCTAssertEqual(reinterpret_cast<grpc_error_handle>(write), GRPC_ERROR_NONE);
std::future<grpc_error_handle> write_future = write_promise.get_future();
XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE);
while (recv_size < kBufferSize) {
ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
@ -245,13 +246,14 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
XCTAssertEqual(recv_size, kBufferSize);
XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], NO);
std::future<grpc_error_handle> read_future = read_promise.get_future();
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], NO);
grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush();
XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
XCTAssertNotEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE);
grpc_slice_buffer_reset_and_unref(&read_slices);
grpc_slice_buffer_reset_and_unref(&write_slices);
@ -260,10 +262,10 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
- (void)testRemoteClosed {
grpc_core::ExecCtx exec_ctx;
gpr_atm read;
std::promise<grpc_error_handle> read_promise;
grpc_closure read_done;
grpc_slice_buffer read_slices;
gpr_atm write;
std::promise<grpc_error_handle> write_promise;
grpc_closure write_done;
grpc_slice_buffer write_slices;
@ -272,18 +274,20 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
char read_buffer[kBufferSize];
size_t recv_size = 0;
init_event_closure(&read_done, &read);
init_event_closure(&read_done, &read_promise);
grpc_slice_buffer_init(&read_slices);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
grpc_slice_buffer_init(&write_slices);
slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
grpc_slice_buffer_add(&write_slices, slice);
init_event_closure(&write_done, &write);
init_event_closure(&write_done, &write_promise);
grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
XCTAssertEqual(reinterpret_cast<grpc_error_handle>(write), GRPC_ERROR_NONE);
std::future<grpc_error_handle> write_future = write_promise.get_future();
XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
XCTAssertEqual(write_future.get(), GRPC_ERROR_NONE);
while (recv_size < kBufferSize) {
ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
@ -296,8 +300,9 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
close(svr_fd_);
XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
XCTAssertNotEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
std::future<grpc_error_handle> read_future = read_promise.get_future();
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE);
grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
grpc_slice_buffer_reset_and_unref(&read_slices);
@ -307,11 +312,11 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
- (void)testRemoteReset {
grpc_core::ExecCtx exec_ctx;
gpr_atm read;
std::promise<grpc_error_handle> read_promise;
grpc_closure read_done;
grpc_slice_buffer read_slices;
init_event_closure(&read_done, &read);
init_event_closure(&read_done, &read_promise);
grpc_slice_buffer_init(&read_slices);
grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
@ -322,8 +327,9 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
close(svr_fd_);
XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
XCTAssertNotEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
std::future<grpc_error_handle> read_future = read_promise.get_future();
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
XCTAssertNotEqual(read_future.get(), GRPC_ERROR_NONE);
grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
grpc_slice_buffer_reset_and_unref(&read_slices);

Loading…
Cancel
Save