pull/7644/head
Craig Tiller 9 years ago
parent 1fccf89404
commit e0221ff340
  1. 17
      src/core/ext/client_config/subchannel.c
  2. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  3. 61
      src/core/lib/iomgr/combiner.c
  4. 11
      src/core/lib/iomgr/combiner.h
  5. 6
      src/core/lib/iomgr/exec_ctx.h
  6. 7
      src/core/lib/surface/channel.c
  7. 9
      src/core/lib/surface/channel_ping.c
  8. 56
      src/core/lib/surface/server.c
  9. 26
      src/core/lib/transport/transport.c
  10. 4
      src/core/lib/transport/transport.h
  11. 20
      test/core/iomgr/combiner_test.c
  12. 15
      test/core/surface/lame_client_test.c

@ -504,14 +504,14 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *interested_parties, grpc_pollset_set *interested_parties,
grpc_connectivity_state *state, grpc_connectivity_state *state,
grpc_closure *closure) { grpc_closure *closure) {
grpc_transport_op op; grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem; grpc_channel_element *elem;
memset(&op, 0, sizeof(op)); memset(&op, 0, sizeof(op));
op.connectivity_state = state; op->connectivity_state = state;
op.on_connectivity_state_change = closure; op->on_connectivity_state_change = closure;
op.bind_pollset_set = interested_parties; op->bind_pollset_set = interested_parties;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
elem->filter->start_transport_op(exec_ctx, elem, &op); elem->filter->start_transport_op(exec_ctx, elem, op);
} }
void grpc_connected_subchannel_notify_on_state_change( void grpc_connected_subchannel_notify_on_state_change(
@ -525,12 +525,11 @@ void grpc_connected_subchannel_notify_on_state_change(
void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *con, grpc_connected_subchannel *con,
grpc_closure *closure) { grpc_closure *closure) {
grpc_transport_op op; grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem; grpc_channel_element *elem;
memset(&op, 0, sizeof(op)); op->send_ping = closure;
op.send_ping = closure;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
elem->filter->start_transport_op(exec_ctx, elem, &op); elem->filter->start_transport_op(exec_ctx, elem, op);
} }
static void publish_transport_locked(grpc_exec_ctx *exec_ctx, static void publish_transport_locked(grpc_exec_ctx *exec_ctx,

@ -181,7 +181,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->new_stream_map); grpc_chttp2_stream_map_destroy(&t->new_stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
grpc_combiner_destroy(t->executor.combiner); grpc_combiner_destroy(exec_ctx, t->executor.combiner);
/* callback remaining pings: they're not allowed to call into the transpot, /* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */ and maybe they hold resources that need to be freed */

@ -32,6 +32,7 @@
*/ */
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/workqueue.h"
#include <string.h> #include <string.h>
@ -52,7 +53,9 @@ struct grpc_combiner {
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock)); grpc_combiner *lock = gpr_malloc(sizeof(*lock));
lock->optional_workqueue = optional_workqueue; lock->optional_workqueue =
optional_workqueue ? GRPC_WORKQUEUE_REF(optional_workqueue, "combiner")
: NULL;
gpr_atm_no_barrier_store(&lock->state, 1); gpr_atm_no_barrier_store(&lock->state, 1);
gpr_mpscq_init(&lock->queue); gpr_mpscq_init(&lock->queue);
lock->take_async_break_before_final_list = false; lock->take_async_break_before_final_list = false;
@ -60,15 +63,18 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
return lock; return lock;
} }
static void really_destroy(grpc_combiner *lock) { static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
gpr_mpscq_destroy(&lock->queue); gpr_mpscq_destroy(&lock->queue);
if (lock->optional_workqueue != NULL) {
GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner");
}
gpr_free(lock); gpr_free(lock);
} }
void grpc_combiner_destroy(grpc_combiner *lock) { void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) { if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) {
really_destroy(lock); really_destroy(exec_ctx, lock);
} }
} }
@ -77,7 +83,12 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg, static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
if (maybe_finish_one(exec_ctx, arg)) finish(exec_ctx, arg); grpc_combiner *lock = arg;
GPR_ASSERT(exec_ctx->active_combiner == NULL);
exec_ctx->active_combiner = lock;
if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);
GPR_ASSERT(exec_ctx->active_combiner == lock);
exec_ctx->active_combiner = NULL;
} }
static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
@ -96,6 +107,8 @@ static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg, static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
grpc_combiner *lock = arg; grpc_combiner *lock = arg;
GPR_ASSERT(exec_ctx->active_combiner == NULL);
exec_ctx->active_combiner = lock;
// quick peek to see if new things have turned up on the queue: if so, go back // quick peek to see if new things have turned up on the queue: if so, go back
// to executing them before the final list // to executing them before the final list
if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) { if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) {
@ -104,9 +117,12 @@ static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
execute_final(exec_ctx, lock); execute_final(exec_ctx, lock);
finish(exec_ctx, lock); finish(exec_ctx, lock);
} }
GPR_ASSERT(exec_ctx->active_combiner == lock);
exec_ctx->active_combiner = NULL;
} }
static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GPR_ASSERT(exec_ctx->active_combiner == lock);
if (lock->take_async_break_before_final_list) { if (lock->take_async_break_before_final_list) {
grpc_closure_init(&lock->continue_finishing, continue_executing_final, grpc_closure_init(&lock->continue_finishing, continue_executing_final,
lock); lock);
@ -121,6 +137,7 @@ static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
GPR_ASSERT(exec_ctx->active_combiner == lock);
if (n == NULL) { if (n == NULL) {
// queue is in an inconsistant state: use this as a cue that we should // queue is in an inconsistant state: use this as a cue that we should
// go off and do something else for a while (and come back later) // go off and do something else for a while (and come back later)
@ -151,7 +168,7 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
case 3: // had one count, one unorphaned --> unlocked unorphaned case 3: // had one count, one unorphaned --> unlocked unorphaned
return; return;
case 2: // and one count, one orphaned --> unlocked and orphaned case 2: // and one count, one orphaned --> unlocked and orphaned
really_destroy(lock); really_destroy(exec_ctx, lock);
return; return;
case 1: case 1:
case 0: case 0:
@ -166,19 +183,43 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *cl, grpc_error *error) { grpc_closure *cl, grpc_error *error) {
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
GPR_ASSERT(last & 1); // ensure lock has not been destroyed GPR_ASSERT(last & 1); // ensure lock has not been destroyed
if (last == 1) { if (exec_ctx->active_combiner == NULL) {
cl->cb(exec_ctx, cl->cb_arg, error); if (last == 1) {
GRPC_ERROR_UNREF(error); exec_ctx->active_combiner = lock;
finish(exec_ctx, lock); cl->cb(exec_ctx, cl->cb_arg, error);
GRPC_ERROR_UNREF(error);
finish(exec_ctx, lock);
GPR_ASSERT(exec_ctx->active_combiner == lock);
exec_ctx->active_combiner = NULL;
} else {
cl->error = error;
gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
}
} else { } else {
cl->error = error; cl->error = error;
gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline,
lock);
grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
lock->optional_workqueue);
} }
} }
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
grpc_error *error) {
grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
GRPC_ERROR_REF(error), true);
}
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error, grpc_closure *closure, grpc_error *error,
bool force_async_break) { bool force_async_break) {
if (exec_ctx->active_combiner != lock) {
grpc_combiner_execute(exec_ctx, lock,
grpc_closure_create(enqueue_finally, closure), error);
return;
}
if (force_async_break) { if (force_async_break) {
lock->take_async_break_before_final_list = true; lock->take_async_break_before_final_list = true;
} }

@ -40,8 +40,6 @@
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/mpscq.h" #include "src/core/lib/support/mpscq.h"
typedef struct grpc_combiner grpc_combiner;
// Provides serialized access to some resource. // Provides serialized access to some resource.
// Each action queued on an aelock is executed serially in a borrowed thread. // Each action queued on an aelock is executed serially in a borrowed thread.
// The actual thread executing actions may change over time (but there will only // The actual thread executing actions may change over time (but there will only
@ -51,18 +49,19 @@ typedef struct grpc_combiner grpc_combiner;
// necessary // necessary
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
// Destroy the lock // Destroy the lock
void grpc_combiner_destroy(grpc_combiner *lock); void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
// Execute \a action within the lock. // Execute \a action within the lock.
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error); grpc_closure *closure, grpc_error *error);
// Execute \a action within the lock just prior to unlocking. // Execute \a action within the lock just prior to unlocking.
// if \a force_async_break is additionally set, the combiner is forced to trip // if \a hint_async_break is additionally set, the combiner is tries to trip
// through the workqueue between finishing the primary queue of combined // through the workqueue between finishing the primary queue of combined
// closures and executing the finally list. // closures and executing the finally list.
// Can only be called from within a closure scheduled by grpc_combiner_execute // Takes a very slow and round-about path if not called from a
// grpc_combiner_execute closure
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error, grpc_closure *closure, grpc_error *error,
bool force_async_break); bool hint_async_break);
void grpc_combiner_force_async_finally(grpc_combiner *lock); void grpc_combiner_force_async_finally(grpc_combiner *lock);
#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ #endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */

