From 294d0ecc05097753982e89f14bb0b63c67451038 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 21 Sep 2015 14:59:45 -0700 Subject: [PATCH] Call lists compile --- test/core/iomgr/tcp_posix_test.c | 166 +++++++++--------------- test/core/iomgr/tcp_server_posix_test.c | 33 +++-- test/core/iomgr/udp_server_test.c | 26 +++- test/core/iomgr/workqueue_test.c | 27 ++-- 4 files changed, 126 insertions(+), 126 deletions(-) diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 40f3d98936f..09e67155e41 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -49,7 +49,6 @@ #include "test/core/iomgr/endpoint_tests.h" static grpc_pollset g_pollset; -static grpc_workqueue *g_workqueue; /* General test notes: @@ -139,7 +138,7 @@ static size_t count_slices(gpr_slice *slices, size_t nslices, return num_bytes; } -static void read_cb(void *user_data, int success) { +static void read_cb(void *user_data, int success, grpc_call_list *call_list) { struct read_socket_state *state = (struct read_socket_state *)user_data; size_t read_bytes; int current_data; @@ -156,19 +155,8 @@ static void read_cb(void *user_data, int success) { if (state->read_bytes >= state->target_read_bytes) { gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } else { - switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) { - case GRPC_ENDPOINT_DONE: - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - read_cb(user_data, 1); - break; - case GRPC_ENDPOINT_ERROR: - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - read_cb(user_data, 0); - break; - case GRPC_ENDPOINT_PENDING: - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - break; - } + grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb, call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } } @@ -179,15 +167,15 @@ static void read_test(size_t num_bytes, size_t slice_size) { struct read_socket_state state; size_t written_bytes; gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_log(GPR_INFO, "Read test of size %d, slice size %d", num_bytes, slice_size); create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], g_workqueue, "read_test"), - slice_size, "test"); - grpc_endpoint_add_to_pollset(ep, &g_pollset); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); + grpc_endpoint_add_to_pollset(ep, &g_pollset, &call_list); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -198,28 +186,23 @@ static void read_test(size_t num_bytes, size_t slice_size) { gpr_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state); - switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) { - case GRPC_ENDPOINT_DONE: - read_cb(&state, 1); - break; - case GRPC_ENDPOINT_ERROR: - read_cb(&state, 0); - break; - case GRPC_ENDPOINT_PENDING: - break; - } + grpc_endpoint_read(ep, &state.incoming, &state.read_cb, &call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); + deadline, &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_slice_buffer_destroy(&state.incoming); - grpc_endpoint_destroy(ep); + grpc_endpoint_destroy(ep, &call_list); + grpc_call_list_run(&call_list); } /* Write to a socket until it fills up, then read from it using the grpc_tcp @@ -230,14 +213,15 @@ static void large_read_test(size_t slice_size) { struct read_socket_state state; ssize_t written_bytes; gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_log(GPR_INFO, "Start large read test, slice size %d", slice_size); create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], g_workqueue, "large_read_test"), - slice_size, "test"); - grpc_endpoint_add_to_pollset(ep, &g_pollset); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size, + "test"); + grpc_endpoint_add_to_pollset(ep, &g_pollset, &call_list); written_bytes = fill_socket(sv[0]); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -248,28 +232,23 @@ static void large_read_test(size_t slice_size) { gpr_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state); - switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) { - case GRPC_ENDPOINT_DONE: - read_cb(&state, 1); - break; - case GRPC_ENDPOINT_ERROR: - read_cb(&state, 0); - break; - case GRPC_ENDPOINT_PENDING: - break; - } + grpc_endpoint_read(ep, &state.incoming, &state.read_cb, &call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); + deadline, &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_slice_buffer_destroy(&state.incoming); - grpc_endpoint_destroy(ep); + grpc_endpoint_destroy(ep, &call_list); + grpc_call_list_run(&call_list); } struct write_socket_state { @@ -300,7 +279,8 @@ static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size, return slices; } -static void write_done(void *user_data /* write_socket_state */, int success) { +static void write_done(void *user_data /* write_socket_state */, int success, + grpc_call_list *call_list) { struct write_socket_state *state = (struct write_socket_state *)user_data; gpr_log(GPR_INFO, "Write done callback called"); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -317,6 +297,7 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { int flags; int current = 0; int i; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; flags = fcntl(fd, F_GETFL, 0); GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0); @@ -325,8 +306,9 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { grpc_pollset_worker worker; gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); + GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10), &call_list); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); do { bytes_read = read(fd, buf, bytes_left > read_size ? read_size : bytes_left); @@ -345,26 +327,6 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { gpr_free(buf); } -static ssize_t drain_socket(int fd) { - ssize_t read_bytes; - ssize_t total_bytes = 0; - unsigned char buf[256]; - int current = 0; - int i; - do { - read_bytes = read(fd, buf, 256); - if (read_bytes > 0) { - total_bytes += read_bytes; - for (i = 0; i < read_bytes; ++i) { - GPR_ASSERT(buf[i] == current); - current = (current + 1) % 256; - } - } - } while (read_bytes >= 0 || errno == EINTR); - GPR_ASSERT(errno == EAGAIN); - return total_bytes; -} - /* Write to a socket using the grpc_tcp API, then drain it directly. Note that if the write does not complete immediately we need to drain the socket in parallel with the read. */ @@ -372,22 +334,22 @@ static void write_test(size_t num_bytes, size_t slice_size) { int sv[2]; grpc_endpoint *ep; struct write_socket_state state; - ssize_t read_bytes; size_t num_blocks; gpr_slice *slices; gpr_uint8 current_data = 0; gpr_slice_buffer outgoing; grpc_closure write_done_closure; gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes, slice_size); create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], g_workqueue, "write_test"), + ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); - grpc_endpoint_add_to_pollset(ep, &g_pollset); + grpc_endpoint_add_to_pollset(ep, &g_pollset, &call_list); state.ep = ep; state.write_done = 0; @@ -398,33 +360,26 @@ static void write_test(size_t num_bytes, size_t slice_size) { gpr_slice_buffer_addn(&outgoing, slices, num_blocks); grpc_closure_init(&write_done_closure, write_done, &state); - switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) { - case GRPC_ENDPOINT_DONE: - /* Write completed immediately */ - read_bytes = drain_socket(sv[0]); - GPR_ASSERT((size_t)read_bytes == num_bytes); - break; - case GRPC_ENDPOINT_PENDING: - drain_socket_blocking(sv[0], num_bytes, num_bytes); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); - for (;;) { - grpc_pollset_worker worker; - if (state.write_done) { - break; - } - grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); - } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_endpoint_write(ep, &outgoing, &write_done_closure, &call_list); + drain_socket_blocking(sv[0], num_bytes, num_bytes); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + for (;;) { + grpc_pollset_worker worker; + if (state.write_done) { break; - case GRPC_ENDPOINT_ERROR: - gpr_log(GPR_ERROR, "endpoint got error"); - abort(); + } + grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + deadline, &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_slice_buffer_destroy(&outgoing); - grpc_endpoint_destroy(ep); + grpc_endpoint_destroy(ep, &call_list); gpr_free(slices); + grpc_call_list_run(&call_list); } void run_tests(void) { @@ -454,14 +409,17 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( size_t slice_size) { int sv[2]; grpc_endpoint_test_fixture f; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; create_sockets(sv); - f.client_ep = grpc_tcp_create( - grpc_fd_create(sv[0], g_workqueue, "fixture:client"), slice_size, "test"); - f.server_ep = grpc_tcp_create( - grpc_fd_create(sv[1], g_workqueue, "fixture:server"), slice_size, "test"); - grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset); - grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset); + f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), + slice_size, "test"); + f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), + slice_size, "test"); + grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset, &call_list); + grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset, &call_list); + + grpc_call_list_run(&call_list); return f; } @@ -470,17 +428,21 @@ static grpc_endpoint_test_config configs[] = { {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up}, }; -static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } +static void destroy_pollset(void *p, int success, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} int main(int argc, char **argv) { + grpc_closure destroyed; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_init(); - g_workqueue = grpc_workqueue_create(); grpc_pollset_init(&g_pollset); run_tests(); grpc_endpoint_tests(configs[0], &g_pollset); - GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); - grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list); + grpc_call_list_run(&call_list); grpc_shutdown(); return 0; diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 29a20cba8e5..5bd3792a012 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -48,9 +48,10 @@ static grpc_pollset g_pollset; static int g_nconnects = 0; -static void on_connect(void *arg, grpc_endpoint *tcp) { - grpc_endpoint_shutdown(tcp); - grpc_endpoint_destroy(tcp); +static void on_connect(void *arg, grpc_endpoint *tcp, + grpc_call_list *call_list) { + grpc_endpoint_shutdown(tcp, call_list); + grpc_endpoint_destroy(tcp, call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); g_nconnects++; @@ -64,10 +65,12 @@ static void test_no_op(void) { } static void test_no_op_with_start(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_tcp_server *s = grpc_tcp_server_create(); LOG_TEST("test_no_op_with_start"); - grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); + grpc_tcp_server_start(s, NULL, 0, on_connect, NULL, &call_list); grpc_tcp_server_destroy(s, NULL, NULL); + grpc_call_list_run(&call_list); } static void test_no_op_with_port(void) { @@ -84,6 +87,7 @@ static void test_no_op_with_port(void) { } static void test_no_op_with_port_and_start(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; struct sockaddr_in addr; grpc_tcp_server *s = grpc_tcp_server_create(); LOG_TEST("test_no_op_with_port_and_start"); @@ -93,12 +97,14 @@ static void test_no_op_with_port_and_start(void) { GPR_ASSERT( grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr))); - grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); + grpc_tcp_server_start(s, NULL, 0, on_connect, NULL, &call_list); grpc_tcp_server_destroy(s, NULL, NULL); + grpc_call_list_run(&call_list); } static void test_connect(int n) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; struct sockaddr_storage addr; socklen_t addr_len = sizeof(addr); int svrfd, clifd; @@ -120,7 +126,7 @@ static void test_connect(int n) { GPR_ASSERT(addr_len <= sizeof(addr)); pollsets[0] = &g_pollset; - grpc_tcp_server_start(s, pollsets, 1, on_connect, NULL); + grpc_tcp_server_start(s, pollsets, 1, on_connect, NULL, &call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -138,7 +144,10 @@ static void test_connect(int n) { gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); + deadline, &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_log(GPR_DEBUG, "wait done"); @@ -151,9 +160,13 @@ static void test_connect(int n) { grpc_tcp_server_destroy(s, NULL, NULL); } -static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } +static void destroy_pollset(void *p, int success, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} int main(int argc, char **argv) { + grpc_closure destroyed; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_iomgr_init(); grpc_pollset_init(&g_pollset); @@ -165,7 +178,9 @@ int main(int argc, char **argv) { test_connect(1); test_connect(10); - grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list); + grpc_call_list_run(&call_list); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index 24b6b1c717b..476dfd63bcd 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -69,10 +69,12 @@ static void test_no_op(void) { } static void test_no_op_with_start(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_udp_server *s = grpc_udp_server_create(); LOG_TEST("test_no_op_with_start"); - grpc_udp_server_start(s, NULL, 0); + grpc_udp_server_start(s, NULL, 0, &call_list); grpc_udp_server_destroy(s, NULL, NULL); + grpc_call_list_run(&call_list); } static void test_no_op_with_port(void) { @@ -89,6 +91,7 @@ static void test_no_op_with_port(void) { } static void test_no_op_with_port_and_start(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; struct sockaddr_in addr; grpc_udp_server *s = grpc_udp_server_create(); LOG_TEST("test_no_op_with_port_and_start"); @@ -98,12 +101,14 @@ static void test_no_op_with_port_and_start(void) { GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr), on_read)); - grpc_udp_server_start(s, NULL, 0); + grpc_udp_server_start(s, NULL, 0, &call_list); grpc_udp_server_destroy(s, NULL, NULL); + grpc_call_list_run(&call_list); } static void test_receive(int number_of_clients) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; struct sockaddr_storage addr; socklen_t addr_len = sizeof(addr); int clifd, svrfd; @@ -128,7 +133,7 @@ static void test_receive(int number_of_clients) { GPR_ASSERT(addr_len <= sizeof(addr)); pollsets[0] = &g_pollset; - grpc_udp_server_start(s, pollsets, 1); + grpc_udp_server_start(s, pollsets, 1, &call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -145,7 +150,10 @@ static void test_receive(int number_of_clients) { gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); + deadline, &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1); close(clifd); @@ -157,9 +165,13 @@ static void test_receive(int number_of_clients) { grpc_udp_server_destroy(s, NULL, NULL); } -static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } +static void destroy_pollset(void *p, int success, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} int main(int argc, char **argv) { + grpc_closure destroyed; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_iomgr_init(); grpc_pollset_init(&g_pollset); @@ -171,7 +183,9 @@ int main(int argc, char **argv) { test_receive(1); test_receive(10); - grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list); + grpc_call_list_run(&call_list); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c index aa7b460f919..08030dfe832 100644 --- a/test/core/iomgr/workqueue_test.c +++ b/test/core/iomgr/workqueue_test.c @@ -40,7 +40,7 @@ static grpc_pollset g_pollset; -static void must_succeed(void *p, int success) { +static void must_succeed(void *p, int success, grpc_call_list *call_list) { GPR_ASSERT(success == 1); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); *(int *)p = 1; @@ -51,34 +51,43 @@ static void must_succeed(void *p, int success) { static void test_add_closure(void) { grpc_closure c; int done = 0; - grpc_workqueue *wq = grpc_workqueue_create(); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_workqueue *wq = grpc_workqueue_create(&call_list); gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5); grpc_pollset_worker worker; grpc_closure_init(&c, must_succeed, &done); grpc_workqueue_push(wq, &c, 1); - grpc_workqueue_add_to_pollset(wq, &g_pollset); + grpc_workqueue_add_to_pollset(wq, &g_pollset, &call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); GPR_ASSERT(!done); - grpc_pollset_work(&g_pollset, &worker, gpr_now(deadline.clock_type), - deadline); - GPR_ASSERT(done); + grpc_pollset_work(&g_pollset, &worker, gpr_now(deadline.clock_type), deadline, + &call_list); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + GPR_ASSERT(done); - GRPC_WORKQUEUE_UNREF(wq, "destroy"); + GRPC_WORKQUEUE_UNREF(wq, "destroy", &call_list); + grpc_call_list_run(&call_list); } -static void done_shutdown(void *arg) { grpc_pollset_destroy(arg); } +static void destroy_pollset(void *p, int success, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} int main(int argc, char **argv) { + grpc_closure destroyed; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_init(); grpc_pollset_init(&g_pollset); test_add_closure(); - grpc_pollset_shutdown(&g_pollset, done_shutdown, &g_pollset); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list); + grpc_call_list_run(&call_list); grpc_shutdown(); return 0; }