From e0341792d5a33469bbac481e9737bd00e3cb8900 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 8 Mar 2017 15:59:16 -0800 Subject: [PATCH 1/6] Remove most usage of the grpc_call lock --- src/core/lib/surface/call.c | 77 ++++++++++++++----------------------- 1 file changed, 29 insertions(+), 48 deletions(-) diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index c2547c5147a..9b01c46bd59 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -143,9 +143,10 @@ struct grpc_call { grpc_channel *channel; grpc_call *parent; grpc_call *first_child; + gpr_atm has_children; gpr_timespec start_time; - /* TODO(ctiller): share with cq if possible? */ - gpr_mu mu; + /* protects first_child, setting has_children, and child next/prev links */ + gpr_mu child_list_mu; /* client or server call */ bool is_client; @@ -161,7 +162,7 @@ struct grpc_call { bool receiving_message; bool requested_final_op; bool received_final_op; - bool sent_any_op; + gpr_atm num_ops_sent; /* have we received initial metadata */ bool has_initial_md_been_received; @@ -275,7 +276,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("grpc_call_create", 0); call = gpr_zalloc(sizeof(grpc_call) + channel_stack->call_stack_size); *out_call = call; - gpr_mu_init(&call->mu); + gpr_mu_init(&call->child_list_mu); call->channel = args->channel; call->cq = args->cq; call->parent = args->parent_call; @@ -313,7 +314,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(call->is_client); GPR_ASSERT(!args->parent_call->is_client); - gpr_mu_lock(&args->parent_call->mu); + gpr_mu_lock(&args->parent_call->child_list_mu); + gpr_atm_no_barrier_store(&args->parent_call->has_children, 1); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( @@ -353,7 +355,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, call; } - gpr_mu_unlock(&args->parent_call->mu); + gpr_mu_unlock(&args->parent_call->child_list_mu); } call->send_deadline = send_deadline; @@ -435,7 +437,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - gpr_mu_destroy(&c->mu); + gpr_mu_destroy(&c->child_list_mu); for (ii = 0; ii < c->send_extra_metadata_count; ii++) { GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); } @@ -473,7 +475,7 @@ void grpc_call_destroy(grpc_call *c) { GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); if (parent) { - gpr_mu_lock(&parent->mu); + gpr_mu_lock(&parent->child_list_mu); if (c == parent->first_child) { parent->first_child = c->sibling_next; if (c == parent->first_child) { @@ -482,15 +484,13 @@ void grpc_call_destroy(grpc_call *c) { c->sibling_prev->sibling_next = c->sibling_next; c->sibling_next->sibling_prev = c->sibling_prev; } - gpr_mu_unlock(&parent->mu); + gpr_mu_unlock(&parent->child_list_mu); GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); } - gpr_mu_lock(&c->mu); GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - cancel = c->sent_any_op && !c->received_final_op; - gpr_mu_unlock(&c->mu); + cancel = gpr_atm_no_barrier_load(&c->num_ops_sent) && !c->received_final_op; if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); @@ -555,10 +555,8 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, "c=%p, status=%d, description=%s, reserved=%p)", 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == NULL); - gpr_mu_lock(&c->mu); cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status, description); - gpr_mu_unlock(&c->mu); grpc_exec_ctx_finish(&exec_ctx); return GRPC_CALL_OK; } @@ -715,9 +713,7 @@ static void set_incoming_compression_algorithm( grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( grpc_call *call) { grpc_compression_algorithm algorithm; - gpr_mu_lock(&call->mu); algorithm = call->incoming_compression_algorithm; - gpr_mu_unlock(&call->mu); return algorithm; } @@ -729,9 +725,7 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked( uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { uint32_t flags; - gpr_mu_lock(&call->mu); flags = call->test_only_last_message_flags; - gpr_mu_unlock(&call->mu); return flags; } @@ -785,9 +779,7 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { uint32_t encodings_accepted_by_peer; - gpr_mu_lock(&call->mu); encodings_accepted_by_peer = call->encodings_accepted_by_peer; - gpr_mu_unlock(&call->mu); return encodings_accepted_by_peer; } @@ -1083,8 +1075,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, grpc_call *call = bctl->call; grpc_error *error = consolidate_batch_errors(bctl); - gpr_mu_lock(&call->mu); - if (bctl->send_initial_metadata) { grpc_metadata_batch_destroy( exec_ctx, @@ -1105,17 +1095,21 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, call->received_final_op = true; /* propagate cancellation to any interested children */ - child_call = call->first_child; - if (child_call != NULL) { - do { - next_child_call = child_call->sibling_next; - if (child_call->cancellation_is_inherited) { - GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - grpc_call_cancel(child_call, NULL); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); - } - child_call = next_child_call; - } while (child_call != call->first_child); + if (gpr_atm_no_barrier_load(&call->has_children)) { + gpr_mu_lock(&call->child_list_mu); + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + grpc_call_cancel(child_call, NULL); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); + } + child_call = next_child_call; + } while (child_call != call->first_child); + } + gpr_mu_unlock(&call->child_list_mu); } if (call->is_client) { @@ -1130,7 +1124,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } - gpr_mu_unlock(&call->mu); if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ @@ -1221,7 +1214,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; - gpr_mu_lock(&bctl->call->mu); if (error != GRPC_ERROR_NONE) { if (call->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); @@ -1233,11 +1225,9 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || call->receiving_stream == NULL) { - gpr_mu_unlock(&bctl->call->mu); process_data_after_md(exec_ctx, bctlp); } else { call->saved_receiving_stream_ready_bctlp = bctlp; - gpr_mu_unlock(&bctl->call->mu); } } @@ -1309,8 +1299,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, batch_control *bctl = bctlp; grpc_call *call = bctl->call; - gpr_mu_lock(&call->mu); - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = @@ -1336,11 +1324,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, grpc_schedule_on_exec_ctx); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_closure_sched(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); + grpc_closure_run(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); } - gpr_mu_unlock(&call->mu); - finish_batch_step(exec_ctx, bctl); } @@ -1393,7 +1379,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->notify_tag = notify_tag; bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); - gpr_mu_lock(&call->mu); grpc_transport_stream_op *stream_op = &bctl->op; memset(stream_op, 0, sizeof(*stream_op)); stream_op->covered_by_poller = true; @@ -1679,8 +1664,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; - call->sent_any_op = true; - gpr_mu_unlock(&call->mu); + gpr_atm_no_barrier_fetch_add(&call->num_ops_sent, 1); execute_op(exec_ctx, call, stream_op); @@ -1711,7 +1695,6 @@ done_with_error: if (bctl->recv_final_op) { call->requested_final_op = 0; } - gpr_mu_unlock(&call->mu); goto done; } @@ -1760,10 +1743,8 @@ uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; } grpc_compression_algorithm grpc_call_compression_for_level( grpc_call *call, grpc_compression_level level) { - gpr_mu_lock(&call->mu); grpc_compression_algorithm algo = compression_algorithm_for_level_locked(call, level); - gpr_mu_unlock(&call->mu); return algo; } From 676db291d2387603f795dd6bd5853188e0ff7f63 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 8 Mar 2017 16:10:56 -0800 Subject: [PATCH 2/6] Fix barriers --- src/core/lib/surface/call.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 9b01c46bd59..2f3ddd6bcb9 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -315,7 +315,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!args->parent_call->is_client); gpr_mu_lock(&args->parent_call->child_list_mu); - gpr_atm_no_barrier_store(&args->parent_call->has_children, 1); + gpr_atm_rel_store(&args->parent_call->has_children, 1); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( @@ -1095,7 +1095,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, call->received_final_op = true; /* propagate cancellation to any interested children */ - if (gpr_atm_no_barrier_load(&call->has_children)) { + if (gpr_atm_acq_load(&call->has_children)) { gpr_mu_lock(&call->child_list_mu); child_call = call->first_child; if (child_call != NULL) { From b597dcf53a5057425ba24a7c8dbed2f65f8c31f4 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 9 Mar 2017 07:02:11 -0800 Subject: [PATCH 3/6] Race fixes --- src/core/lib/surface/call.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 2f3ddd6bcb9..eafeef0a207 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -161,9 +161,9 @@ struct grpc_call { bool received_initial_metadata; bool receiving_message; bool requested_final_op; - bool received_final_op; - gpr_atm num_ops_sent; - + gpr_atm any_ops_sent_atm; + gpr_atm received_final_op_atm; + /* have we received initial metadata */ bool has_initial_md_been_received; @@ -458,7 +458,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, for (i = 0; i < STATUS_SOURCE_COUNT; i++) { GRPC_ERROR_UNREF( - unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error); + unpack_received_status(gpr_atm_acq_load(&c->status[i])).error); } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); @@ -490,7 +490,7 @@ void grpc_call_destroy(grpc_call *c) { GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - cancel = gpr_atm_no_barrier_load(&c->num_ops_sent) && !c->received_final_op; + cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && !gpr_atm_acq_load(&c->received_final_op_atm); if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); @@ -1048,7 +1048,7 @@ static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, } static grpc_error *consolidate_batch_errors(batch_control *bctl) { - size_t n = (size_t)gpr_atm_no_barrier_load(&bctl->num_errors); + size_t n = (size_t)gpr_atm_acq_load(&bctl->num_errors); if (n == 0) { return GRPC_ERROR_NONE; } else if (n == 1) { @@ -1093,7 +1093,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(exec_ctx, call, md); - call->received_final_op = true; + gpr_atm_rel_store(&call->received_final_op_atm, 1); /* propagate cancellation to any interested children */ if (gpr_atm_acq_load(&call->has_children)) { gpr_mu_lock(&call->child_list_mu); @@ -1286,7 +1286,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, grpc_error *error, bool has_cancelled) { if (error == GRPC_ERROR_NONE) return; - int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1); + int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1); if (idx == 0 && !has_cancelled) { cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error)); @@ -1664,7 +1664,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; - gpr_atm_no_barrier_fetch_add(&call->num_ops_sent, 1); + gpr_atm_rel_store(&call->any_ops_sent_atm, 1); execute_op(exec_ctx, call, stream_op); From 123c72bc3c7594e31620e603dce4322c4d7c57b1 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Mar 2017 07:33:27 -0800 Subject: [PATCH 4/6] Handle a race between cancellation of a parent call and adding a child --- src/core/lib/surface/call.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index eafeef0a207..01efcafceba 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -163,7 +163,7 @@ struct grpc_call { bool requested_final_op; gpr_atm any_ops_sent_atm; gpr_atm received_final_op_atm; - + /* have we received initial metadata */ bool has_initial_md_been_received; @@ -343,6 +343,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; + if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) { + cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + } } if (args->parent_call->first_child == NULL) { @@ -490,7 +494,8 @@ void grpc_call_destroy(grpc_call *c) { GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && !gpr_atm_acq_load(&c->received_final_op_atm); + cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && + !gpr_atm_acq_load(&c->received_final_op_atm); if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); @@ -1103,7 +1108,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, next_child_call = child_call->sibling_next; if (child_call->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - grpc_call_cancel(child_call, NULL); + cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); } child_call = next_child_call; From c5b90df9b2738076acf6a6972d833489f6470335 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 10 Mar 2017 16:11:08 -0800 Subject: [PATCH 5/6] debug --- src/core/lib/surface/call.c | 74 +++++++++--------------------- src/core/lib/transport/transport.c | 3 +- 2 files changed, 24 insertions(+), 53 deletions(-) diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 01efcafceba..8b2f9fd3f53 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -143,9 +143,8 @@ struct grpc_call { grpc_channel *channel; grpc_call *parent; grpc_call *first_child; - gpr_atm has_children; gpr_timespec start_time; - /* protects first_child, setting has_children, and child next/prev links */ + /* protects first_child, and child next/prev links */ gpr_mu child_list_mu; /* client or server call */ @@ -315,7 +314,6 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!args->parent_call->is_client); gpr_mu_lock(&args->parent_call->child_list_mu); - gpr_atm_rel_store(&args->parent_call->has_children, 1); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( @@ -566,45 +564,19 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return GRPC_CALL_OK; } -typedef struct termination_closure { - grpc_closure closure; - grpc_call *call; - grpc_transport_stream_op op; -} termination_closure; - -static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, - grpc_error *error) { - termination_closure *tc = tcp; - GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "termination"); - gpr_free(tc); -} - -static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp, +static void done_termination(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { - termination_closure *tc = tcp; - memset(&tc->op, 0, sizeof(tc->op)); - tc->op.cancel_error = GRPC_ERROR_REF(error); - /* reuse closure to catch completion */ - tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc, - grpc_schedule_on_exec_ctx); - execute_op(exec_ctx, tc->call, &tc->op); -} - -static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_error *error) { - termination_closure *tc = gpr_malloc(sizeof(*tc)); - memset(tc, 0, sizeof(*tc)); - tc->call = c; - GRPC_CALL_INTERNAL_REF(tc->call, "termination"); - grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination, - tc, grpc_schedule_on_exec_ctx), - error); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination"); } static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { + GRPC_CALL_INTERNAL_REF(c, "termination"); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); - terminate_with_error(exec_ctx, c, error); + grpc_transport_stream_op *op = grpc_make_transport_stream_op( + grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx)); + op->cancel_error = error; + execute_op(exec_ctx, c, op); } static grpc_error *error_from_status(grpc_status_code status, @@ -1100,23 +1072,21 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, gpr_atm_rel_store(&call->received_final_op_atm, 1); /* propagate cancellation to any interested children */ - if (gpr_atm_acq_load(&call->has_children)) { - gpr_mu_lock(&call->child_list_mu); - child_call = call->first_child; - if (child_call != NULL) { - do { - next_child_call = child_call->sibling_next; - if (child_call->cancellation_is_inherited) { - GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); - } - child_call = next_child_call; - } while (child_call != call->first_child); - } - gpr_mu_unlock(&call->child_list_mu); + gpr_mu_lock(&call->child_list_mu); + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); + } + child_call = next_child_call; + } while (child_call != call->first_child); } + gpr_mu_unlock(&call->child_list_mu); if (call->is_client) { get_final_status(call, set_status_value_directly, diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 165950e288e..3024f2ae781 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -254,8 +254,9 @@ typedef struct { static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { made_transport_stream_op *op = arg; - grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error)); + grpc_closure *c = op->inner_on_complete; gpr_free(op); + grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error)); } grpc_transport_stream_op *grpc_make_transport_stream_op( From b18c8ba0b6bbc2317dd92353288f2b1c0ff353d5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Mar 2017 15:51:37 -0700 Subject: [PATCH 6/6] Fix cancellation --- src/core/lib/surface/call.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 8b2f9fd3f53..47f36be5fda 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1070,8 +1070,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(exec_ctx, call, md); - gpr_atm_rel_store(&call->received_final_op_atm, 1); /* propagate cancellation to any interested children */ + gpr_atm_rel_store(&call->received_final_op_atm, 1); gpr_mu_lock(&call->child_list_mu); child_call = call->first_child; if (child_call != NULL) { @@ -1079,7 +1079,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, next_child_call = child_call->sibling_next; if (child_call->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); }