Merge pull request #5535 from ctiller/streams

Refcounting fix for bug exposed by qps_test
pull/5550/head^2
Vijay Pai 9 years ago
commit 40d27ba784
  1. 1
      grpc.def
  2. 4
      include/grpc/impl/codegen/sync.h
  3. 16
      src/core/iomgr/iomgr.c
  4. 6
      src/core/iomgr/iomgr_internal.h
  5. 7
      src/core/support/sync.c
  6. 18
      src/core/transport/chttp2/internal.h
  7. 6
      src/core/transport/chttp2/parsing.c
  8. 30
      src/core/transport/chttp2/stream_lists.c
  9. 35
      src/core/transport/chttp2/writing.c
  10. 43
      src/core/transport/chttp2_transport.c
  11. 8
      src/core/transport/metadata.c
  12. 2
      src/core/transport/transport.c
  13. 2
      src/python/grpcio/grpc/_cython/imports.generated.c
  14. 3
      src/python/grpcio/grpc/_cython/imports.generated.h
  15. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  16. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  17. 2
      test/cpp/interop/reconnect_interop_client.cc

@ -182,6 +182,7 @@ EXPORTS
gpr_event_wait gpr_event_wait
gpr_ref_init gpr_ref_init
gpr_ref gpr_ref
gpr_ref_non_zero
gpr_refn gpr_refn
gpr_unref gpr_unref
gpr_stats_init gpr_stats_init

@ -182,6 +182,10 @@ GPRAPI void gpr_ref_init(gpr_refcount *r, int n);
/* Increment the reference count *r. Requires *r initialized. */ /* Increment the reference count *r. Requires *r initialized. */
GPRAPI void gpr_ref(gpr_refcount *r); GPRAPI void gpr_ref(gpr_refcount *r);
/* Increment the reference count *r. Requires *r initialized.
Crashes if refcount is zero */
GPRAPI void gpr_ref_non_zero(gpr_refcount *r);
/* Increment the reference count *r by n. Requires *r initialized, n > 0. */ /* Increment the reference count *r by n. Requires *r initialized, n > 0. */
GPRAPI void gpr_refn(gpr_refcount *r, int n); GPRAPI void gpr_refn(gpr_refcount *r, int n);

@ -41,9 +41,11 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include <grpc/support/useful.h>
#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/timer.h" #include "src/core/iomgr/timer.h"
#include "src/core/support/env.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
static gpr_mu g_mu; static gpr_mu g_mu;
@ -116,6 +118,9 @@ void grpc_iomgr_shutdown(void) {
"memory leaks are likely", "memory leaks are likely",
count_objects()); count_objects());
dump_objects("LEAKED"); dump_objects("LEAKED");
if (grpc_iomgr_abort_on_leaks()) {
abort();
}
} }
break; break;
} }
@ -154,3 +159,14 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
gpr_free(obj->name); gpr_free(obj->name);
} }
bool grpc_iomgr_abort_on_leaks(void) {
char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS");
if (env == NULL) return false;
static const char *truthy[] = {"yes", "Yes", "YES", "true",
"True", "TRUE", "1"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == strcmp(env, truthy[i])) return true;
}
return false;
}

@ -1,6 +1,6 @@
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H #ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H
#include <stdbool.h>
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
@ -55,4 +57,6 @@ void grpc_iomgr_platform_flush(void);
/** tear down all platform specific global iomgr structures */ /** tear down all platform specific global iomgr structures */
void grpc_iomgr_platform_shutdown(void); void grpc_iomgr_platform_shutdown(void);
bool grpc_iomgr_abort_on_leaks(void);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */ #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */

@ -1,6 +1,6 @@
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -98,6 +98,11 @@ void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); }
void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); } void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); }
void gpr_ref_non_zero(gpr_refcount *r) {
gpr_atm prior = gpr_atm_no_barrier_fetch_add(&r->count, 1);
GPR_ASSERT(prior > 0);
}
void gpr_refn(gpr_refcount *r, int n) { void gpr_refn(gpr_refcount *r, int n) {
gpr_atm_no_barrier_fetch_add(&r->count, n); gpr_atm_no_barrier_fetch_add(&r->count, n);
} }