@ -40,8 +40,8 @@
/** A workqueue represents a list of work to be executed asynchronously. /** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */ Forward declared here to avoid a circular dependency with workqueue.h. */
struct grpc_workqueue;
typedef struct grpc_workqueue grpc_workqueue; typedef struct grpc_workqueue grpc_workqueue;
typedef struct grpc_combiner grpc_combiner;
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER #ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
/** Execution context. /** Execution context.
@ -66,13 +66,15 @@ typedef struct grpc_workqueue grpc_workqueue;
*/ */
struct grpc_exec_ctx { struct grpc_exec_ctx {
grpc_closure_list closure_list; grpc_closure_list closure_list;
/** currently active combiner: updated only via combiner.c */
grpc_combiner *active_combiner;
bool cached_ready_to_finish; bool cached_ready_to_finish;
void *check_ready_to_finish_arg; void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
}; };
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ #define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
{ GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check } { GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check }
#else #else
struct grpc_exec_ctx { struct grpc_exec_ctx {
bool cached_ready_to_finish; bool cached_ready_to_finish;

@ -334,14 +334,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
} }
void grpc_channel_destroy(grpc_channel *channel) { void grpc_channel_destroy(grpc_channel *channel) {
grpc_transport_op op; grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem; grpc_channel_element *elem;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel)); GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel));
memset(&op, 0, sizeof(op)); op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
op.disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
elem->filter->start_transport_op(&exec_ctx, elem, &op); elem->filter->start_transport_op(&exec_ctx, elem, op);
GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel"); GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel");

