|
|
|
@ -145,16 +145,28 @@ typedef struct batch_control { |
|
|
|
|
grpc_transport_stream_op_batch op; |
|
|
|
|
} batch_control; |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
gpr_mu child_list_mu; |
|
|
|
|
grpc_call *first_child; |
|
|
|
|
} parent_call; |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
grpc_call *parent; |
|
|
|
|
/** siblings: children of the same parent form a list, and this list is
|
|
|
|
|
protected under |
|
|
|
|
parent->mu */ |
|
|
|
|
grpc_call *sibling_next; |
|
|
|
|
grpc_call *sibling_prev; |
|
|
|
|
} child_call; |
|
|
|
|
|
|
|
|
|
struct grpc_call { |
|
|
|
|
gpr_arena *arena; |
|
|
|
|
grpc_completion_queue *cq; |
|
|
|
|
grpc_polling_entity pollent; |
|
|
|
|
grpc_channel *channel; |
|
|
|
|
grpc_call *parent; |
|
|
|
|
grpc_call *first_child; |
|
|
|
|
gpr_timespec start_time; |
|
|
|
|
/* protects first_child, and child next/prev links */ |
|
|
|
|
gpr_mu child_list_mu; |
|
|
|
|
/* parent_call* */ gpr_atm parent_call_atm; |
|
|
|
|
child_call *child_call; |
|
|
|
|
|
|
|
|
|
/* client or server call */ |
|
|
|
|
bool is_client; |
|
|
|
@ -206,12 +218,6 @@ struct grpc_call { |
|
|
|
|
int send_extra_metadata_count; |
|
|
|
|
gpr_timespec send_deadline; |
|
|
|
|
|
|
|
|
|
/** siblings: children of the same parent form a list, and this list is
|
|
|
|
|
protected under |
|
|
|
|
parent->mu */ |
|
|
|
|
grpc_call *sibling_next; |
|
|
|
|
grpc_call *sibling_prev; |
|
|
|
|
|
|
|
|
|
grpc_slice_buffer_stream sending_stream; |
|
|
|
|
|
|
|
|
|
grpc_byte_stream *receiving_stream; |
|
|
|
@ -276,6 +282,23 @@ static void add_init_error(grpc_error **composite, grpc_error *new) { |
|
|
|
|
*composite = grpc_error_add_child(*composite, new); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static parent_call *get_or_create_parent_call(grpc_call *call) { |
|
|
|
|
parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); |
|
|
|
|
if (p == NULL) { |
|
|
|
|
p = gpr_arena_alloc(call->arena, sizeof(*p)); |
|
|
|
|
gpr_mu_init(&p->child_list_mu); |
|
|
|
|
if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) { |
|
|
|
|
gpr_mu_destroy(&p->child_list_mu); |
|
|
|
|
p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return p; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static parent_call *get_parent_call(grpc_call *call) { |
|
|
|
|
return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
const grpc_call_create_args *args, |
|
|
|
|
grpc_call **out_call) { |
|
|
|
@ -291,10 +314,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
sizeof(grpc_call) + channel_stack->call_stack_size); |
|
|
|
|
call->arena = arena; |
|
|
|
|
*out_call = call; |
|
|
|
|
gpr_mu_init(&call->child_list_mu); |
|
|
|
|
call->channel = args->channel; |
|
|
|
|
call->cq = args->cq; |
|
|
|
|
call->parent = args->parent_call; |
|
|
|
|
call->start_time = gpr_now(GPR_CLOCK_MONOTONIC); |
|
|
|
|
/* Always support no compression */ |
|
|
|
|
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); |
|
|
|
@ -326,11 +347,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC); |
|
|
|
|
|
|
|
|
|
if (args->parent_call != NULL) { |
|
|
|
|
child_call *cc = call->child_call = |
|
|
|
|
gpr_arena_alloc(arena, sizeof(child_call)); |
|
|
|
|
call->child_call->parent = args->parent_call; |
|
|
|
|
|
|
|
|
|
GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); |
|
|
|
|
GPR_ASSERT(call->is_client); |
|
|
|
|
GPR_ASSERT(!args->parent_call->is_client); |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&args->parent_call->child_list_mu); |
|
|
|
|
parent_call *pc = get_or_create_parent_call(args->parent_call); |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&pc->child_list_mu); |
|
|
|
|
|
|
|
|
|
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { |
|
|
|
|
send_deadline = gpr_time_min( |
|
|
|
@ -364,17 +391,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (args->parent_call->first_child == NULL) { |
|
|
|
|
args->parent_call->first_child = call; |
|
|
|
|
call->sibling_next = call->sibling_prev = call; |
|
|
|
|
if (pc->first_child == NULL) { |
|
|
|
|
pc->first_child = call; |
|
|
|
|
cc->sibling_next = cc->sibling_prev = call; |
|
|
|
|
} else { |
|
|
|
|
call->sibling_next = args->parent_call->first_child; |
|
|
|
|
call->sibling_prev = args->parent_call->first_child->sibling_prev; |
|
|
|
|
call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = |
|
|
|
|
call; |
|
|
|
|
cc->sibling_next = pc->first_child; |
|
|
|
|
cc->sibling_prev = pc->first_child->child_call->sibling_prev; |
|
|
|
|
cc->sibling_next->child_call->sibling_prev = |
|
|
|
|
cc->sibling_prev->child_call->sibling_next = call; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&args->parent_call->child_list_mu); |
|
|
|
|
gpr_mu_unlock(&pc->child_list_mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
call->send_deadline = send_deadline; |
|
|
|
@ -469,7 +496,10 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, |
|
|
|
|
if (c->receiving_stream != NULL) { |
|
|
|
|
grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); |
|
|
|
|
} |
|
|
|
|
gpr_mu_destroy(&c->child_list_mu); |
|
|
|
|
parent_call *pc = get_parent_call(c); |
|
|
|
|
if (pc != NULL) { |
|
|
|
|
gpr_mu_destroy(&pc->child_list_mu); |
|
|
|
|
} |
|
|
|
|
for (ii = 0; ii < c->send_extra_metadata_count; ii++) { |
|
|
|
|
GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); |
|
|
|
|
} |
|
|
|
@ -499,31 +529,31 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_call_destroy(grpc_call *c) { |
|
|
|
|
int cancel; |
|
|
|
|
grpc_call *parent = c->parent; |
|
|
|
|
child_call *cc = c->child_call; |
|
|
|
|
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
|
|
|
|
|
|
|
|
|
GPR_TIMER_BEGIN("grpc_call_destroy", 0); |
|
|
|
|
GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); |
|
|
|
|
|
|
|
|
|
if (parent) { |
|
|
|
|
gpr_mu_lock(&parent->child_list_mu); |
|
|
|
|
if (c == parent->first_child) { |
|
|
|
|
parent->first_child = c->sibling_next; |
|
|
|
|
if (c == parent->first_child) { |
|
|
|
|
parent->first_child = NULL; |
|
|
|
|
if (cc) { |
|
|
|
|
parent_call *pc = get_parent_call(cc->parent); |
|
|
|
|
gpr_mu_lock(&pc->child_list_mu); |
|
|
|
|
if (c == pc->first_child) { |
|
|
|
|
pc->first_child = cc->sibling_next; |
|
|
|
|
if (c == pc->first_child) { |
|
|
|
|
pc->first_child = NULL; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
c->sibling_prev->sibling_next = c->sibling_next; |
|
|
|
|
c->sibling_next->sibling_prev = c->sibling_prev; |
|
|
|
|
gpr_mu_unlock(&parent->child_list_mu); |
|
|
|
|
GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); |
|
|
|
|
cc->sibling_prev->child_call->sibling_next = cc->sibling_next; |
|
|
|
|
cc->sibling_next->child_call->sibling_prev = cc->sibling_prev; |
|
|
|
|
gpr_mu_unlock(&pc->child_list_mu); |
|
|
|
|
GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(!c->destroy_called); |
|
|
|
|
c->destroy_called = 1; |
|
|
|
|
cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && |
|
|
|
|
!gpr_atm_acq_load(&c->received_final_op_atm); |
|
|
|
|
bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 && |
|
|
|
|
gpr_atm_acq_load(&c->received_final_op_atm) == 0; |
|
|
|
|
if (cancel) { |
|
|
|
|
cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, |
|
|
|
|
GRPC_ERROR_CANCELLED); |
|
|
|
@ -1079,7 +1109,6 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) { |
|
|
|
|
|
|
|
|
|
static void post_batch_completion(grpc_exec_ctx *exec_ctx, |
|
|
|
|
batch_control *bctl) { |
|
|
|
|
grpc_call *child_call; |
|
|
|
|
grpc_call *next_child_call; |
|
|
|
|
grpc_call *call = bctl->call; |
|
|
|
|
grpc_error *error = consolidate_batch_errors(bctl); |
|
|
|
@ -1104,21 +1133,25 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
/* propagate cancellation to any interested children */ |
|
|
|
|
gpr_atm_rel_store(&call->received_final_op_atm, 1); |
|
|
|
|
gpr_mu_lock(&call->child_list_mu); |
|
|
|
|
child_call = call->first_child; |
|
|
|
|
if (child_call != NULL) { |
|
|
|
|
do { |
|
|
|
|
next_child_call = child_call->sibling_next; |
|
|
|
|
if (child_call->cancellation_is_inherited) { |
|
|
|
|
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); |
|
|
|
|
cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE, |
|
|
|
|
GRPC_ERROR_CANCELLED); |
|
|
|
|
GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); |
|
|
|
|
} |
|
|
|
|
child_call = next_child_call; |
|
|
|
|
} while (child_call != call->first_child); |
|
|
|
|
parent_call *pc = get_parent_call(call); |
|
|
|
|
if (pc != NULL) { |
|
|
|
|
grpc_call *child; |
|
|
|
|
gpr_mu_lock(&pc->child_list_mu); |
|
|
|
|
child = pc->first_child; |
|
|
|
|
if (child != NULL) { |
|
|
|
|
do { |
|
|
|
|
next_child_call = child->child_call->sibling_next; |
|
|
|
|
if (child->cancellation_is_inherited) { |
|
|
|
|
GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); |
|
|
|
|
cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE, |
|
|
|
|
GRPC_ERROR_CANCELLED); |
|
|
|
|
GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel"); |
|
|
|
|
} |
|
|
|
|
child = next_child_call; |
|
|
|
|
} while (child != pc->first_child); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&pc->child_list_mu); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&call->child_list_mu); |
|
|
|
|
|
|
|
|
|
if (call->is_client) { |
|
|
|
|
get_final_status(call, set_status_value_directly, |
|
|
|
|