Merge pull request #12281 from muxi/fix-cronet-transport-refcount

Fix use of Cronet refcount
pull/12375/merge
Muxi Yan 8 years ago committed by GitHub
commit 38b9a4d8cc
  1. 75
      src/core/ext/transport/cronet/transport/cronet_transport.c

@ -187,9 +187,34 @@ struct stream_obj {
/* Mutex to protect storage */ /* Mutex to protect storage */
gpr_mu mu; gpr_mu mu;
/* Refcount object of the stream */
grpc_stream_refcount *refcount;
}; };
typedef struct stream_obj stream_obj; typedef struct stream_obj stream_obj;
#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
grpc_cronet_stream_ref((stream), (reason))
#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
grpc_cronet_stream_unref((exec_ctx), (stream), (reason))
void grpc_cronet_stream_ref(stream_obj *s, const char *reason) {
grpc_stream_ref(s->refcount, reason);
}
void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s,
const char *reason) {
grpc_stream_unref(exec_ctx, s->refcount, reason);
}
#else
#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
grpc_cronet_stream_unref((exec_ctx), (stream))
void grpc_cronet_stream_ref(stream_obj *s) { grpc_stream_ref(s->refcount); }
void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s) {
grpc_stream_unref(exec_ctx, s->refcount);
}
#endif
static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas); struct op_and_state *oas);
@ -346,13 +371,12 @@ static void remove_from_storage(struct stream_obj *s,
This can get executed from the Cronet network thread via cronet callback This can get executed from the Cronet network thread via cronet callback
or on the application supplied thread via the perform_stream_op function. or on the application supplied thread via the perform_stream_op function.
*/ */
static void execute_from_storage(stream_obj *s) { static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
for (struct op_and_state *curr = s->storage.head; curr != NULL;) { for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done); CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
GPR_ASSERT(curr->done == 0); GPR_ASSERT(curr->done == 0);
enum e_op_result result = execute_stream_op(&exec_ctx, curr); enum e_op_result result = execute_stream_op(exec_ctx, curr);
CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr, CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
op_result_string(result)); op_result_string(result));
/* if this op is done, then remove it and free memory */ /* if this op is done, then remove it and free memory */
@ -369,7 +393,6 @@ static void execute_from_storage(stream_obj *s) {
} }
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx);
} }
/* /*
@ -377,6 +400,8 @@ static void execute_from_storage(stream_obj *s) {
*/ */
static void on_failed(bidirectional_stream *stream, int net_error) { static void on_failed(bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs); bidirectional_stream_destroy(s->cbs);
@ -392,7 +417,9 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
} }
null_and_maybe_free_read_buffer(s); null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(&exec_ctx, s);
GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
grpc_exec_ctx_finish(&exec_ctx);
} }
/* /*
@ -400,6 +427,8 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
*/ */
static void on_canceled(bidirectional_stream *stream) { static void on_canceled(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs); bidirectional_stream_destroy(s->cbs);
@ -415,7 +444,9 @@ static void on_canceled(bidirectional_stream *stream) {
} }
null_and_maybe_free_read_buffer(s); null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(&exec_ctx, s);
GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
grpc_exec_ctx_finish(&exec_ctx);
} }
/* /*
@ -423,6 +454,8 @@ static void on_canceled(bidirectional_stream *stream) {
*/ */
static void on_succeeded(bidirectional_stream *stream) { static void on_succeeded(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs); bidirectional_stream_destroy(s->cbs);
@ -430,7 +463,9 @@ static void on_succeeded(bidirectional_stream *stream) {
s->cbs = NULL; s->cbs = NULL;
null_and_maybe_free_read_buffer(s); null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(&exec_ctx, s);
GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
grpc_exec_ctx_finish(&exec_ctx);
} }
/* /*
@ -438,6 +473,7 @@ static void on_succeeded(bidirectional_stream *stream) {
*/ */
static void on_stream_ready(bidirectional_stream *stream) { static void on_stream_ready(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
@ -457,7 +493,8 @@ static void on_stream_ready(bidirectional_stream *stream) {
} }
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
} }
/* /*
@ -513,14 +550,15 @@ static void on_response_headers_received(
s->state.pending_read_from_cronet = true; s->state.pending_read_from_cronet = true;
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
execute_from_storage(s);
} }
/* /*
Cronet callback Cronet callback
*/ */
static void on_write_completed(bidirectional_stream *stream, const char *data) { static void on_write_completed(bidirectional_stream *stream, const char *data) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
@ -530,7 +568,8 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) {
} }
s->state.state_callback_received[OP_SEND_MESSAGE] = true; s->state.state_callback_received[OP_SEND_MESSAGE] = true;
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
} }
/* /*
@ -538,6 +577,7 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) {
*/ */
static void on_read_completed(bidirectional_stream *stream, char *data, static void on_read_completed(bidirectional_stream *stream, char *data,
int count) { int count) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation; stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count); count);
@ -563,14 +603,15 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} else { } else {
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(&exec_ctx, s);
} }
} else { } else {
null_and_maybe_free_read_buffer(s); null_and_maybe_free_read_buffer(s);
s->state.rs.read_stream_closed = true; s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
execute_from_storage(s); execute_from_storage(&exec_ctx, s);
} }
grpc_exec_ctx_finish(&exec_ctx);
} }
/* /*
@ -625,12 +666,11 @@ static void on_response_trailers_received(
s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx);
} else { } else {
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx); execute_from_storage(&exec_ctx, s);
execute_from_storage(s);
} }
grpc_exec_ctx_finish(&exec_ctx);
} }
/* /*
@ -1313,6 +1353,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount, grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data, gpr_arena *arena) { const void *server_data, gpr_arena *arena) {
stream_obj *s = (stream_obj *)gs; stream_obj *s = (stream_obj *)gs;
s->refcount = refcount;
GRPC_CRONET_STREAM_REF(s, "cronet transport");
memset(&s->storage, 0, sizeof(s->storage)); memset(&s->storage, 0, sizeof(s->storage));
s->storage.head = NULL; s->storage.head = NULL;
memset(&s->state, 0, sizeof(s->state)); memset(&s->state, 0, sizeof(s->state));
@ -1370,7 +1413,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
} }
stream_obj *s = (stream_obj *)gs; stream_obj *s = (stream_obj *)gs;
add_to_storage(s, op); add_to_storage(s, op);
execute_from_storage(s); execute_from_storage(exec_ctx, s);
} }
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,

Loading…
Cancel
Save