@ -61,19 +61,18 @@ static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
void *tag, void *reserved) { void *tag, void *reserved) {
grpc_transport_op op; grpc_transport_op *op = grpc_make_transport_op(NULL);
ping_result *pr = gpr_malloc(sizeof(*pr)); ping_result *pr = gpr_malloc(sizeof(*pr));
grpc_channel_element *top_elem = grpc_channel_element *top_elem =
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(reserved == NULL); GPR_ASSERT(reserved == NULL);
memset(&op, 0, sizeof(op));
pr->tag = tag; pr->tag = tag;
pr->cq = cq; pr->cq = cq;
grpc_closure_init(&pr->closure, ping_done, pr); grpc_closure_init(&pr->closure, ping_done, pr);
op.send_ping = &pr->closure; op->send_ping = &pr->closure;
op.bind_pollset = grpc_cq_pollset(cq); op->bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq, tag); grpc_cq_begin_op(cq, tag);
top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op); top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }

@ -272,22 +272,20 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
int send_goaway, grpc_error *send_disconnect) { int send_goaway, grpc_error *send_disconnect) {
grpc_transport_op op; struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc));
struct shutdown_cleanup_args *sc; grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
grpc_channel_element *elem; grpc_channel_element *elem;
memset(&op, 0, sizeof(op)); op->send_goaway = send_goaway;
op.send_goaway = send_goaway;
sc = gpr_malloc(sizeof(*sc));
sc->slice = gpr_slice_from_copied_string("Server shutdown"); sc->slice = gpr_slice_from_copied_string("Server shutdown");
op.goaway_message = &sc->slice; op->goaway_message = &sc->slice;
op.goaway_status = GRPC_STATUS_OK; op->goaway_status = GRPC_STATUS_OK;
op.disconnect_with_error = send_disconnect; op->disconnect_with_error = send_disconnect;
grpc_closure_init(&sc->closure, shutdown_cleanup, sc); op->on_consumed = &sc->closure;
op.on_consumed = &sc->closure;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
elem->filter->start_transport_op(exec_ctx, elem, &op); elem->filter->start_transport_op(exec_ctx, elem, op);
} }
static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
@ -434,14 +432,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand; chand->finish_destroy_channel_closure.cb_arg = chand;
grpc_transport_op op; grpc_transport_op *op =
memset(&op, 0, sizeof(op)); grpc_make_transport_op(&chand->finish_destroy_channel_closure);
op.set_accept_stream = true; op->set_accept_stream = true;
op.on_consumed = &chand->finish_destroy_channel_closure;
grpc_channel_next_op(exec_ctx, grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element( grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0), grpc_channel_get_channel_stack(chand->channel), 0),
&op); op);
} }
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
@ -832,14 +829,13 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
channel_data *chand = cd; channel_data *chand = cd;
grpc_server *server = chand->server; grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
grpc_transport_op op; grpc_transport_op *op = grpc_make_transport_op(NULL);
memset(&op, 0, sizeof(op)); op->on_connectivity_state_change = &chand->channel_connectivity_changed,
op.on_connectivity_state_change = &chand->channel_connectivity_changed, op->connectivity_state = &chand->connectivity_state;
op.connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(exec_ctx, grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element( grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0), grpc_channel_get_channel_stack(chand->channel), 0),
&op); op);
} else { } else {
gpr_mu_lock(&server->mu_global); gpr_mu_lock(&server->mu_global);
destroy_channel(exec_ctx, chand); destroy_channel(exec_ctx, chand);
@ -1101,7 +1097,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
size_t slots; size_t slots;
uint32_t probes; uint32_t probes;
uint32_t max_probes = 0; uint32_t max_probes = 0;
grpc_transport_op op; grpc_transport_op *op = NULL;
channel = channel =
grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
@ -1161,16 +1157,16 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
gpr_mu_unlock(&s->mu_global); gpr_mu_unlock(&s->mu_global);
GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
memset(&op, 0, sizeof(op)); op = grpc_make_transport_op(NULL);
op.set_accept_stream = true; op->set_accept_stream = true;
op.set_accept_stream_fn = accept_stream; op->set_accept_stream_fn = accept_stream;
op.set_accept_stream_user_data = chand; op->set_accept_stream_user_data = chand;
op.on_connectivity_state_change = &chand->channel_connectivity_changed; op->on_connectivity_state_change = &chand->channel_connectivity_changed;
op.connectivity_state = &chand->connectivity_state; op->connectivity_state = &chand->connectivity_state;
if (gpr_atm_acq_load(&s->shutdown_flag) != 0) { if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
op.disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown"); op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
} }
grpc_transport_perform_op(exec_ctx, transport, &op); grpc_transport_perform_op(exec_ctx, transport, op);
} }
void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg, void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,

