Fix resource leak

reviewable/pr8008/r3
Craig Tiller 9 years ago
parent df844f8a5c
commit fc2636d7a6
  1. 32
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 23
      src/core/lib/iomgr/combiner.c
  3. 11
      src/core/lib/iomgr/iomgr.c
  4. 3
      src/core/lib/support/log.c
  5. 6
      test/core/end2end/tests/payload.c

@ -564,11 +564,11 @@ static const char *write_state_name(grpc_chttp2_write_state st) {
} }
static void set_write_state(grpc_chttp2_transport *t, static void set_write_state(grpc_chttp2_transport *t,
grpc_chttp2_write_state st) { grpc_chttp2_write_state st, const char *reason) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s", t, GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s [%s]", t,
t->is_client ? "CLIENT" : "SERVER", t->is_client ? "CLIENT" : "SERVER",
write_state_name(t->write_state), write_state_name(t->write_state),
write_state_name(st))); write_state_name(st), reason));
t->write_state = st; t->write_state = st;
} }
@ -579,7 +579,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
switch (t->write_state) { switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE: case GRPC_CHTTP2_WRITE_STATE_IDLE:
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner, grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->write_action_begin_locked, &t->write_action_begin_locked,
@ -590,12 +590,14 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
t, t,
covered_by_poller covered_by_poller
? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER
: GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE); : GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
reason);
break; break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
if (covered_by_poller) { if (covered_by_poller) {
set_write_state( set_write_state(
t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER); t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
reason);
} }
break; break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
@ -620,10 +622,10 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_chttp2_transport *t = gt; grpc_chttp2_transport *t = gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) { if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) {
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "begin writing");
grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL);
} else { } else {
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
} }
GPR_TIMER_END("write_action_begin_locked", 0); GPR_TIMER_END("write_action_begin_locked", 0);
@ -661,11 +663,12 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_UNREACHABLE_CODE(break); GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITE_STATE_WRITING: case GRPC_CHTTP2_WRITE_STATE_WRITING:
GPR_TIMER_MARK("state=writing", 0); GPR_TIMER_MARK("state=writing", 0);
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
break; break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
GPR_TIMER_MARK("state=writing_stale_no_poller", 0); GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [!covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner, grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->write_action_begin_locked, &t->write_action_begin_locked,
@ -673,7 +676,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
break; break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
GPR_TIMER_MARK("state=writing_stale_with_poller", 0); GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner, grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->write_action_begin_locked, &t->write_action_begin_locked,
@ -1887,13 +1891,17 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
max_recv_bytes += t->stream_lookahead; max_recv_bytes += t->stream_lookahead;
if (s->max_recv_bytes < max_recv_bytes) { if (s->max_recv_bytes < max_recv_bytes) {
uint32_t add_max_recv_bytes = max_recv_bytes - s->max_recv_bytes; uint32_t add_max_recv_bytes = max_recv_bytes - s->max_recv_bytes;
bool new_window_write_is_covered_by_poller =
s->max_recv_bytes < have_already;
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, max_recv_bytes, GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, max_recv_bytes,
add_max_recv_bytes); add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window, GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window,
add_max_recv_bytes); add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window, GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window,
add_max_recv_bytes); add_max_recv_bytes);
grpc_chttp2_become_writable(exec_ctx, t, s, true, "read_incoming_stream"); grpc_chttp2_become_writable(exec_ctx, t, s,
new_window_write_is_covered_by_poller,
"read_incoming_stream");
} }
} }

