diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index 7795606e737..8ee7e29316c 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -58,8 +58,8 @@ void grpc_chttp2_server_handshaker_factory_create_handshakers( grpc_chttp2_server_handshaker_factory *handshaker_factory, grpc_handshake_manager *handshake_mgr) { if (handshaker_factory != NULL) { - handshaker_factory->vtable->create_handshakers( - exec_ctx, handshaker_factory, handshake_mgr); + handshaker_factory->vtable->create_handshakers(exec_ctx, handshaker_factory, + handshake_mgr); } } @@ -71,7 +71,6 @@ void grpc_chttp2_server_handshaker_factory_destroy( } } - typedef struct pending_handshake_manager_node { grpc_handshake_manager *handshake_mgr; struct pending_handshake_manager_node *next; @@ -196,9 +195,9 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, // args instead of hard-coding it. const gpr_timespec deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); - grpc_handshake_manager_do_handshake( - exec_ctx, connection_state->handshake_mgr, tcp, state->args, deadline, - acceptor, on_handshake_done, connection_state); + grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr, + tcp, state->args, deadline, acceptor, + on_handshake_done, connection_state); } /* Server callback: start listening on our ports */ @@ -275,9 +274,8 @@ grpc_error *grpc_chttp2_server_add_port( memset(state, 0, sizeof(*state)); grpc_closure_init(&state->tcp_server_shutdown_complete, tcp_server_shutdown_complete, state); - err = - grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete, - args, &tcp_server); + err = grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete, + args, &tcp_server); if (err != GRPC_ERROR_NONE) { goto error; } diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h index b1ff04bcbb8..30733992676 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.h +++ b/src/core/ext/transport/chttp2/server/chttp2_server.h @@ -73,7 +73,6 @@ void grpc_chttp2_server_handshaker_factory_destroy( grpc_error *grpc_chttp2_server_add_port( grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, grpc_channel_args *args, - grpc_chttp2_server_handshaker_factory *handshaker_factory, - int *port_num); + grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */ diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 366312bd728..7e286d4e46d 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -45,7 +45,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { int port_num = 0; GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2, (server, addr)); - grpc_error* err = grpc_chttp2_server_add_port( + grpc_error *err = grpc_chttp2_server_add_port( &exec_ctx, server, addr, grpc_channel_args_copy(grpc_server_get_channel_args(server)), NULL /* handshaker_factory */, &port_num); diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 5f417281321..85c21f0ca21 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -64,7 +64,7 @@ static void server_security_handshaker_factory_create_handshakers( } static void server_security_handshaker_factory_destroy( - grpc_exec_ctx* exec_ctx, grpc_chttp2_server_handshaker_factory *hf) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf) { server_security_handshaker_factory *handshaker_factory = (server_security_handshaker_factory *)hf; GRPC_SECURITY_CONNECTOR_UNREF(&handshaker_factory->security_connector->base, @@ -106,8 +106,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, goto done; } // Create handshaker factory. - server_security_handshaker_factory* handshaker_factory = - gpr_malloc(sizeof(*handshaker_factory)); + server_security_handshaker_factory *handshaker_factory = + gpr_malloc(sizeof(*handshaker_factory)); memset(handshaker_factory, 0, sizeof(*handshaker_factory)); handshaker_factory->base.vtable = &server_security_handshaker_factory_vtable; handshaker_factory->security_connector = sc; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 3e7c078d3ce..d4493009695 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -425,7 +425,6 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_writable_stream(t, &s)) { - grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); @@ -521,10 +520,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, } } - if (s->fail_pending_writes_on_writes_finished_error != NULL) { - GRPC_ERROR_UNREF(s->fail_pending_writes_on_writes_finished_error); - } - GPR_ASSERT(s->send_initial_metadata_finished == NULL); GPR_ASSERT(s->fetching_send_message == NULL); GPR_ASSERT(s->send_trailing_metadata_finished == NULL); @@ -826,6 +821,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, } #define CLOSURE_BARRIER_STATS_BIT (1 << 0) +#define CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE (1 << 1) #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) static grpc_closure *add_closure_barrier(grpc_closure *closure) { @@ -852,6 +848,16 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, return; } closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; + if (grpc_http_trace) { + const char *errstr = grpc_error_string(error); + gpr_log(GPR_DEBUG, + "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s", + closure, + (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT), + (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT), + desc, errstr); + grpc_error_free_string(errstr); + } if (error != GRPC_ERROR_NONE) { if (closure->error_data.error == GRPC_ERROR_NONE) { closure->error_data.error = @@ -868,7 +874,14 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, grpc_transport_move_stats(&s->stats, s->collecting_stats); s->collecting_stats = NULL; } - grpc_closure_run(exec_ctx, closure, closure->error_data.error); + if (t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE || + (closure->next_data.scratch & CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE) == + 0) { + grpc_closure_run(exec_ctx, closure, closure->error_data.error); + } else { + grpc_closure_list_append(&t->run_after_write, closure, + closure->error_data.error); + } } } @@ -1013,6 +1026,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (op->send_initial_metadata != NULL) { GPR_ASSERT(s->send_initial_metadata_finished == NULL); + on_complete->next_data.scratch |= CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE; s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata = op->send_initial_metadata; const size_t metadata_size = @@ -1066,6 +1080,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->send_message != NULL) { + on_complete->next_data.scratch |= CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { grpc_chttp2_complete_closure_step( @@ -1103,6 +1118,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (op->send_trailing_metadata != NULL) { GPR_ASSERT(s->send_trailing_metadata_finished == NULL); + on_complete->next_data.scratch |= CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE; s->send_trailing_metadata_finished = add_closure_barrier(on_complete); s->send_trailing_metadata = op->send_trailing_metadata; const size_t metadata_size = @@ -1406,7 +1422,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } if (grpc_chttp2_list_remove_writable_stream(t, s)) { - grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream"); } @@ -1537,41 +1552,9 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, return error; } -void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - if (s->need_fail_pending_writes_on_writes_finished) { - grpc_error *error = s->fail_pending_writes_on_writes_finished_error; - s->fail_pending_writes_on_writes_finished_error = NULL; - s->need_fail_pending_writes_on_writes_finished = false; - grpc_chttp2_fail_pending_writes(exec_ctx, t, s, error); - } -} - void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { - if (s->need_fail_pending_writes_on_writes_finished || - (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE && - (s->included[GRPC_CHTTP2_LIST_WRITABLE] || - s->included[GRPC_CHTTP2_LIST_WRITING]))) { - /* If a write is in progress, and it involves this stream, wait for the - * write to complete before cancelling things out. If we don't do this, then - * our combiner lock might think that some operation on its queue might be - * covering a completion even though there is none, in which case we might - * offload to another thread, which isn't guarateed to exist */ - if (error != GRPC_ERROR_NONE) { - if (s->fail_pending_writes_on_writes_finished_error == GRPC_ERROR_NONE) { - s->fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE( - "Post-poned fail writes due to in-progress write"); - } - s->fail_pending_writes_on_writes_finished_error = grpc_error_add_child( - s->fail_pending_writes_on_writes_finished_error, error); - } - s->need_fail_pending_writes_on_writes_finished = true; - return; /* early out */ - } - error = removal_error(error, s, "Pending writes failed due to stream closure"); s->send_initial_metadata = NULL; diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6cba1e7fd20..31eb1e01ac0 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -327,6 +327,9 @@ struct grpc_chttp2_transport { */ grpc_error *close_transport_on_writes_finished; + /* a list of closures to run after writes are finished */ + grpc_closure_list run_after_write; + /* buffer pool state */ /** have we scheduled a benign cleanup? */ bool benign_reclaimer_registered; @@ -409,9 +412,6 @@ struct grpc_chttp2_stream { grpc_error *read_closed_error; /** the error that resulted in this stream being write-closed */ grpc_error *write_closed_error; - /** should any writes be cleared once this stream becomes non-writable */ - bool need_fail_pending_writes_on_writes_finished; - grpc_error *fail_pending_writes_on_writes_finished_error; grpc_published_metadata_method published_metadata[2]; bool final_metadata_requested; @@ -692,9 +692,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s); -void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s); void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 769b229a0da..ddd75ee574f 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -208,7 +208,6 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing"); } } else { - grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write"); } } @@ -253,9 +252,9 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1, GRPC_ERROR_REF(error)); } - grpc_chttp2_leave_writing_lists(exec_ctx, t, s); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end"); } + grpc_exec_ctx_enqueue_list(exec_ctx, &t->run_after_write, NULL); grpc_slice_buffer_reset_and_unref(&t->outbuf); GRPC_ERROR_UNREF(error); GPR_TIMER_END("grpc_chttp2_end_write", 0); diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 16392020f46..cc1840d36b2 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -231,6 +231,7 @@ static void rulist_remove(grpc_resource_user *resource_user, grpc_rulist list) { resource_user->links[list].prev; resource_user->links[list].prev->links[list].next = resource_user->links[list].next; + resource_user->links[list].next = resource_user->links[list].prev = NULL; } /******************************************************************************* @@ -703,6 +704,8 @@ static void ru_unref_by(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, gpr_atm amount) { GPR_ASSERT(amount > 0); gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount); + gpr_log(GPR_DEBUG, "%p unref_by %d, old=%d", resource_user, (int)amount, + (int)old); GPR_ASSERT(old >= amount); if (old == amount) { grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,