@ -32,10 +32,14 @@
*/ */
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
#include <string.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport_impl.h" #include "src/core/lib/transport/transport_impl.h"
@ -247,3 +251,25 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
add_error(op, &op->close_error, error); add_error(op, &op->close_error, error);
} }
typedef struct {
grpc_closure outer_on_complete;
grpc_closure *inner_on_complete;
grpc_transport_op op;
} made_transport_op;
static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
made_transport_op *op = arg;
grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error),
NULL);
gpr_free(op);
}
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
made_transport_op *op = gpr_malloc(sizeof(*op));
grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op);
op->inner_on_complete = on_complete;
memset(&op->op, 0, sizeof(op->op));
return &op->op;
}

@ -285,4 +285,8 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
grpc_transport *transport); grpc_transport *transport);
/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
\a on_consumed and then delete the returned transport op */
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed);
#endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */ #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */

@ -43,7 +43,9 @@
static void test_no_op(void) { static void test_no_op(void) {
gpr_log(GPR_DEBUG, "test_no_op"); gpr_log(GPR_DEBUG, "test_no_op");
grpc_combiner_destroy(grpc_combiner_create(NULL)); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_destroy(&exec_ctx, grpc_combiner_create(NULL));
grpc_exec_ctx_finish(&exec_ctx);
} }
static void set_bool_to_true(grpc_exec_ctx *exec_ctx, void *value, static void set_bool_to_true(grpc_exec_ctx *exec_ctx, void *value,
@ -60,9 +62,10 @@ static void test_execute_one(void) {
grpc_combiner_execute(&exec_ctx, lock, grpc_combiner_execute(&exec_ctx, lock,
grpc_closure_create(set_bool_to_true, &done), grpc_closure_create(set_bool_to_true, &done),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(done); GPR_ASSERT(done);
grpc_combiner_destroy(lock); grpc_combiner_destroy(&exec_ctx, lock);
grpc_exec_ctx_finish(&exec_ctx);
} }
typedef struct { typedef struct {
@ -89,7 +92,7 @@ static void execute_many_loop(void *a) {
size_t n = 1; size_t n = 1;
for (size_t i = 0; i < 10; i++) { for (size_t i = 0; i < 10; i++) {
for (size_t j = 0; j < 10000; j++) { for (size_t j = 0; j < 10000; j++) {
ex_args *c = gpr_malloc(sizeof(*a)); ex_args *c = gpr_malloc(sizeof(*c));
c->ctr = &args->ctr; c->ctr = &args->ctr;
c->value = n++; c->value = n++;
grpc_combiner_execute(&exec_ctx, args->lock, grpc_combiner_execute(&exec_ctx, args->lock,
@ -117,7 +120,9 @@ static void test_execute_many(void) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
gpr_thd_join(thds[i]); gpr_thd_join(thds[i]);
} }
grpc_combiner_destroy(lock); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_destroy(&exec_ctx, lock);
grpc_exec_ctx_finish(&exec_ctx);
} }
static bool got_in_finally = false; static bool got_in_finally = false;
@ -139,9 +144,10 @@ static void test_execute_finally(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_combiner_execute(&exec_ctx, lock, grpc_closure_create(add_finally, lock), grpc_combiner_execute(&exec_ctx, lock, grpc_closure_create(add_finally, lock),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
GPR_ASSERT(got_in_finally); GPR_ASSERT(got_in_finally);
grpc_combiner_destroy(lock); grpc_combiner_destroy(&exec_ctx, lock);
grpc_exec_ctx_finish(&exec_ctx);
} }
int main(int argc, char **argv) { int main(int argc, char **argv) {

@ -57,24 +57,23 @@ void verify_connectivity(grpc_exec_ctx *exec_ctx, void *arg,
void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
void test_transport_op(grpc_channel *channel) { void test_transport_op(grpc_channel *channel) {
grpc_transport_op op; grpc_transport_op *op;
grpc_channel_element *elem; grpc_channel_element *elem;
grpc_connectivity_state state = GRPC_CHANNEL_IDLE; grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
memset(&op, 0, sizeof(op));
grpc_closure_init(&transport_op_cb, verify_connectivity, &op); grpc_closure_init(&transport_op_cb, verify_connectivity, &op);
op.on_connectivity_state_change = &transport_op_cb; op = grpc_make_transport_op(NULL);
op.connectivity_state = &state; op->on_connectivity_state_change = &transport_op_cb;
op->connectivity_state = &state;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
elem->filter->start_transport_op(&exec_ctx, elem, &op); elem->filter->start_transport_op(&exec_ctx, elem, op);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
memset(&op, 0, sizeof(op));
grpc_closure_init(&transport_op_cb, do_nothing, NULL); grpc_closure_init(&transport_op_cb, do_nothing, NULL);
op.on_consumed = &transport_op_cb; op = grpc_make_transport_op(&transport_op_cb);
elem->filter->start_transport_op(&exec_ctx, elem, &op); elem->filter->start_transport_op(&exec_ctx, elem, op);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }

Loading…
Cancel
Save