From e0221ff3401d397b981fbb3e057fa7812de37f35 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 11 Jul 2016 15:56:08 -0700 Subject: [PATCH] Debugging --- src/core/ext/client_config/subchannel.c | 17 +++--- .../chttp2/transport/chttp2_transport.c | 2 +- src/core/lib/iomgr/combiner.c | 61 ++++++++++++++++--- src/core/lib/iomgr/combiner.h | 11 ++-- src/core/lib/iomgr/exec_ctx.h | 6 +- src/core/lib/surface/channel.c | 7 +-- src/core/lib/surface/channel_ping.c | 9 ++- src/core/lib/surface/server.c | 56 ++++++++--------- src/core/lib/transport/transport.c | 26 ++++++++ src/core/lib/transport/transport.h | 4 ++ test/core/iomgr/combiner_test.c | 20 +++--- test/core/surface/lame_client_test.c | 15 +++-- 12 files changed, 152 insertions(+), 82 deletions(-) diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index d089cd4399e..2318bd1023b 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -504,14 +504,14 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *closure) { - grpc_transport_op op; + grpc_transport_op *op = grpc_make_transport_op(NULL); grpc_channel_element *elem; memset(&op, 0, sizeof(op)); - op.connectivity_state = state; - op.on_connectivity_state_change = closure; - op.bind_pollset_set = interested_parties; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; + op->bind_pollset_set = interested_parties; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); + elem->filter->start_transport_op(exec_ctx, elem, op); } void grpc_connected_subchannel_notify_on_state_change( @@ -525,12 +525,11 @@ void grpc_connected_subchannel_notify_on_state_change( void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) { - grpc_transport_op op; + grpc_transport_op *op = grpc_make_transport_op(NULL); grpc_channel_element *elem; - memset(&op, 0, sizeof(op)); - op.send_ping = closure; + op->send_ping = closure; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); + elem->filter->start_transport_op(exec_ctx, elem, op); } static void publish_transport_locked(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 194264c8f9a..4dc642e37ab 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -181,7 +181,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_destroy(&t->new_stream_map); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - grpc_combiner_destroy(t->executor.combiner); + grpc_combiner_destroy(exec_ctx, t->executor.combiner); /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 5dfef4f25b0..99e112308b9 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -32,6 +32,7 @@ */ #include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/workqueue.h" #include @@ -52,7 +53,9 @@ struct grpc_combiner { grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); - lock->optional_workqueue = optional_workqueue; + lock->optional_workqueue = + optional_workqueue ? GRPC_WORKQUEUE_REF(optional_workqueue, "combiner") + : NULL; gpr_atm_no_barrier_store(&lock->state, 1); gpr_mpscq_init(&lock->queue); lock->take_async_break_before_final_list = false; @@ -60,15 +63,18 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { return lock; } -static void really_destroy(grpc_combiner *lock) { +static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); gpr_mpscq_destroy(&lock->queue); + if (lock->optional_workqueue != NULL) { + GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner"); + } gpr_free(lock); } -void grpc_combiner_destroy(grpc_combiner *lock) { +void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) { - really_destroy(lock); + really_destroy(exec_ctx, lock); } } @@ -77,7 +83,12 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - if (maybe_finish_one(exec_ctx, arg)) finish(exec_ctx, arg); + grpc_combiner *lock = arg; + GPR_ASSERT(exec_ctx->active_combiner == NULL); + exec_ctx->active_combiner = lock; + if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock); + GPR_ASSERT(exec_ctx->active_combiner == lock); + exec_ctx->active_combiner = NULL; } static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { @@ -96,6 +107,8 @@ static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_combiner *lock = arg; + GPR_ASSERT(exec_ctx->active_combiner == NULL); + exec_ctx->active_combiner = lock; // quick peek to see if new things have turned up on the queue: if so, go back // to executing them before the final list if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) { @@ -104,9 +117,12 @@ static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg, execute_final(exec_ctx, lock); finish(exec_ctx, lock); } + GPR_ASSERT(exec_ctx->active_combiner == lock); + exec_ctx->active_combiner = NULL; } static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + GPR_ASSERT(exec_ctx->active_combiner == lock); if (lock->take_async_break_before_final_list) { grpc_closure_init(&lock->continue_finishing, continue_executing_final, lock); @@ -121,6 +137,7 @@ static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); + GPR_ASSERT(exec_ctx->active_combiner == lock); if (n == NULL) { // queue is in an inconsistant state: use this as a cue that we should // go off and do something else for a while (and come back later) @@ -151,7 +168,7 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { case 3: // had one count, one unorphaned --> unlocked unorphaned return; case 2: // and one count, one orphaned --> unlocked and orphaned - really_destroy(lock); + really_destroy(exec_ctx, lock); return; case 1: case 0: @@ -166,19 +183,43 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *cl, grpc_error *error) { gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); GPR_ASSERT(last & 1); // ensure lock has not been destroyed - if (last == 1) { - cl->cb(exec_ctx, cl->cb_arg, error); - GRPC_ERROR_UNREF(error); - finish(exec_ctx, lock); + if (exec_ctx->active_combiner == NULL) { + if (last == 1) { + exec_ctx->active_combiner = lock; + cl->cb(exec_ctx, cl->cb_arg, error); + GRPC_ERROR_UNREF(error); + finish(exec_ctx, lock); + GPR_ASSERT(exec_ctx->active_combiner == lock); + exec_ctx->active_combiner = NULL; + } else { + cl->error = error; + gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); + } } else { cl->error = error; gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); + grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline, + lock); + grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, + lock->optional_workqueue); } } +static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, + grpc_error *error) { + grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, + GRPC_ERROR_REF(error), true); +} + void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *closure, grpc_error *error, bool force_async_break) { + if (exec_ctx->active_combiner != lock) { + grpc_combiner_execute(exec_ctx, lock, + grpc_closure_create(enqueue_finally, closure), error); + return; + } + if (force_async_break) { lock->take_async_break_before_final_list = true; } diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 3554202aae2..5b94d5bd99c 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -40,8 +40,6 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/mpscq.h" -typedef struct grpc_combiner grpc_combiner; - // Provides serialized access to some resource. // Each action queued on an aelock is executed serially in a borrowed thread. // The actual thread executing actions may change over time (but there will only @@ -51,18 +49,19 @@ typedef struct grpc_combiner grpc_combiner; // necessary grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); // Destroy the lock -void grpc_combiner_destroy(grpc_combiner *lock); +void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); // Execute \a action within the lock. void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *closure, grpc_error *error); // Execute \a action within the lock just prior to unlocking. -// if \a force_async_break is additionally set, the combiner is forced to trip +// if \a hint_async_break is additionally set, the combiner is tries to trip // through the workqueue between finishing the primary queue of combined // closures and executing the finally list. -// Can only be called from within a closure scheduled by grpc_combiner_execute +// Takes a very slow and round-about path if not called from a +// grpc_combiner_execute closure void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *closure, grpc_error *error, - bool force_async_break); + bool hint_async_break); void grpc_combiner_force_async_finally(grpc_combiner *lock); #endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 917f332f03d..1895ee6245f 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -40,8 +40,8 @@ /** A workqueue represents a list of work to be executed asynchronously. Forward declared here to avoid a circular dependency with workqueue.h. */ -struct grpc_workqueue; typedef struct grpc_workqueue grpc_workqueue; +typedef struct grpc_combiner grpc_combiner; #ifndef GRPC_EXECUTION_CONTEXT_SANITIZER /** Execution context. @@ -66,13 +66,15 @@ typedef struct grpc_workqueue grpc_workqueue; */ struct grpc_exec_ctx { grpc_closure_list closure_list; + /** currently active combiner: updated only via combiner.c */ + grpc_combiner *active_combiner; bool cached_ready_to_finish; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); }; #define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ - { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check } + { GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check } #else struct grpc_exec_ctx { bool cached_ready_to_finish; diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 6d2b1c49352..52e78567bd8 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -334,14 +334,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, } void grpc_channel_destroy(grpc_channel *channel) { - grpc_transport_op op; + grpc_transport_op *op = grpc_make_transport_op(NULL); grpc_channel_element *elem; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel)); - memset(&op, 0, sizeof(op)); - op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); + op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed"); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); - elem->filter->start_transport_op(&exec_ctx, elem, &op); + elem->filter->start_transport_op(&exec_ctx, elem, op); GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel"); diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 9818f9d2f24..5f9f07f89a4 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -61,19 +61,18 @@ static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, void *tag, void *reserved) { - grpc_transport_op op; + grpc_transport_op *op = grpc_make_transport_op(NULL); ping_result *pr = gpr_malloc(sizeof(*pr)); grpc_channel_element *top_elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(reserved == NULL); - memset(&op, 0, sizeof(op)); pr->tag = tag; pr->cq = cq; grpc_closure_init(&pr->closure, ping_done, pr); - op.send_ping = &pr->closure; - op.bind_pollset = grpc_cq_pollset(cq); + op->send_ping = &pr->closure; + op->bind_pollset = grpc_cq_pollset(cq); grpc_cq_begin_op(cq, tag); - top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op); + top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index def6e5068b0..fc9987f0aa2 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -272,22 +272,20 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, int send_goaway, grpc_error *send_disconnect) { - grpc_transport_op op; - struct shutdown_cleanup_args *sc; + struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc)); + grpc_closure_init(&sc->closure, shutdown_cleanup, sc); + grpc_transport_op *op = grpc_make_transport_op(&sc->closure); grpc_channel_element *elem; - memset(&op, 0, sizeof(op)); - op.send_goaway = send_goaway; - sc = gpr_malloc(sizeof(*sc)); + op->send_goaway = send_goaway; sc->slice = gpr_slice_from_copied_string("Server shutdown"); - op.goaway_message = &sc->slice; - op.goaway_status = GRPC_STATUS_OK; - op.disconnect_with_error = send_disconnect; - grpc_closure_init(&sc->closure, shutdown_cleanup, sc); - op.on_consumed = &sc->closure; + op->goaway_message = &sc->slice; + op->goaway_status = GRPC_STATUS_OK; + op->disconnect_with_error = send_disconnect; + op->on_consumed = &sc->closure; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); + elem->filter->start_transport_op(exec_ctx, elem, op); } static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, @@ -434,14 +432,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_transport_op op; - memset(&op, 0, sizeof(op)); - op.set_accept_stream = true; - op.on_consumed = &chand->finish_destroy_channel_closure; + grpc_transport_op *op = + grpc_make_transport_op(&chand->finish_destroy_channel_closure); + op->set_accept_stream = true; grpc_channel_next_op(exec_ctx, grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), - &op); + op); } static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { @@ -832,14 +829,13 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, channel_data *chand = cd; grpc_server *server = chand->server; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { - grpc_transport_op op; - memset(&op, 0, sizeof(op)); - op.on_connectivity_state_change = &chand->channel_connectivity_changed, - op.connectivity_state = &chand->connectivity_state; + grpc_transport_op *op = grpc_make_transport_op(NULL); + op->on_connectivity_state_change = &chand->channel_connectivity_changed, + op->connectivity_state = &chand->connectivity_state; grpc_channel_next_op(exec_ctx, grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), - &op); + op); } else { gpr_mu_lock(&server->mu_global); destroy_channel(exec_ctx, chand); @@ -1101,7 +1097,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, size_t slots; uint32_t probes; uint32_t max_probes = 0; - grpc_transport_op op; + grpc_transport_op *op = NULL; channel = grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); @@ -1161,16 +1157,16 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, gpr_mu_unlock(&s->mu_global); GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); - memset(&op, 0, sizeof(op)); - op.set_accept_stream = true; - op.set_accept_stream_fn = accept_stream; - op.set_accept_stream_user_data = chand; - op.on_connectivity_state_change = &chand->channel_connectivity_changed; - op.connectivity_state = &chand->connectivity_state; + op = grpc_make_transport_op(NULL); + op->set_accept_stream = true; + op->set_accept_stream_fn = accept_stream; + op->set_accept_stream_user_data = chand; + op->on_connectivity_state_change = &chand->channel_connectivity_changed; + op->connectivity_state = &chand->connectivity_state; if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { - op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); + op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); } - grpc_transport_perform_op(exec_ctx, transport, &op); + grpc_transport_perform_op(exec_ctx, transport, op); } void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 857c3909d26..970d9c389a7 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -32,10 +32,14 @@ */ #include "src/core/lib/transport/transport.h" + +#include + #include #include #include #include + #include "src/core/lib/support/string.h" #include "src/core/lib/transport/transport_impl.h" @@ -247,3 +251,25 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); add_error(op, &op->close_error, error); } + +typedef struct { + grpc_closure outer_on_complete; + grpc_closure *inner_on_complete; + grpc_transport_op op; +} made_transport_op; + +static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + made_transport_op *op = arg; + grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error), + NULL); + gpr_free(op); +} + +grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { + made_transport_op *op = gpr_malloc(sizeof(*op)); + grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op); + op->inner_on_complete = on_complete; + memset(&op->op, 0, sizeof(op->op)); + return &op->op; +} diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index b04b60ec3d3..508b57a5b42 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -285,4 +285,8 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport); char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *transport); +/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to + \a on_consumed and then delete the returned transport op */ +grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed); + #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */ diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c index 0aaeada345a..7cf016d82cc 100644 --- a/test/core/iomgr/combiner_test.c +++ b/test/core/iomgr/combiner_test.c @@ -43,7 +43,9 @@ static void test_no_op(void) { gpr_log(GPR_DEBUG, "test_no_op"); - grpc_combiner_destroy(grpc_combiner_create(NULL)); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_combiner_destroy(&exec_ctx, grpc_combiner_create(NULL)); + grpc_exec_ctx_finish(&exec_ctx); } static void set_bool_to_true(grpc_exec_ctx *exec_ctx, void *value, @@ -60,9 +62,10 @@ static void test_execute_one(void) { grpc_combiner_execute(&exec_ctx, lock, grpc_closure_create(set_bool_to_true, &done), GRPC_ERROR_NONE); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(done); - grpc_combiner_destroy(lock); + grpc_combiner_destroy(&exec_ctx, lock); + grpc_exec_ctx_finish(&exec_ctx); } typedef struct { @@ -89,7 +92,7 @@ static void execute_many_loop(void *a) { size_t n = 1; for (size_t i = 0; i < 10; i++) { for (size_t j = 0; j < 10000; j++) { - ex_args *c = gpr_malloc(sizeof(*a)); + ex_args *c = gpr_malloc(sizeof(*c)); c->ctr = &args->ctr; c->value = n++; grpc_combiner_execute(&exec_ctx, args->lock, @@ -117,7 +120,9 @@ static void test_execute_many(void) { for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { gpr_thd_join(thds[i]); } - grpc_combiner_destroy(lock); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_combiner_destroy(&exec_ctx, lock); + grpc_exec_ctx_finish(&exec_ctx); } static bool got_in_finally = false; @@ -139,9 +144,10 @@ static void test_execute_finally(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_combiner_execute(&exec_ctx, lock, grpc_closure_create(add_finally, lock), GRPC_ERROR_NONE); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(got_in_finally); - grpc_combiner_destroy(lock); + grpc_combiner_destroy(&exec_ctx, lock); + grpc_exec_ctx_finish(&exec_ctx); } int main(int argc, char **argv) { diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c index f36f9805752..cad45cc9fa1 100644 --- a/test/core/surface/lame_client_test.c +++ b/test/core/surface/lame_client_test.c @@ -57,24 +57,23 @@ void verify_connectivity(grpc_exec_ctx *exec_ctx, void *arg, void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} void test_transport_op(grpc_channel *channel) { - grpc_transport_op op; + grpc_transport_op *op; grpc_channel_element *elem; grpc_connectivity_state state = GRPC_CHANNEL_IDLE; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - memset(&op, 0, sizeof(op)); grpc_closure_init(&transport_op_cb, verify_connectivity, &op); - op.on_connectivity_state_change = &transport_op_cb; - op.connectivity_state = &state; + op = grpc_make_transport_op(NULL); + op->on_connectivity_state_change = &transport_op_cb; + op->connectivity_state = &state; elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); - elem->filter->start_transport_op(&exec_ctx, elem, &op); + elem->filter->start_transport_op(&exec_ctx, elem, op); grpc_exec_ctx_finish(&exec_ctx); - memset(&op, 0, sizeof(op)); grpc_closure_init(&transport_op_cb, do_nothing, NULL); - op.on_consumed = &transport_op_cb; - elem->filter->start_transport_op(&exec_ctx, elem, &op); + op = grpc_make_transport_op(&transport_op_cb); + elem->filter->start_transport_op(&exec_ctx, elem, op); grpc_exec_ctx_finish(&exec_ctx); }