@ -417,7 +417,7 @@ typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */ /** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id; uint32_t id;
uint8_t fetching; uint8_t fetching;
uint8_t sent_initial_metadata; bool sent_initial_metadata;
uint8_t sent_message; uint8_t sent_message;
uint8_t sent_trailing_metadata; uint8_t sent_trailing_metadata;
uint8_t read_closed; uint8_t read_closed;
@ -509,7 +509,7 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global, grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing); grpc_chttp2_transport_parsing *parsing);
void grpc_chttp2_list_add_writable_stream( bool grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global); grpc_chttp2_stream_global *stream_global);
/** Get a writable stream /** Get a writable stream
@ -519,14 +519,13 @@ int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing); grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_remove_writable_stream( bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global); grpc_chttp2_stream_global *stream_global) GRPC_MUST_USE_RESULT;
/* returns 1 if stream added, 0 if it was already present */ void grpc_chttp2_list_add_writing_stream(
int grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT; grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_have_writing_streams( int grpc_chttp2_list_have_writing_streams(
grpc_chttp2_transport_writing *transport_writing); grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_list_pop_writing_stream( int grpc_chttp2_list_pop_writing_stream(
@ -770,4 +769,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *parsing, grpc_chttp2_transport_parsing *parsing,
const uint8_t *opaque_8bytes); const uint8_t *opaque_8bytes);
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
#endif #endif

@ -1,6 +1,6 @@
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -149,7 +149,7 @@ void grpc_chttp2_publish_reads(
if (was_zero && !is_zero) { if (was_zero && !is_zero) {
while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
&stream_global)) { &stream_global)) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global); grpc_chttp2_become_writable(transport_global, stream_global);
} }
} }
@ -178,7 +178,7 @@ void grpc_chttp2_publish_reads(
outgoing_window); outgoing_window);
is_zero = stream_global->outgoing_window <= 0; is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) { if (was_zero && !is_zero) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global); grpc_chttp2_become_writable(transport_global, stream_global);
} }
stream_global->max_recv_bytes -= (uint32_t)GPR_MIN( stream_global->max_recv_bytes -= (uint32_t)GPR_MIN(

@ -100,11 +100,14 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
} }
} }
static void stream_list_maybe_remove(grpc_chttp2_transport *t, static bool stream_list_maybe_remove(grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) { grpc_chttp2_stream_list_id id) {
if (s->included[id]) { if (s->included[id]) {
stream_list_remove(t, s, id); stream_list_remove(t, s, id);
return true;
} else {
return false;
} }
} }
@ -125,23 +128,24 @@ static void stream_list_add_tail(grpc_chttp2_transport *t,
s->included[id] = 1; s->included[id] = 1;
} }
static int stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, static bool stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) { grpc_chttp2_stream_list_id id) {
if (s->included[id]) { if (s->included[id]) {
return 0; return false;
} }
stream_list_add_tail(t, s, id); stream_list_add_tail(t, s, id);
return 1; return true;
} }
/* wrappers for specializations */ /* wrappers for specializations */
void grpc_chttp2_list_add_writable_stream( bool grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) { grpc_chttp2_stream_global *stream_global) {
GPR_ASSERT(stream_global->id != 0); GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), return stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE);
} }
int grpc_chttp2_list_pop_writable_stream( int grpc_chttp2_list_pop_writable_stream(
@ -159,20 +163,20 @@ int grpc_chttp2_list_pop_writable_stream(
return r; return r;
} }
void grpc_chttp2_list_remove_writable_stream( bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) { grpc_chttp2_stream_global *stream_global) {
stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE); GRPC_CHTTP2_LIST_WRITABLE);
} }
int grpc_chttp2_list_add_writing_stream( void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) { grpc_chttp2_stream_writing *stream_writing) {
return stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), GPR_ASSERT(stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing), STREAM_FROM_WRITING(stream_writing),
GRPC_CHTTP2_LIST_WRITING); GRPC_CHTTP2_LIST_WRITING));
} }
int grpc_chttp2_list_have_writing_streams( int grpc_chttp2_list_have_writing_streams(
@ -332,7 +336,7 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport(
while (stream_list_pop(transport, &stream, while (stream_list_pop(transport, &stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
if (is_window_available) { if (is_window_available) {
grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global); grpc_chttp2_become_writable(&transport->global, &stream->global);
} else { } else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing, grpc_chttp2_list_add_stalled_by_transport(transport_writing,
&stream->writing); &stream->writing);

@ -83,7 +83,8 @@ int grpc_chttp2_unlocking_check_writes(
(according to available window sizes) and add to the output buffer */ (according to available window sizes) and add to the output buffer */
while (grpc_chttp2_list_pop_writable_stream( while (grpc_chttp2_list_pop_writable_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) { transport_global, transport_writing, &stream_global, &stream_writing)) {
uint8_t sent_initial_metadata; bool sent_initial_metadata = stream_writing->sent_initial_metadata;
bool become_writable = false;
stream_writing->id = stream_global->id; stream_writing->id = stream_global->id;
stream_writing->read_closed = stream_global->read_closed; stream_writing->read_closed = stream_global->read_closed;
@ -92,16 +93,12 @@ int grpc_chttp2_unlocking_check_writes(
outgoing_window, stream_global, outgoing_window, stream_global,
outgoing_window); outgoing_window);
sent_initial_metadata = stream_writing->sent_initial_metadata;
if (!sent_initial_metadata && stream_global->send_initial_metadata) { if (!sent_initial_metadata && stream_global->send_initial_metadata) {
stream_writing->send_initial_metadata = stream_writing->send_initial_metadata =
stream_global->send_initial_metadata; stream_global->send_initial_metadata;
stream_global->send_initial_metadata = NULL; stream_global->send_initial_metadata = NULL;
if (grpc_chttp2_list_add_writing_stream(transport_writing, become_writable = true;
stream_writing)) { sent_initial_metadata = true;
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
sent_initial_metadata = 1;
} }
if (sent_initial_metadata) { if (sent_initial_metadata) {
if (stream_global->send_message != NULL) { if (stream_global->send_message != NULL) {
@ -128,10 +125,7 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->flow_controlled_buffer.length > 0) && stream_writing->flow_controlled_buffer.length > 0) &&
stream_writing->outgoing_window > 0) { stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) { if (transport_writing->outgoing_window > 0) {
if (grpc_chttp2_list_add_writing_stream(transport_writing, become_writable = true;
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
} else { } else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing, grpc_chttp2_list_add_stalled_by_transport(transport_writing,
stream_writing); stream_writing);
@ -141,10 +135,7 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->send_trailing_metadata = stream_writing->send_trailing_metadata =
stream_global->send_trailing_metadata; stream_global->send_trailing_metadata;
stream_global->send_trailing_metadata = NULL; stream_global->send_trailing_metadata = NULL;
if (grpc_chttp2_list_add_writing_stream(transport_writing, become_writable = true;
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
} }
} }
@ -153,10 +144,13 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing, GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
announce_window, stream_global, announce_window, stream_global,
unannounced_incoming_window_for_writing); unannounced_incoming_window_for_writing);
if (grpc_chttp2_list_add_writing_stream(transport_writing, become_writable = true;
stream_writing)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
} }
if (become_writable) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
} }
} }
@ -310,10 +304,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
(stream_writing->send_message && !stream_writing->fetching)) && (stream_writing->send_message && !stream_writing->fetching)) &&
stream_writing->outgoing_window > 0) { stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) { if (transport_writing->outgoing_window > 0) {
if (grpc_chttp2_list_add_writing_stream(transport_writing, grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
stream_writing)) {
/* do nothing - already reffed */
}
} else { } else {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing); stream_writing);

