diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 91b6a25d213..af9e085ce85 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -929,6 +929,10 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, grpc_chttp2_initiate_write_reason_string(reason)); t->is_first_write_in_batch = true; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); + // TODO(yashykt) : When we were using combiners, we were using the finally + // version, so that the write action would happen after we were done + // queueing up all the writes that we wanted. Maybe do something similar? + // Keeping the earlier comment for posterity - /* Note that the 'write_action_begin_locked' closure is being scheduled * on the 'finally_scheduler' of t->combiner. This means that * 'write_action_begin_locked' is called only *after* all the other @@ -1119,6 +1123,9 @@ static void write_action_end_locked(void* tp, grpc_error* error) { if (!closed) { GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); } + // TODO(yashykt) : When we were using combiners, we were using the finally + // version, so that the write action would happen after we were done + // queueing up all the writes that we wanted. Maybe do something similar? GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&t->write_action_begin_locked, write_action_begin_locked, t, grpc_schedule_on_exec_ctx), @@ -1755,6 +1762,7 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { if (j == GRPC_CHTTP2_PCL_INITIATE) { GRPC_CLOSURE_LIST_RUN(&pq->lists[j]); } else { + // TODO(yashykt) : Use GRPC_CLOSURE_LIST_RUN for this too. GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); } } @@ -1784,6 +1792,7 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { if (t->closed_with_error != GRPC_ERROR_NONE) { GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_REF(t->closed_with_error)); + // TODO(yashykt) : Change this to GRPC_CLOSURE_RUN too GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, GRPC_ERROR_REF(t->closed_with_error)); return; @@ -1821,6 +1830,7 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { gpr_free(from); return; } + // TODO(yashkt) : Change this to GRPC_CLOSURE_LIST_RUN GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); @@ -2667,8 +2677,8 @@ static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { static void start_bdp_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - // No need to take a lock. Closure scheduler will already have a lock. - // gpr_mu_lock(&t->mu); + // No need to take a lock. This closure will always be run while already + // holding the lock. if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string, grpc_error_string(error)); @@ -2681,7 +2691,6 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { grpc_timer_cancel(&t->keepalive_ping_timer); } t->flow_control->bdp_estimator()->StartPing(); - // gpr_mu_unlock(&t->mu); } static void finish_bdp_ping_locked(void* tp, grpc_error* error) { @@ -2823,8 +2832,8 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE) { return; } - // No need to take a lock. - // gpr_mu_lock(&t->mu); + // No need to take a lock. This closure will always be run while already + // holding the lock. if (t->channelz_socket != nullptr) { t->channelz_socket->RecordKeepaliveSent(); } @@ -2835,7 +2844,6 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, &t->keepalive_watchdog_fired_locked); - // gpr_mu_unlock(&t->mu); } static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { @@ -2950,8 +2958,7 @@ Chttp2IncomingByteStream::Chttp2IncomingByteStream( stream->byte_stream_error = GRPC_ERROR_NONE; } -void Chttp2IncomingByteStream::OrphanLocked(void* arg, - grpc_error* error_ignored) { +void Chttp2IncomingByteStream::OrphanLocked(void* arg) { Chttp2IncomingByteStream* bs = static_cast(arg); grpc_chttp2_stream* s = bs->stream_; grpc_chttp2_transport* t = s->t; @@ -2965,11 +2972,7 @@ void Chttp2IncomingByteStream::OrphanLocked(void* arg, void Chttp2IncomingByteStream::Orphan() { GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - OrphanLocked(this, GRPC_ERROR_NONE); - /*GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&destroy_action_, - &Chttp2IncomingByteStream::OrphanLocked, - this, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE);*/ + OrphanLocked(this); } void Chttp2IncomingByteStream::NextLocked(void* arg, diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index ab9f9e2c9eb..fe6a689ac29 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1742,6 +1742,8 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser, the stream. Wait until the combiner lock is ready to be released however -- it might be that we receive a RST_STREAM following this and can avoid the extra write */ + // TODO(yashykt) : When we were using combiners, we were using the + // finally version. Maybe do something similar? GRPC_CHTTP2_STREAM_REF(s, "final_rst"); GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(force_client_rst_stream, s, grpc_schedule_on_exec_ctx), diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 784fcf42d31..6f301ad670c 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -254,7 +254,7 @@ class Chttp2IncomingByteStream : public ByteStream { private: static void NextLocked(void* arg, grpc_error* error_ignored); - static void OrphanLocked(void* arg, grpc_error* error_ignored); + static void OrphanLocked(void* arg); void MaybeCreateStreamDecompressionCtx();