@ -142,11 +142,11 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx,
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *cl, grpc_error *error, grpc_closure *cl, grpc_error *error,
bool covered_by_poller) { bool covered_by_poller) {
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
"C:%p grpc_combiner_execute c=%p cov=%d", lock,
cl, covered_by_poller));
GPR_TIMER_BEGIN("combiner.execute", 0); GPR_TIMER_BEGIN("combiner.execute", 0);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock,
cl, covered_by_poller, last));
GPR_ASSERT(last & 1); // ensure lock has not been destroyed GPR_ASSERT(last & 1); // ensure lock has not been destroyed
cl->error_data.scratch = cl->error_data.scratch =
pack_error_data((error_data){error, covered_by_poller}); pack_error_data((error_data){error, covered_by_poller});
@ -177,6 +177,8 @@ static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
move_next(exec_ctx); move_next(exec_ctx);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock,
lock->optional_workqueue));
grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload, grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
@ -189,6 +191,15 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
return false; return false;
} }
GRPC_COMBINER_TRACE(
gpr_log(GPR_DEBUG,
"C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
"is_covered_by_poller=%d exec_ctx_ready_to_finish=%d "
"time_to_execute_final_list=%d",
lock, lock->optional_workqueue, is_covered_by_poller(lock),
grpc_exec_ctx_ready_to_finish(exec_ctx),
lock->time_to_execute_final_list));
if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) && if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) &&
grpc_exec_ctx_ready_to_finish(exec_ctx)) { grpc_exec_ctx_ready_to_finish(exec_ctx)) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
@ -289,9 +300,9 @@ static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error, grpc_closure *closure, grpc_error *error,
bool covered_by_poller) { bool covered_by_poller) {
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, GRPC_COMBINER_TRACE(gpr_log(
"C:%p grpc_combiner_execute_finally c=%p; ac=%p", GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock,
lock, closure, exec_ctx->active_combiner)); closure, exec_ctx->active_combiner, covered_by_poller));
GPR_TIMER_BEGIN("combiner.execute_finally", 0); GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) { if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0); GPR_TIMER_MARK("slowpath", 0);

@ -112,6 +112,14 @@ void grpc_iomgr_shutdown(void) {
continue; continue;
} }
if (g_root_object.next != &g_root_object) { if (g_root_object.next != &g_root_object) {
if (grpc_iomgr_abort_on_leaks()) {
gpr_log(GPR_DEBUG, "Failed to free %" PRIuPTR
" iomgr objects before shutdown deadline: "
"memory leaks are likely",
count_objects());
dump_objects("LEAKED");
abort();
}
gpr_timespec short_deadline = gpr_time_add( gpr_timespec short_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) { if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) {
@ -122,9 +130,6 @@ void grpc_iomgr_shutdown(void) {
"memory leaks are likely", "memory leaks are likely",
count_objects()); count_objects());
dump_objects("LEAKED"); dump_objects("LEAKED");
if (grpc_iomgr_abort_on_leaks()) {
abort();
}
} }
break; break;
} }

@ -60,8 +60,9 @@ const char *gpr_log_severity_string(gpr_log_severity severity) {
void gpr_log_message(const char *file, int line, gpr_log_severity severity, void gpr_log_message(const char *file, int line, gpr_log_severity severity,
const char *message) { const char *message) {
if ((gpr_atm)severity < gpr_atm_no_barrier_load(&g_min_severity_to_print)) if ((gpr_atm)severity < gpr_atm_no_barrier_load(&g_min_severity_to_print)) {
return; return;
}
gpr_log_func_args lfargs; gpr_log_func_args lfargs;
memset(&lfargs, 0, sizeof(lfargs)); memset(&lfargs, 0, sizeof(lfargs));

@ -99,11 +99,11 @@ static void end_test(grpc_end2end_test_fixture *f) {
static gpr_slice generate_random_slice() { static gpr_slice generate_random_slice() {
size_t i; size_t i;
static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890"; static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890";
char output[1024 * 1024]; /* 1 MB */ char output[1024 * 1024];
for (i = 0; i < 1024 * 1024 - 1; ++i) { for (i = 0; i < GPR_ARRAY_SIZE(output) - 1; ++i) {
output[i] = chars[rand() % (int)(sizeof(chars) - 1)]; output[i] = chars[rand() % (int)(sizeof(chars) - 1)];
} }
output[1024 * 1024 - 1] = '\0'; output[GPR_ARRAY_SIZE(output) - 1] = '\0';
return gpr_slice_from_copied_string(output); return gpr_slice_from_copied_string(output);
} }

Loading…
Cancel
Save