Simplifications

pull/7793/head
Craig Tiller 8 years ago
parent 8e21465a76
commit 13e4bf8e6a
  1. 4
      src/core/ext/client_config/subchannel.c
  2. 51
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  3. 15
      src/core/ext/transport/chttp2/transport/internal.h
  4. 20
      src/core/ext/transport/chttp2/transport/stream_lists.c
  5. 2
      src/core/lib/iomgr/error.c
  6. 10
      src/core/lib/transport/transport.c
  7. 2
      src/core/lib/transport/transport.h

@ -219,8 +219,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
: gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG #ifdef GRPC_STREAM_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, "SUBCHANNEL: %p %s 0x%08" PRIxPTR " -> 0x%08" PRIxPTR " [%s]", c,
old_val + delta, reason); purpose, old_val, old_val + delta, reason);
#endif #endif
return old_val; return old_val;
} }

@ -501,7 +501,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
REF_TRANSPORT(t, "stream"); REF_TRANSPORT(t, "stream");
if (server_data) { if (server_data) {
GPR_ASSERT(t->executor.parsing_active);
s->global.id = (uint32_t)(uintptr_t)server_data; s->global.id = (uint32_t)(uintptr_t)server_data;
s->global.outgoing_window = s->global.outgoing_window =
t->global.settings[GRPC_PEER_SETTINGS] t->global.settings[GRPC_PEER_SETTINGS]
@ -540,7 +539,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
exec_ctx, t, exec_ctx, t,
GRPC_ERROR_CREATE("Last stream closed after sending goaway")); GRPC_ERROR_CREATE("Last stream closed after sending goaway"));
} }
if (!t->executor.parsing_active && s->global.id) { if (s->global.id != 0) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
s->global.id) == NULL); s->global.id) == NULL);
} }
@ -1246,15 +1245,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = op->transport_private.args[0]; grpc_chttp2_transport *t = op->transport_private.args[0];
grpc_error *close_transport = op->disconnect_with_error; grpc_error *close_transport = op->disconnect_with_error;
/* If there's a set_accept_stream ensure that we're not parsing
to avoid changing things out from underneath */
if (t->executor.parsing_active && op->set_accept_stream) {
GPR_ASSERT(t->post_parsing_op == NULL);
t->post_parsing_op = gpr_malloc(sizeof(*op));
memcpy(t->post_parsing_op, op, sizeof(*op));
return;
}
if (op->on_connectivity_state_change != NULL) { if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_notify_on_state_change(
exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
@ -1627,18 +1617,12 @@ void grpc_chttp2_mark_stream_closed(
} }
} }
if (stream_global->read_closed && stream_global->write_closed) { if (stream_global->read_closed && stream_global->write_closed) {
if (stream_global->id != 0 && if (stream_global->id != 0) {
TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, stream_global->id,
stream_global); removal_error(GRPC_ERROR_REF(error), stream_global));
} else {
if (stream_global->id != 0) {
remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
stream_global->id,
removal_error(GRPC_ERROR_REF(error), stream_global));
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
} }
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
@ -1874,9 +1858,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_ERROR_REF(error); GRPC_ERROR_REF(error);
GPR_ASSERT(!t->executor.parsing_active);
if (!t->closed) { if (!t->closed) {
t->executor.parsing_active = 1;
/* merge stream lists */ /* merge stream lists */
grpc_chttp2_stream_map_move_into(&t->new_stream_map, grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map); &t->parsing_stream_map);
@ -1919,27 +1901,6 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_initiate_write(exec_ctx, transport_global, false, grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
"global incoming window"); "global incoming window");
} }
t->executor.parsing_active = 0;
/* handle delayed transport ops (if there is one) */
if (t->post_parsing_op) {
grpc_transport_op *op = t->post_parsing_op;
t->post_parsing_op = NULL;
perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE);
gpr_free(op);
}
/* if a stream is in the stream map, and gets cancelled, we need to
* ensure we are not parsing before continuing the cancellation to keep
* things in a sane state */
grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
&stream_global)) {
GPR_ASSERT(stream_global->in_stream_map);
GPR_ASSERT(stream_global->write_closed);
GPR_ASSERT(stream_global->read_closed);
remove_stream(exec_ctx, t, stream_global->id,
removal_error(GRPC_ERROR_NONE, stream_global));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
GPR_TIMER_END("post_parse_locked", 0); GPR_TIMER_END("post_parse_locked", 0);
} }