@ -142,7 +142,7 @@ static void incoming_byte_stream_update_flow_control(
static void fail_pending_writes(grpc_exec_ctx *exec_ctx, static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global); grpc_chttp2_stream_global *stream_global);
/* /*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING * CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/ */
@ -521,7 +521,6 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->global.id) == NULL); s->global.id) == NULL);
} }
grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
&s->global); &s->global);
grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global); grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global);
@ -583,7 +582,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
return &accepting->parsing; return &accepting->parsing;
} }
/* /*******************************************************************************
* LOCK MANAGEMENT * LOCK MANAGEMENT
*/ */
@ -611,10 +610,18 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_END("unlock", 0); GPR_TIMER_END("unlock", 0);
} }
/* /*******************************************************************************
* OUTPUT PROCESSING * OUTPUT PROCESSING
*/ */
void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed &&
grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) {
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
}
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
uint32_t value) { uint32_t value) {
const grpc_chttp2_setting_parameters *sp = const grpc_chttp2_setting_parameters *sp =
@ -732,7 +739,7 @@ static void maybe_start_some_streams(
stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->id, STREAM_FROM_GLOBAL(stream_global));
stream_global->in_stream_map = 1; stream_global->in_stream_map = 1;
transport_global->concurrent_stream_count++; transport_global->concurrent_stream_count++;
grpc_chttp2_list_add_writable_stream(transport_global, stream_global); grpc_chttp2_become_writable(transport_global, stream_global);
} }
/* cancel out streams that will never be started */ /* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@ -821,7 +828,7 @@ static void perform_stream_op_locked(
maybe_start_some_streams(exec_ctx, transport_global); maybe_start_some_streams(exec_ctx, transport_global);
} else { } else {
GPR_ASSERT(stream_global->id != 0); GPR_ASSERT(stream_global->id != 0);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global); grpc_chttp2_become_writable(transport_global, stream_global);
} }
} else { } else {
grpc_chttp2_complete_closure_step( grpc_chttp2_complete_closure_step(
@ -838,7 +845,7 @@ static void perform_stream_op_locked(
exec_ctx, &stream_global->send_message_finished, 0); exec_ctx, &stream_global->send_message_finished, 0);
} else if (stream_global->id != 0) { } else if (stream_global->id != 0) {
stream_global->send_message = op->send_message; stream_global->send_message = op->send_message;
grpc_chttp2_list_add_writable_stream(transport_global, stream_global); grpc_chttp2_become_writable(transport_global, stream_global);
} }
} }
@ -858,7 +865,7 @@ static void perform_stream_op_locked(
} else if (stream_global->id != 0) { } else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding /* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */ bytes before going writable */
grpc_chttp2_list_add_writable_stream(transport_global, stream_global); grpc_chttp2_become_writable(transport_global, stream_global);
} }
} }
@ -999,7 +1006,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
} }
} }
/* /*******************************************************************************
* INPUT PROCESSING * INPUT PROCESSING
*/ */
@ -1064,7 +1071,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (!s) { if (!s) {
s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id); s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id);
} }
grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
GPR_ASSERT(s); GPR_ASSERT(s);
s->global.in_stream_map = 0; s->global.in_stream_map = 0;
if (t->parsing.incoming_stream == &s->parsing) { if (t->parsing.incoming_stream == &s->parsing) {
@ -1080,6 +1086,9 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(exec_ctx, t); close_transport_locked(exec_ctx, t);
} }
if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing");
}
new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
grpc_chttp2_stream_map_size(&t->new_stream_map); grpc_chttp2_stream_map_size(&t->new_stream_map);
@ -1331,7 +1340,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
is_zero = stream_global->outgoing_window <= 0; is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) { if (was_zero && !is_zero) {
grpc_chttp2_list_add_writable_stream(transport_global, stream_global); grpc_chttp2_become_writable(transport_global, stream_global);
} }
} }
@ -1426,7 +1435,7 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
GPR_TIMER_END("recv_data", 0); GPR_TIMER_END("recv_data", 0);
} }
/* /*******************************************************************************
* CALLBACK LOOP * CALLBACK LOOP
*/ */
@ -1440,7 +1449,7 @@ static void connectivity_state_set(
state, reason); state, reason);
} }
/* /*******************************************************************************
* POLLSET STUFF * POLLSET STUFF
*/ */
@ -1468,7 +1477,7 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
unlock(exec_ctx, t); unlock(exec_ctx, t);
} }
/* /*******************************************************************************
* BYTE STREAM * BYTE STREAM
*/ */
@ -1508,7 +1517,7 @@ static void incoming_byte_stream_update_flow_control(
add_max_recv_bytes); add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global); stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global); grpc_chttp2_become_writable(transport_global, stream_global);
} }
} }
@ -1623,7 +1632,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
return incoming_byte_stream; return incoming_byte_stream;
} }
/* /*******************************************************************************
* TRACING * TRACING
*/ */
@ -1709,7 +1718,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
gpr_free(prefix); gpr_free(prefix);
} }
/* /*******************************************************************************
* INTEGRATION GLUE * INTEGRATION GLUE
*/ */

