Allow asynchronous call destruction in some cases

If there's a chance we're being called via a callback, the call may still be referenced by frames further up the stack, so destruction must be deferred until that stack unwinds. Otherwise, for performance, destroy the call inline.
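
In miniature, the scheme is: the call carries an internal refcount, and the final unref either destroys the call inline or, when the unref may be happening on a callback stack, hands destruction off to run after the stack unwinds. Below is a minimal sketch of that pattern, not gRPC code: run_later() is a hypothetical stand-in for grpc_iomgr_add_callback(), and C11 atomics stand in for gpr_ref/gpr_unref.

#include <stdatomic.h>
#include <stdlib.h>

typedef struct call {
  atomic_int refs;
  /* ... channel ref, mutex, buffers ... */
} call;

/* The destructor takes (void *, int) so it can double as a deferred
   callback; the success flag is meaningless here, hence "ignored". */
static void destroy_call(void *arg, int ignored_success) {
  (void)ignored_success;
  free(arg);
}

/* Hypothetical stand-in for grpc_iomgr_add_callback(): schedule fn(arg, 1)
   to run once the current stack has fully unwound. Calling it immediately
   just keeps this sketch self-contained. */
static void run_later(void (*fn)(void *, int), void *arg) { fn(arg, 1); }

void call_unref(call *c, int allow_immediate_deletion) {
  if (atomic_fetch_sub_explicit(&c->refs, 1, memory_order_acq_rel) == 1) {
    if (allow_immediate_deletion) {
      destroy_call(c, 1);         /* known-safe context: no queue hop */
    } else {
      run_later(destroy_call, c); /* possibly on a callback stack: defer */
    }
  }
}

Each call site then encodes what the caller knows about its stack: in the diff below, grpc_call_destroy, grpc_event_finish, and the alarm callback pass 1, while the completion callbacks that can be reached from transport code pass 0.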
Branch: pull/357/head
Author: Craig Tiller, 10 years ago
Parent: cb818ba745
Commit: aef25da3e8
Changed files:
  src/core/surface/call.c              25
  src/core/surface/call.h               2
  src/core/surface/completion_queue.c    2

--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -183,8 +183,9 @@ legacy_state *get_legacy_state(grpc_call *call) {
 
 void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
 
-static void destroy_call(grpc_call *c) {
+static void destroy_call(void *call, int ignored_success) {
   int i;
+  grpc_call *c = call;
   grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
   grpc_channel_internal_unref(c->channel);
   gpr_mu_destroy(&c->mu);
@@ -203,9 +204,13 @@ static void destroy_call(grpc_call *c) {
   gpr_free(c);
 }
 
-void grpc_call_internal_unref(grpc_call *c) {
+void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
   if (gpr_unref(&c->internal_refcount)) {
-    destroy_call(c);
+    if (allow_immediate_deletion) {
+      destroy_call(c, 1);
+    } else {
+      grpc_iomgr_add_callback(destroy_call, c);
+    }
   }
 }
@@ -350,7 +355,7 @@ static void finish_write_step(void *pc, grpc_op_error error) {
   }
   call->sending = 0;
   unlock(call);
-  grpc_call_internal_unref(call);
+  grpc_call_internal_unref(call, 0);
 }
 
 static void finish_finish_step(void *pc, grpc_op_error error) {
@@ -359,7 +364,7 @@ static void finish_finish_step(void *pc, grpc_op_error error) {
   finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, error);
   call->sending = 0;
   unlock(call);
-  grpc_call_internal_unref(call);
+  grpc_call_internal_unref(call, 0);
 }
 
 static void finish_start_step(void *pc, grpc_op_error error) {
@@ -368,7 +373,7 @@ static void finish_start_step(void *pc, grpc_op_error error) {
   finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
   call->sending = 0;
   unlock(call);
-  grpc_call_internal_unref(call);
+  grpc_call_internal_unref(call, 0);
 }
 
 static send_action choose_send_action(grpc_call *call) {
@@ -464,7 +469,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
       lock(call);
       call->sending = 0;
       unlock(call);
-      grpc_call_internal_unref(call);
+      grpc_call_internal_unref(call, 0);
       break;
     case SEND_FINISH:
       if (!call->is_client) {
@@ -656,7 +661,7 @@ void grpc_call_destroy(grpc_call *c) {
   cancel = !c->stream_closed;
   unlock(c);
   if (cancel) grpc_call_cancel(c);
-  grpc_call_internal_unref(c);
+  grpc_call_internal_unref(c, 1);
 }
 
 grpc_call_error grpc_call_cancel(grpc_call *c) {
@@ -958,7 +963,7 @@ static void call_alarm(void *arg, int success) {
       grpc_call_cancel(call);
     }
   }
-  grpc_call_internal_unref(call);
+  grpc_call_internal_unref(call, 1);
 }
 
 void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
@@ -998,7 +1003,7 @@ void grpc_call_stream_closed(grpc_call_element *elem) {
     finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
   }
   unlock(call);
-  grpc_call_internal_unref(call);
+  grpc_call_internal_unref(call, 0);
 }
 
 /* we offset status by a small amount when storing it into transport metadata

--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -46,7 +46,7 @@ grpc_call *grpc_call_create(grpc_channel *channel,
                             const void *server_transport_data);
 
 void grpc_call_internal_ref(grpc_call *call);
-void grpc_call_internal_unref(grpc_call *call);
+void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
 
 /* Helpers for grpc_client, grpc_server filters to publish received data to
    the completion queue/surface layer */

--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -388,7 +388,7 @@ void grpc_event_finish(grpc_event *base) {
   event *ev = (event *)base;
  ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
   if (ev->base.call) {
-    grpc_call_internal_unref(ev->base.call);
+    grpc_call_internal_unref(ev->base.call, 1);
   }
   gpr_free(ev);
 }
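
Why the distinction matters: when a lower layer invokes a surface callback and then keeps using the call object, an inline destroy inside that callback would leave the lower layer reading freed memory. A sketch of the hazard, again with hypothetical names rather than gRPC API (pending_fn is a one-slot stand-in for the iomgr's deferred callback list):

#include <stdio.h>
#include <stdlib.h>

typedef struct call {
  int refs;
  int bytes_written;
} call;

static void (*pending_fn)(void *, int); /* deferred-destruction slot */
static void *pending_arg;

static void destroy_call(void *arg, int ignored_success) {
  (void)ignored_success;
  free(arg);
}

static void call_unref(call *c, int allow_immediate_deletion) {
  if (--c->refs == 0) {
    if (allow_immediate_deletion) {
      destroy_call(c, 1);
    } else {
      pending_fn = destroy_call; /* run after the current stack unwinds */
      pending_arg = c;
    }
  }
}

/* Surface-layer completion callback, analogous to finish_write_step:
   it may drop the last reference, but it is on a callback stack. */
static void on_write_done(call *c) { call_unref(c, 0); }

int main(void) {
  call *c = calloc(1, sizeof(*c));
  c->refs = 1;

  /* "Transport" frame: invokes the callback, then touches the call again.
     Passing 1 above would make the next line a use-after-free. */
  on_write_done(c);
  c->bytes_written++;
  printf("wrote %d\n", c->bytes_written);

  if (pending_fn) pending_fn(pending_arg, 1); /* event loop drains later */
  return 0;
}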