@ -64,7 +64,6 @@ typedef enum {
GRPC_CHTTP2_LIST_WRITABLE, GRPC_CHTTP2_LIST_WRITABLE,
GRPC_CHTTP2_LIST_WRITING, GRPC_CHTTP2_LIST_WRITING,
GRPC_CHTTP2_LIST_WRITTEN, GRPC_CHTTP2_LIST_WRITTEN,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING,
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
/* streams waiting for the outgoing window in the writing path, they will be /* streams waiting for the outgoing window in the writing path, they will be
@ -308,10 +307,6 @@ struct grpc_chttp2_transport {
struct { struct {
grpc_combiner *combiner; grpc_combiner *combiner;
/** is a thread currently in the global lock */
bool global_active;
/** is a thread currently parsing */
bool parsing_active;
/** write execution state of the transport */ /** write execution state of the transport */
grpc_chttp2_write_state write_state; grpc_chttp2_write_state write_state;
/** has a check_read_ops been scheduled */ /** has a check_read_ops been scheduled */
@ -374,9 +369,6 @@ struct grpc_chttp2_transport {
/** connectivity tracking */ /** connectivity tracking */
grpc_connectivity_state_tracker state_tracker; grpc_connectivity_state_tracker state_tracker;
} channel_callback; } channel_callback;
/** Transport op to be applied post-parsing */
grpc_transport_op *post_parsing_op;
}; };
struct grpc_chttp2_stream_global { struct grpc_chttp2_stream_global {
@ -602,13 +594,6 @@ void grpc_chttp2_list_remove_stalled_by_transport(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global); grpc_chttp2_stream_global *stream_global);
void grpc_chttp2_list_add_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_closed_waiting_for_writing( void grpc_chttp2_list_add_closed_waiting_for_writing(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global); grpc_chttp2_stream_global *stream_global);

@ -334,26 +334,6 @@ void grpc_chttp2_list_remove_stalled_by_transport(
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
} }
void grpc_chttp2_list_add_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
}
int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}
void grpc_chttp2_list_add_closed_waiting_for_writing( void grpc_chttp2_list_add_closed_waiting_for_writing(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) { grpc_chttp2_stream_global *stream_global) {

@ -265,7 +265,7 @@ static grpc_error *copy_error_and_unref(grpc_error *in) {
} else { } else {
out = gpr_malloc(sizeof(*out)); out = gpr_malloc(sizeof(*out));
#ifdef GRPC_ERROR_REFCOUNT_DEBUG #ifdef GRPC_ERROR_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "%p create copying", out); gpr_log(GPR_DEBUG, "%p create copying %p", out, in);
#endif #endif
out->ints = gpr_avl_ref(in->ints); out->ints = gpr_avl_ref(in->ints);
out->strs = gpr_avl_ref(in->strs); out->strs = gpr_avl_ref(in->strs);

@ -46,8 +46,9 @@
#ifdef GRPC_STREAM_REFCOUNT_DEBUG #ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
gpr_log(GPR_DEBUG, "%s %p:%p REF %d->%d %s", refcount->object_type, gpr_log(GPR_DEBUG, "%s %p:%p REF %" PRIdPTR "->%" PRIdPTR " %s",
refcount, refcount->destroy.cb_arg, val, val + 1, reason); refcount->object_type, refcount, refcount->destroy.cb_arg, val,
val + 1, reason);
#else #else
void grpc_stream_ref(grpc_stream_refcount *refcount) { void grpc_stream_ref(grpc_stream_refcount *refcount) {
#endif #endif
@ -58,8 +59,9 @@ void grpc_stream_ref(grpc_stream_refcount *refcount) {
void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
const char *reason) { const char *reason) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
gpr_log(GPR_DEBUG, "%s %p:%p UNREF %d->%d %s", refcount->object_type, gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s",
refcount, refcount->destroy.cb_arg, val, val - 1, reason); refcount->object_type, refcount, refcount->destroy.cb_arg, val,
val - 1, reason);
#else #else
void grpc_stream_unref(grpc_exec_ctx *exec_ctx, void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) { grpc_stream_refcount *refcount) {

@ -55,7 +55,7 @@ typedef struct grpc_transport grpc_transport;
for a stream. */ for a stream. */
typedef struct grpc_stream grpc_stream; typedef struct grpc_stream grpc_stream;
//#define GRPC_STREAM_REFCOUNT_DEBUG #define GRPC_STREAM_REFCOUNT_DEBUG
typedef struct grpc_stream_refcount { typedef struct grpc_stream_refcount {
gpr_refcount refs; gpr_refcount refs;

Loading…
Cancel
Save