|
|
@ -929,6 +929,10 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, |
|
|
|
grpc_chttp2_initiate_write_reason_string(reason)); |
|
|
|
grpc_chttp2_initiate_write_reason_string(reason)); |
|
|
|
t->is_first_write_in_batch = true; |
|
|
|
t->is_first_write_in_batch = true; |
|
|
|
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); |
|
|
|
GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); |
|
|
|
|
|
|
|
// TODO(yashykt) : When we were using combiners, we were using the finally
|
|
|
|
|
|
|
|
// version, so that the write action would happen after we were done
|
|
|
|
|
|
|
|
// queueing up all the writes that we wanted. Maybe do something similar?
|
|
|
|
|
|
|
|
// Keeping the earlier comment for posterity -
|
|
|
|
/* Note that the 'write_action_begin_locked' closure is being scheduled
|
|
|
|
/* Note that the 'write_action_begin_locked' closure is being scheduled
|
|
|
|
* on the 'finally_scheduler' of t->combiner. This means that |
|
|
|
* on the 'finally_scheduler' of t->combiner. This means that |
|
|
|
* 'write_action_begin_locked' is called only *after* all the other |
|
|
|
* 'write_action_begin_locked' is called only *after* all the other |
|
|
@ -1119,6 +1123,9 @@ static void write_action_end_locked(void* tp, grpc_error* error) { |
|
|
|
if (!closed) { |
|
|
|
if (!closed) { |
|
|
|
GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); |
|
|
|
GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// TODO(yashykt) : When we were using combiners, we were using the finally
|
|
|
|
|
|
|
|
// version, so that the write action would happen after we were done
|
|
|
|
|
|
|
|
// queueing up all the writes that we wanted. Maybe do something similar?
|
|
|
|
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&t->write_action_begin_locked, |
|
|
|
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&t->write_action_begin_locked, |
|
|
|
write_action_begin_locked, t, |
|
|
|
write_action_begin_locked, t, |
|
|
|
grpc_schedule_on_exec_ctx), |
|
|
|
grpc_schedule_on_exec_ctx), |
|
|
@ -1755,6 +1762,7 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { |
|
|
|
if (j == GRPC_CHTTP2_PCL_INITIATE) { |
|
|
|
if (j == GRPC_CHTTP2_PCL_INITIATE) { |
|
|
|
GRPC_CLOSURE_LIST_RUN(&pq->lists[j]); |
|
|
|
GRPC_CLOSURE_LIST_RUN(&pq->lists[j]); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
|
|
|
|
// TODO(yashykt) : Use GRPC_CLOSURE_LIST_RUN for this too.
|
|
|
|
GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); |
|
|
|
GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -1784,6 +1792,7 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { |
|
|
|
if (t->closed_with_error != GRPC_ERROR_NONE) { |
|
|
|
if (t->closed_with_error != GRPC_ERROR_NONE) { |
|
|
|
GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, |
|
|
|
GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, |
|
|
|
GRPC_ERROR_REF(t->closed_with_error)); |
|
|
|
GRPC_ERROR_REF(t->closed_with_error)); |
|
|
|
|
|
|
|
// TODO(yashykt) : Change this to GRPC_CLOSURE_RUN too
|
|
|
|
GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, |
|
|
|
GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, |
|
|
|
GRPC_ERROR_REF(t->closed_with_error)); |
|
|
|
GRPC_ERROR_REF(t->closed_with_error)); |
|
|
|
return; |
|
|
|
return; |
|
|
@ -1821,6 +1830,7 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { |
|
|
|
gpr_free(from); |
|
|
|
gpr_free(from); |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// TODO(yashkt) : Change this to GRPC_CLOSURE_LIST_RUN
|
|
|
|
GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); |
|
|
|
GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); |
|
|
|
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { |
|
|
|
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { |
|
|
|
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); |
|
|
|
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); |
|
|
@ -2667,8 +2677,8 @@ static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { |
|
|
|
|
|
|
|
|
|
|
|
static void start_bdp_ping_locked(void* tp, grpc_error* error) { |
|
|
|
static void start_bdp_ping_locked(void* tp, grpc_error* error) { |
|
|
|
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); |
|
|
|
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); |
|
|
|
// No need to take a lock. Closure scheduler will already have a lock.
|
|
|
|
// No need to take a lock. This closure will always be run while already
|
|
|
|
// gpr_mu_lock(&t->mu);
|
|
|
|
// holding the lock.
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { |
|
|
|
gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string, |
|
|
|
gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string, |
|
|
|
grpc_error_string(error)); |
|
|
|
grpc_error_string(error)); |
|
|
@ -2681,7 +2691,6 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { |
|
|
|
grpc_timer_cancel(&t->keepalive_ping_timer); |
|
|
|
grpc_timer_cancel(&t->keepalive_ping_timer); |
|
|
|
} |
|
|
|
} |
|
|
|
t->flow_control->bdp_estimator()->StartPing(); |
|
|
|
t->flow_control->bdp_estimator()->StartPing(); |
|
|
|
// gpr_mu_unlock(&t->mu);
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void finish_bdp_ping_locked(void* tp, grpc_error* error) { |
|
|
|
static void finish_bdp_ping_locked(void* tp, grpc_error* error) { |
|
|
@ -2823,8 +2832,8 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { |
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
// No need to take a lock.
|
|
|
|
// No need to take a lock. This closure will always be run while already
|
|
|
|
// gpr_mu_lock(&t->mu);
|
|
|
|
// holding the lock.
|
|
|
|
if (t->channelz_socket != nullptr) { |
|
|
|
if (t->channelz_socket != nullptr) { |
|
|
|
t->channelz_socket->RecordKeepaliveSent(); |
|
|
|
t->channelz_socket->RecordKeepaliveSent(); |
|
|
|
} |
|
|
|
} |
|
|
@ -2835,7 +2844,6 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { |
|
|
|
grpc_timer_init(&t->keepalive_watchdog_timer, |
|
|
|
grpc_timer_init(&t->keepalive_watchdog_timer, |
|
|
|
grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, |
|
|
|
grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, |
|
|
|
&t->keepalive_watchdog_fired_locked); |
|
|
|
&t->keepalive_watchdog_fired_locked); |
|
|
|
// gpr_mu_unlock(&t->mu);
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { |
|
|
|
static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { |
|
|
@ -2950,8 +2958,7 @@ Chttp2IncomingByteStream::Chttp2IncomingByteStream( |
|
|
|
stream->byte_stream_error = GRPC_ERROR_NONE; |
|
|
|
stream->byte_stream_error = GRPC_ERROR_NONE; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void Chttp2IncomingByteStream::OrphanLocked(void* arg, |
|
|
|
void Chttp2IncomingByteStream::OrphanLocked(void* arg) { |
|
|
|
grpc_error* error_ignored) { |
|
|
|
|
|
|
|
Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg); |
|
|
|
Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg); |
|
|
|
grpc_chttp2_stream* s = bs->stream_; |
|
|
|
grpc_chttp2_stream* s = bs->stream_; |
|
|
|
grpc_chttp2_transport* t = s->t; |
|
|
|
grpc_chttp2_transport* t = s->t; |
|
|
@ -2965,11 +2972,7 @@ void Chttp2IncomingByteStream::OrphanLocked(void* arg, |
|
|
|
|
|
|
|
|
|
|
|
void Chttp2IncomingByteStream::Orphan() { |
|
|
|
void Chttp2IncomingByteStream::Orphan() { |
|
|
|
GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); |
|
|
|
GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); |
|
|
|
OrphanLocked(this, GRPC_ERROR_NONE); |
|
|
|
OrphanLocked(this); |
|
|
|
/*GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&destroy_action_,
|
|
|
|
|
|
|
|
&Chttp2IncomingByteStream::OrphanLocked, |
|
|
|
|
|
|
|
this, grpc_schedule_on_exec_ctx), |
|
|
|
|
|
|
|
GRPC_ERROR_NONE);*/ |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void Chttp2IncomingByteStream::NextLocked(void* arg, |
|
|
|
void Chttp2IncomingByteStream::NextLocked(void* arg, |
|
|
|