@ -43,11 +43,13 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include "src/core/profiling/timers.h" #include "src/core/profiling/timers.h"
#include "src/core/support/murmur_hash.h" #include "src/core/support/murmur_hash.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
#include "src/core/transport/chttp2/bin_encoder.h" #include "src/core/transport/chttp2/bin_encoder.h"
#include "src/core/transport/static_metadata.h" #include "src/core/transport/static_metadata.h"
#include "src/core/iomgr/iomgr_internal.h"
/* There are two kinds of mdelem and mdstr instances. /* There are two kinds of mdelem and mdstr instances.
* Static instances are declared in static_metadata.{h,c} and * Static instances are declared in static_metadata.{h,c} and
@ -227,6 +229,9 @@ void grpc_mdctx_global_shutdown(void) {
if (shard->count != 0) { if (shard->count != 0) {
gpr_log(GPR_DEBUG, "WARNING: %d metadata elements were leaked", gpr_log(GPR_DEBUG, "WARNING: %d metadata elements were leaked",
shard->count); shard->count);
if (grpc_iomgr_abort_on_leaks()) {
abort();
}
} }
gpr_free(shard->elems); gpr_free(shard->elems);
} }
@ -237,6 +242,9 @@ void grpc_mdctx_global_shutdown(void) {
if (shard->count != 0) { if (shard->count != 0) {
gpr_log(GPR_DEBUG, "WARNING: %d metadata strings were leaked", gpr_log(GPR_DEBUG, "WARNING: %d metadata strings were leaked",
shard->count); shard->count);
if (grpc_iomgr_abort_on_leaks()) {
abort();
}
} }
gpr_free(shard->strs); gpr_free(shard->strs);
} }

@ -45,7 +45,7 @@ void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
#else #else
void grpc_stream_ref(grpc_stream_refcount *refcount) { void grpc_stream_ref(grpc_stream_refcount *refcount) {
#endif #endif
gpr_ref(&refcount->refs); gpr_ref_non_zero(&refcount->refs);
} }
#ifdef GRPC_STREAM_REFCOUNT_DEBUG #ifdef GRPC_STREAM_REFCOUNT_DEBUG

@ -220,6 +220,7 @@ gpr_event_get_type gpr_event_get_import;
gpr_event_wait_type gpr_event_wait_import; gpr_event_wait_type gpr_event_wait_import;
gpr_ref_init_type gpr_ref_init_import; gpr_ref_init_type gpr_ref_init_import;
gpr_ref_type gpr_ref_import; gpr_ref_type gpr_ref_import;
gpr_ref_non_zero_type gpr_ref_non_zero_import;
gpr_refn_type gpr_refn_import; gpr_refn_type gpr_refn_import;
gpr_unref_type gpr_unref_import; gpr_unref_type gpr_unref_import;
gpr_stats_init_type gpr_stats_init_import; gpr_stats_init_type gpr_stats_init_import;
@ -485,6 +486,7 @@ void pygrpc_load_imports(HMODULE library) {
gpr_event_wait_import = (gpr_event_wait_type) GetProcAddress(library, "gpr_event_wait"); gpr_event_wait_import = (gpr_event_wait_type) GetProcAddress(library, "gpr_event_wait");
gpr_ref_init_import = (gpr_ref_init_type) GetProcAddress(library, "gpr_ref_init"); gpr_ref_init_import = (gpr_ref_init_type) GetProcAddress(library, "gpr_ref_init");
gpr_ref_import = (gpr_ref_type) GetProcAddress(library, "gpr_ref"); gpr_ref_import = (gpr_ref_type) GetProcAddress(library, "gpr_ref");
gpr_ref_non_zero_import = (gpr_ref_non_zero_type) GetProcAddress(library, "gpr_ref_non_zero");
gpr_refn_import = (gpr_refn_type) GetProcAddress(library, "gpr_refn"); gpr_refn_import = (gpr_refn_type) GetProcAddress(library, "gpr_refn");
gpr_unref_import = (gpr_unref_type) GetProcAddress(library, "gpr_unref"); gpr_unref_import = (gpr_unref_type) GetProcAddress(library, "gpr_unref");
gpr_stats_init_import = (gpr_stats_init_type) GetProcAddress(library, "gpr_stats_init"); gpr_stats_init_import = (gpr_stats_init_type) GetProcAddress(library, "gpr_stats_init");

@ -610,6 +610,9 @@ extern gpr_ref_init_type gpr_ref_init_import;
typedef void(*gpr_ref_type)(gpr_refcount *r); typedef void(*gpr_ref_type)(gpr_refcount *r);
extern gpr_ref_type gpr_ref_import; extern gpr_ref_type gpr_ref_import;
#define gpr_ref gpr_ref_import #define gpr_ref gpr_ref_import
typedef void(*gpr_ref_non_zero_type)(gpr_refcount *r);
extern gpr_ref_non_zero_type gpr_ref_non_zero_import;
#define gpr_ref_non_zero gpr_ref_non_zero_import
typedef void(*gpr_refn_type)(gpr_refcount *r, int n); typedef void(*gpr_refn_type)(gpr_refcount *r, int n);
extern gpr_refn_type gpr_refn_import; extern gpr_refn_type gpr_refn_import;
#define gpr_refn gpr_refn_import #define gpr_refn gpr_refn_import

@ -220,6 +220,7 @@ gpr_event_get_type gpr_event_get_import;
gpr_event_wait_type gpr_event_wait_import; gpr_event_wait_type gpr_event_wait_import;
gpr_ref_init_type gpr_ref_init_import; gpr_ref_init_type gpr_ref_init_import;
gpr_ref_type gpr_ref_import; gpr_ref_type gpr_ref_import;
gpr_ref_non_zero_type gpr_ref_non_zero_import;
gpr_refn_type gpr_refn_import; gpr_refn_type gpr_refn_import;
gpr_unref_type gpr_unref_import; gpr_unref_type gpr_unref_import;
gpr_stats_init_type gpr_stats_init_import; gpr_stats_init_type gpr_stats_init_import;
@ -481,6 +482,7 @@ void grpc_rb_load_imports(HMODULE library) {
gpr_event_wait_import = (gpr_event_wait_type) GetProcAddress(library, "gpr_event_wait"); gpr_event_wait_import = (gpr_event_wait_type) GetProcAddress(library, "gpr_event_wait");
gpr_ref_init_import = (gpr_ref_init_type) GetProcAddress(library, "gpr_ref_init"); gpr_ref_init_import = (gpr_ref_init_type) GetProcAddress(library, "gpr_ref_init");
gpr_ref_import = (gpr_ref_type) GetProcAddress(library, "gpr_ref"); gpr_ref_import = (gpr_ref_type) GetProcAddress(library, "gpr_ref");
gpr_ref_non_zero_import = (gpr_ref_non_zero_type) GetProcAddress(library, "gpr_ref_non_zero");
gpr_refn_import = (gpr_refn_type) GetProcAddress(library, "gpr_refn"); gpr_refn_import = (gpr_refn_type) GetProcAddress(library, "gpr_refn");
gpr_unref_import = (gpr_unref_type) GetProcAddress(library, "gpr_unref"); gpr_unref_import = (gpr_unref_type) GetProcAddress(library, "gpr_unref");
gpr_stats_init_import = (gpr_stats_init_type) GetProcAddress(library, "gpr_stats_init"); gpr_stats_init_import = (gpr_stats_init_type) GetProcAddress(library, "gpr_stats_init");

@ -610,6 +610,9 @@ extern gpr_ref_init_type gpr_ref_init_import;
typedef void(*gpr_ref_type)(gpr_refcount *r); typedef void(*gpr_ref_type)(gpr_refcount *r);
extern gpr_ref_type gpr_ref_import; extern gpr_ref_type gpr_ref_import;
#define gpr_ref gpr_ref_import #define gpr_ref gpr_ref_import
typedef void(*gpr_ref_non_zero_type)(gpr_refcount *r);
extern gpr_ref_non_zero_type gpr_ref_non_zero_import;
#define gpr_ref_non_zero gpr_ref_non_zero_import
typedef void(*gpr_refn_type)(gpr_refcount *r, int n); typedef void(*gpr_refn_type)(gpr_refcount *r, int n);
extern gpr_refn_type gpr_refn_import; extern gpr_refn_type gpr_refn_import;
#define gpr_refn gpr_refn_import #define gpr_refn gpr_refn_import

@ -1,6 +1,6 @@
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without

Loading…
Cancel
Save