Revert "Chttp2combiner"

pull/20435/head
Jan Tattermusch authored 5 years ago, committed by GitHub
parent 7eb9262936
commit c5b5840707
Changed files (6):

  1. src/core/ext/transport/chttp2/transport/chttp2_transport.cc (401 changed lines)
  2. src/core/ext/transport/chttp2/transport/hpack_parser.cc (13 changed lines)
  3. src/core/ext/transport/chttp2/transport/internal.h (31 changed lines)
  4. src/core/ext/transport/chttp2/transport/writing.cc (4 changed lines)
  5. src/core/lib/iomgr/closure.h (39 changed lines)
  6. src/core/lib/iomgr/executor.cc (4 changed lines)

--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -102,14 +102,14 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
                                                          "chttp2_refcount");
 /* forward declarations of various callbacks that we'll build closures around */
-static void write_action_begin(void* t, grpc_error* error);
+static void write_action_begin_locked(void* t, grpc_error* error);
 static void write_action(void* t, grpc_error* error);
-static void write_action_end(void* t, grpc_error* error);
-static void read_action(void* t, grpc_error* error);
+static void write_action_end_locked(void* t, grpc_error* error);
+static void read_action_locked(void* t, grpc_error* error);
 static void continue_read_action_locked(grpc_chttp2_transport* t);
-static void complete_fetch(void* gs, grpc_error* error);
+static void complete_fetch_locked(void* gs, grpc_error* error);
 /** Set a transport level setting, and push it to our peer */
 static void queue_setting_update(grpc_chttp2_transport* t,
                                  grpc_chttp2_setting_id id, uint32_t value);
@@ -124,8 +124,8 @@ static void connectivity_state_set(grpc_chttp2_transport* t,
                                    grpc_connectivity_state state,
                                    const char* reason);
-static void benign_reclaimer(void* t, grpc_error* error);
-static void destructive_reclaimer(void* t, grpc_error* error);
+static void benign_reclaimer_locked(void* t, grpc_error* error);
+static void destructive_reclaimer_locked(void* t, grpc_error* error);
 static void post_benign_reclaimer(grpc_chttp2_transport* t);
 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
@@ -135,20 +135,20 @@ static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
 static void start_bdp_ping_locked(void* tp, grpc_error* error);
-static void finish_bdp_ping(void* tp, grpc_error* error);
-static void next_bdp_ping_timer_expired(void* tp, grpc_error* error);
+static void finish_bdp_ping_locked(void* tp, grpc_error* error);
+static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
 static void send_ping_locked(grpc_chttp2_transport* t,
                              grpc_closure* on_initiate,
                              grpc_closure* on_complete);
-static void retry_initiate_ping(void* tp, grpc_error* error);
+static void retry_initiate_ping_locked(void* tp, grpc_error* error);
 /** keepalive-relevant functions */
-static void init_keepalive_ping(void* arg, grpc_error* error);
+static void init_keepalive_ping_locked(void* arg, grpc_error* error);
 static void start_keepalive_ping_locked(void* arg, grpc_error* error);
-static void finish_keepalive_ping(void* arg, grpc_error* error);
-static void keepalive_watchdog_fired(void* arg, grpc_error* error);
+static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
+static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
 static void reset_byte_stream(void* arg, grpc_error* error);
@@ -197,6 +197,8 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
   grpc_chttp2_stream_map_destroy(&stream_map);
+  GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
   cancel_pings(this,
                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
@@ -390,27 +392,33 @@ static bool read_channel_args(grpc_chttp2_transport* t,
 }
 static void init_transport_closures(grpc_chttp2_transport* t) {
-  GRPC_CLOSURE_INIT(&t->read_action, read_action, t, grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&t->benign_reclaimer, benign_reclaimer, t,
-                    grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&t->destructive_reclaimer, destructive_reclaimer, t,
-                    grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&t->retry_initiate_ping, retry_initiate_ping, t,
-                    grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
+                    grpc_combiner_scheduler(t->combiner));
+  GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
+                    grpc_combiner_scheduler(t->combiner));
+  GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
+                    destructive_reclaimer_locked, t,
+                    grpc_combiner_scheduler(t->combiner));
+  GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
+                    t, grpc_combiner_scheduler(t->combiner));
   GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
-                    grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&t->finish_bdp_ping, finish_bdp_ping, t,
-                    grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired,
-                    next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&t->init_keepalive_ping, init_keepalive_ping, t,
-                    grpc_schedule_on_exec_ctx);
+                    grpc_combiner_scheduler(t->combiner));
+  GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
+                    grpc_combiner_scheduler(t->combiner));
+  GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
+                    next_bdp_ping_timer_expired_locked, t,
+                    grpc_combiner_scheduler(t->combiner));
+  GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
+                    t, grpc_combiner_scheduler(t->combiner));
   GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
-                    start_keepalive_ping_locked, t, grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&t->finish_keepalive_ping, finish_keepalive_ping, t,
-                    grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired, keepalive_watchdog_fired, t,
-                    grpc_schedule_on_exec_ctx);
+                    start_keepalive_ping_locked, t,
+                    grpc_combiner_scheduler(t->combiner));
+  GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
+                    finish_keepalive_ping_locked, t,
+                    grpc_combiner_scheduler(t->combiner));
+  GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
+                    keepalive_watchdog_fired_locked, t,
+                    grpc_combiner_scheduler(t->combiner));
 }
@@ -450,7 +458,7 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
     grpc_timer_init(&t->keepalive_ping_timer,
                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
-                    &t->init_keepalive_ping);
+                    &t->init_keepalive_ping_locked);
   } else {
     /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
        inflight keeaplive timers */
@@ -465,6 +473,7 @@ grpc_chttp2_transport::grpc_chttp2_transport(
       ep(ep),
       peer_string(grpc_endpoint_get_peer(ep)),
       resource_user(resource_user),
+      combiner(grpc_combiner_create()),
       state_tracker(is_client ? "client_transport" : "server_transport",
                     GRPC_CHANNEL_READY),
       is_client(is_client),
@@ -548,20 +557,24 @@ grpc_chttp2_transport::grpc_chttp2_transport(
   post_benign_reclaimer(this);
 }
-static void destroy_transport(grpc_transport* gt) {
-  grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
-  {
-    grpc_core::MutexLock lock(&t->mu);
+static void destroy_transport_locked(void* tp, grpc_error* error) {
+  grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
   t->destroying = 1;
   close_transport_locked(
       t, grpc_error_set_int(
              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
              GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
-  }
   // Must be the last line.
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
 }
+static void destroy_transport(grpc_transport* gt) {
+  grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
+  GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t,
+                                         grpc_combiner_scheduler(t->combiner)),
+                     GRPC_ERROR_NONE);
+}
 static void close_transport_locked(grpc_chttp2_transport* t,
                                    grpc_error* error) {
   end_all_the_calls(t, GRPC_ERROR_REF(error));
@@ -671,15 +684,13 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
   grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
   grpc_slice_buffer_init(&flow_controlled_buffer);
-  GRPC_CLOSURE_INIT(&complete_fetch, ::complete_fetch, this,
-                    grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this,
+                    grpc_combiner_scheduler(t->combiner));
   GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this,
-                    grpc_schedule_on_exec_ctx);
+                    grpc_combiner_scheduler(t->combiner));
 }
 grpc_chttp2_stream::~grpc_chttp2_stream() {
-  {
-    grpc_core::MutexLock lock(&t->mu);
   if (t->channelz_socket != nullptr) {
     if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
       t->channelz_socket->RecordStreamSucceeded();
@@ -695,8 +706,7 @@ grpc_chttp2_stream::~grpc_chttp2_stream() {
   grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
   grpc_slice_buffer_destroy_internal(&frame_storage);
-  if (stream_compression_method !=
-      GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
+  if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
     grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
   }
   if (stream_decompression_method !=
@@ -731,7 +741,7 @@ grpc_chttp2_stream::~grpc_chttp2_stream() {
   if (t->resource_user != nullptr) {
     grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
   }
-  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
   GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE);
 }
@@ -745,9 +755,16 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
   return 0;
 }
+static void destroy_stream_locked(void* sp, grpc_error* error) {
+  GPR_TIMER_SCOPE("destroy_stream", 0);
+  grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
+  s->~grpc_chttp2_stream();
+}
 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
                            grpc_closure* then_schedule_closure) {
   GPR_TIMER_SCOPE("destroy_stream", 0);
+  grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
   if (s->stream_compression_method !=
           GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
@@ -763,7 +780,10 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
   }
   s->destroy_stream_arg = then_schedule_closure;
-  s->~grpc_chttp2_stream();
+  GRPC_CLOSURE_SCHED(
+      GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
+                        grpc_combiner_scheduler(t->combiner)),
+      GRPC_ERROR_NONE);
 }
 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
@@ -908,29 +928,26 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
               grpc_chttp2_initiate_write_reason_string(reason));
       t->is_first_write_in_batch = true;
       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
-      // TODO(yashykt) : When we were using combiners, we were using the finally
-      // version, so that the write action would happen after we were done
-      // queueing up all the writes that we wanted. Maybe do something similar?
-      // Keeping the earlier comment for posterity -
-      /* Note that the 'write_action_begin' closure is being scheduled
+      /* Note that the 'write_action_begin_locked' closure is being scheduled
       * on the 'finally_scheduler' of t->combiner. This means that
-       * 'write_action_begin' is called only *after* all the other
+       * 'write_action_begin_locked' is called only *after* all the other
       * closures (some of which are potentially initiating more writes on the
       * transport) are executed on the t->combiner.
       *
       * The reason for scheduling on finally_scheduler is to make sure we batch
-       * as many writes as possible. 'write_action_begin' is the function
+       * as many writes as possible. 'write_action_begin_locked' is the function
       * that gathers all the relevant bytes (which are at various places in the
       * grpc_chttp2_transport structure) and append them to 'outbuf' field in
       * grpc_chttp2_transport thereby batching what would have been potentially
       * multiple write operations.
       *
-       * Also, 'write_action_begin' only gathers the bytes into outbuf.
+       * Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
       * It does not call the endpoint to write the bytes. That is done by the
-       * 'write_action' (which is scheduled by 'write_action_begin') */
+       * 'write_action' (which is scheduled by 'write_action_begin_locked') */
       GRPC_CLOSURE_SCHED(
-          GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t,
-                            grpc_schedule_on_exec_ctx),
+          GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
+                            write_action_begin_locked, t,
+                            grpc_combiner_finally_scheduler(t->combiner)),
          GRPC_ERROR_NONE);
      break;
    case GRPC_CHTTP2_WRITE_STATE_WRITING:
@@ -996,10 +1013,9 @@ static const char* begin_writing_desc(bool partial, bool inlined) {
   GPR_UNREACHABLE_CODE(return "bad state tuple");
 }
-static void write_action_begin(void* gt, grpc_error* error_ignored) {
-  GPR_TIMER_SCOPE("write_action_begin", 0);
+static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
+  GPR_TIMER_SCOPE("write_action_begin_locked", 0);
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
-  grpc_core::ReleasableMutexLock lock(&t->mu);
   GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
   grpc_chttp2_begin_write_result r;
   if (t->closed_with_error != GRPC_ERROR_NONE) {
@@ -1040,11 +1056,9 @@ static void write_action_begin(void* gt, grpc_error* error_ignored) {
       t->reading_paused_on_pending_induced_frames = false;
       continue_read_action_locked(t);
     }
-    lock.Unlock();
   } else {
     GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
     set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
-    lock.Unlock();
     GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
   }
 }
@@ -1054,19 +1068,19 @@ static void write_action(void* gt, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
   void* cl = t->cl;
   t->cl = nullptr;
-  grpc_endpoint_write(t->ep, &t->outbuf,
-                      GRPC_CLOSURE_INIT(&t->write_action_end, write_action_end,
-                                        t, grpc_schedule_on_exec_ctx),
+  grpc_endpoint_write(
+      t->ep, &t->outbuf,
+      GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
+                        grpc_combiner_scheduler(t->combiner)),
      cl);
 }
 /* Callback from the grpc_endpoint after bytes have been written by calling
  * sendmsg */
-static void write_action_end(void* tp, grpc_error* error) {
+static void write_action_end_locked(void* tp, grpc_error* error) {
   GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  {
-    grpc_core::MutexLock lock(&t->mu);
   bool closed = false;
   if (error != GRPC_ERROR_NONE) {
     close_transport_locked(t, GRPC_ERROR_REF(error));
@@ -1102,19 +1116,15 @@ static void write_action_end(void* tp, grpc_error* error) {
       if (!closed) {
         GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
       }
-      // TODO(yashykt) : When we were using combiners, we were using the
-      // finally version, so that the write action would happen after we were
-      // done queueing up all the writes that we wanted. Maybe do something
-      // similar?
-      GRPC_CLOSURE_SCHED(
-          GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t,
-                            grpc_schedule_on_exec_ctx),
+      GRPC_CLOSURE_RUN(
+          GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
+                            write_action_begin_locked, t,
+                            grpc_combiner_finally_scheduler(t->combiner)),
          GRPC_ERROR_NONE);
      break;
   }
   grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
-  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
 }
@@ -1360,7 +1370,8 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
       }
       s->fetching_send_message.reset();
       return; /* early out */
-    } else if (s->fetching_send_message->Next(UINT32_MAX, &s->complete_fetch)) {
+    } else if (s->fetching_send_message->Next(UINT32_MAX,
+                                              &s->complete_fetch_locked)) {
      grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
      if (error != GRPC_ERROR_NONE) {
        s->fetching_send_message.reset();
@@ -1372,10 +1383,9 @@
     }
   }
 }
-static void complete_fetch(void* gs, grpc_error* error) {
+static void complete_fetch_locked(void* gs, grpc_error* error) {
   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
   grpc_chttp2_transport* t = s->t;
-  grpc_core::MutexLock lock(&t->mu);
   if (error == GRPC_ERROR_NONE) {
     error = s->fetching_send_message->Pull(&s->fetching_slice);
     if (error == GRPC_ERROR_NONE) {
@@ -1402,40 +1412,24 @@ static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
   }
 }
-static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
-                              grpc_transport_stream_op_batch* op) {
-  GPR_TIMER_SCOPE("perform_stream_op", 0);
-  grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
-  grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
-  grpc_transport_stream_op_batch_payload* op_payload = op->payload;
-  if (!t->is_client) {
-    if (op->send_initial_metadata) {
-      grpc_millis deadline =
-          op_payload->send_initial_metadata.send_initial_metadata->deadline;
-      GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
-    }
-    if (op->send_trailing_metadata) {
-      grpc_millis deadline =
-          op_payload->send_trailing_metadata.send_trailing_metadata->deadline;
-      GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
-    }
-  }
-  if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
-    char* str = grpc_transport_stream_op_batch_string(op);
-    gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str);
-    gpr_free(str);
-  }
-  grpc_core::MutexLock lock(&t->mu);
+static void perform_stream_op_locked(void* stream_op,
+                                     grpc_error* error_ignored) {
+  GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
+  grpc_transport_stream_op_batch* op =
+      static_cast<grpc_transport_stream_op_batch*>(stream_op);
+  grpc_chttp2_stream* s =
+      static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
+  grpc_transport_stream_op_batch_payload* op_payload = op->payload;
+  grpc_chttp2_transport* t = s->t;
   GRPC_STATS_INC_HTTP2_OP_BATCHES();
-  s->context = op_payload->context;
+  s->context = op->payload->context;
   s->traced = op->is_traced;
   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
     char* str = grpc_transport_stream_op_batch_string(op);
-    gpr_log(GPR_INFO, "perform_stream_op: %s; on_complete = %p", str,
+    gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str,
            op->on_complete);
    gpr_free(str);
    if (op->send_initial_metadata) {
@@ -1708,6 +1702,41 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
     grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
                                       "op->on_complete");
   }
+  GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
+}
+static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
+                              grpc_transport_stream_op_batch* op) {
+  GPR_TIMER_SCOPE("perform_stream_op", 0);
+  grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
+  grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
+  if (!t->is_client) {
+    if (op->send_initial_metadata) {
+      grpc_millis deadline =
+          op->payload->send_initial_metadata.send_initial_metadata->deadline;
+      GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
+    }
+    if (op->send_trailing_metadata) {
+      grpc_millis deadline =
+          op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
+      GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
+    }
+  }
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+    char* str = grpc_transport_stream_op_batch_string(op);
+    gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str);
+    gpr_free(str);
+  }
+  GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
+  op->handler_private.extra_arg = gs;
+  GRPC_CLOSURE_SCHED(
+      GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked,
+                        op, grpc_combiner_scheduler(t->combiner)),
+      GRPC_ERROR_NONE);
 }
 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
@@ -1717,13 +1746,8 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
   GPR_ASSERT(error != GRPC_ERROR_NONE);
   for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
     grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
-    if (j == GRPC_CHTTP2_PCL_INITIATE) {
-      GRPC_CLOSURE_LIST_RUN(&pq->lists[j]);
-    } else {
-      // TODO(yashykt) : Use GRPC_CLOSURE_LIST_RUN for this too.
-      GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]);
-    }
+    GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]);
   }
   GRPC_ERROR_UNREF(error);
 }
@@ -1750,8 +1774,7 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
   if (t->closed_with_error != GRPC_ERROR_NONE) {
     GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked,
                      GRPC_ERROR_REF(t->closed_with_error));
-    // TODO(yashykt) : Change this to GRPC_CLOSURE_RUN too
-    GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping,
+    GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked,
                     GRPC_ERROR_REF(t->closed_with_error));
    return;
   }
@@ -1760,25 +1783,22 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
     /* There is a ping in flight. Add yourself to the inflight closure list. */
     GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
     grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
-                             &t->finish_keepalive_ping, GRPC_ERROR_NONE);
+                             &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
     return;
   }
   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE],
                            &t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
-                           &t->finish_keepalive_ping, GRPC_ERROR_NONE);
+                           &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
 }
-static void retry_initiate_ping(void* tp, grpc_error* error) {
+static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  {
-    grpc_core::MutexLock lock(&t->mu);
   t->ping_state.is_delayed_ping_timer_set = false;
   if (error == GRPC_ERROR_NONE) {
     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
   }
-  }
-  GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping");
+  GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
 }
 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
@@ -1789,7 +1809,6 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
     gpr_free(from);
     return;
   }
-  // TODO(yashkt) : Change this to GRPC_CLOSURE_LIST_RUN
   GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
@@ -1827,14 +1846,12 @@ void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
   }
 }
-static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
-  grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
-  if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
-    char* msg = grpc_transport_op_string(op);
-    gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg);
-    gpr_free(msg);
-  }
-  grpc_core::MutexLock lock(&t->mu);
+static void perform_transport_op_locked(void* stream_op,
+                                        grpc_error* error_ignored) {
+  grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
+  grpc_chttp2_transport* t =
+      static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
   if (op->goaway_error) {
     send_goaway(t, op->goaway_error);
   }
@@ -1869,7 +1886,24 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
     close_transport_locked(t, op->disconnect_with_error);
   }
-  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
+  GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE);
+  GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
+}
+static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
+  grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+    char* msg = grpc_transport_op_string(op);
+    gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg);
+    gpr_free(msg);
+  }
+  op->handler_private.extra_arg = gt;
+  GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
+  GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure,
+                                       perform_transport_op_locked, op,
+                                       grpc_combiner_scheduler(t->combiner)),
+                     GRPC_ERROR_NONE);
 }
 /*******************************************************************************
@@ -2510,11 +2544,11 @@ static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
   return error;
 }
-static void read_action(void* tp, grpc_error* error) {
+static void read_action_locked(void* tp, grpc_error* error) {
   GPR_TIMER_SCOPE("reading_action_locked", 0);
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  grpc_core::ReleasableMutexLock lock(&t->mu);
   GRPC_ERROR_REF(error);
   grpc_error* err = error;
@@ -2598,9 +2632,7 @@ static void read_action(void* tp, grpc_error* error) {
     } else {
       continue_read_action_locked(t);
     }
-    lock.Unlock();
   } else {
-    lock.Unlock();
    GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
   }
@@ -2609,7 +2641,7 @@ static void read_action(void* tp, grpc_error* error) {
 static void continue_read_action_locked(grpc_chttp2_transport* t) {
   const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
-  grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action, urgent);
+  grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
   grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
 }
@@ -2617,13 +2649,11 @@ static void continue_read_action_locked(grpc_chttp2_transport* t) {
 // that kicks off finishes, it's unreffed
 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
   t->flow_control->bdp_estimator()->SchedulePing();
-  send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping);
+  send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
 }
 static void start_bdp_ping_locked(void* tp, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  // No need to take a lock. This closure will always be run while already
-  // holding the lock.
   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
     gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string,
             grpc_error_string(error));
@@ -2638,15 +2668,13 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) {
   t->flow_control->bdp_estimator()->StartPing();
 }
-static void finish_bdp_ping(void* tp, grpc_error* error) {
+static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  grpc_core::ReleasableMutexLock lock(&t->mu);
   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
     gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
             grpc_error_string(error));
   }
   if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
-    lock.Unlock();
     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
     return;
   }
@@ -2656,16 +2684,14 @@ static void finish_bdp_ping(void* tp, grpc_error* error) {
   GPR_ASSERT(!t->have_next_bdp_ping_timer);
   t->have_next_bdp_ping_timer = true;
   grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
-                  &t->next_bdp_ping_timer_expired);
+                  &t->next_bdp_ping_timer_expired_locked);
 }
-static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) {
+static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  grpc_core::ReleasableMutexLock lock(&t->mu);
   GPR_ASSERT(t->have_next_bdp_ping_timer);
   t->have_next_bdp_ping_timer = false;
   if (error != GRPC_ERROR_NONE) {
-    lock.Unlock();
     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
     return;
   }
@@ -2739,10 +2765,8 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
   }
 }
-static void init_keepalive_ping(void* arg, grpc_error* error) {
+static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
-  {
-    grpc_core::MutexLock lock(&t->mu);
   GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
   if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
@@ -2753,21 +2777,19 @@ static void init_keepalive_ping(void* arg, grpc_error* error) {
       GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
       grpc_timer_init_unset(&t->keepalive_watchdog_timer);
       send_keepalive_ping_locked(t);
-      grpc_chttp2_initiate_write(t,
-                                 GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
+      grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
     } else {
       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
       grpc_timer_init(&t->keepalive_ping_timer,
                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
-                      &t->init_keepalive_ping);
+                      &t->init_keepalive_ping_locked);
     }
   } else if (error == GRPC_ERROR_CANCELLED) {
     /* The keepalive ping timer may be cancelled by bdp */
     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
     grpc_timer_init(&t->keepalive_ping_timer,
                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
-                    &t->init_keepalive_ping);
-  }
+                    &t->init_keepalive_ping_locked);
   }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
 }
@@ -2777,8 +2799,6 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
   if (error != GRPC_ERROR_NONE) {
     return;
   }
-  // No need to take a lock. This closure will always be run while already
-  // holding the lock.
   if (t->channelz_socket != nullptr) {
     t->channelz_socket->RecordKeepaliveSent();
   }
@@ -2788,13 +2808,11 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
   GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
   grpc_timer_init(&t->keepalive_watchdog_timer,
                   grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
-                  &t->keepalive_watchdog_fired);
+                  &t->keepalive_watchdog_fired_locked);
 }
-static void finish_keepalive_ping(void* arg, grpc_error* error) {
+static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
-  {
-    grpc_core::MutexLock lock(&t->mu);
   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
     if (error == GRPC_ERROR_NONE) {
       if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
@@ -2805,17 +2823,14 @@ static void finish_keepalive_ping(void* arg, grpc_error* error) {
       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
       grpc_timer_init(&t->keepalive_ping_timer,
                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
-                      &t->init_keepalive_ping);
-      }
+                      &t->init_keepalive_ping_locked);
     }
   }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
 }
-static void keepalive_watchdog_fired(void* arg, grpc_error* error) {
+static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
-  {
-    grpc_core::MutexLock lock(&t->mu);
   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
     if (error == GRPC_ERROR_NONE) {
       gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.",
@@ -2829,13 +2844,12 @@ static void keepalive_watchdog_fired(void* arg, grpc_error* error) {
     }
   } else {
     /* The watchdog timer should have been cancelled by
-     * finish_keepalive_ping. */
+     * finish_keepalive_ping_locked. */
     if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
       gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
               t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
     }
   }
-  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
 }
@@ -2873,7 +2887,6 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
 static void reset_byte_stream(void* arg, grpc_error* error) {
   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
-  grpc_core::MutexLock lock(&s->t->mu);
   s->pending_byte_stream = false;
   if (error == GRPC_ERROR_NONE) {
     grpc_chttp2_maybe_complete_recv_message(s->t, s);
@@ -2903,23 +2916,30 @@ Chttp2IncomingByteStream::Chttp2IncomingByteStream(
   stream->byte_stream_error = GRPC_ERROR_NONE;
 }
-void Chttp2IncomingByteStream::Orphan() {
-  GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
-  grpc_chttp2_stream* s = stream_;
+void Chttp2IncomingByteStream::OrphanLocked(void* arg,
+                                            grpc_error* error_ignored) {
+  Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
+  grpc_chttp2_stream* s = bs->stream_;
   grpc_chttp2_transport* t = s->t;
-  grpc_core::MutexLock lock(&t->mu);
-  Unref();
+  bs->Unref();
   s->pending_byte_stream = false;
   grpc_chttp2_maybe_complete_recv_message(t, s);
   grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
 }
-// TODO(yashykt) : Merge this with Next
+void Chttp2IncomingByteStream::Orphan() {
+  GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
+  GRPC_CLOSURE_SCHED(
+      GRPC_CLOSURE_INIT(&destroy_action_,
+                        &Chttp2IncomingByteStream::OrphanLocked, this,
+                        grpc_combiner_scheduler(transport_->combiner)),
+      GRPC_ERROR_NONE);
+}
 void Chttp2IncomingByteStream::NextLocked(void* arg,
                                           grpc_error* error_ignored) {
   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
   grpc_chttp2_transport* t = bs->transport_;
-  grpc_core::MutexLock lock(&t->mu);
   grpc_chttp2_stream* s = bs->stream_;
   size_t cur_length = s->frame_storage.length;
   if (!s->read_closed) {
@@ -2969,9 +2989,10 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
     Ref();
     next_action_.max_size_hint = max_size_hint;
     next_action_.on_complete = on_complete;
-    GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&next_action_.closure,
-                                         &Chttp2IncomingByteStream::NextLocked,
-                                         this, grpc_schedule_on_exec_ctx),
+    GRPC_CLOSURE_SCHED(
+        GRPC_CLOSURE_INIT(&next_action_.closure,
+                          &Chttp2IncomingByteStream::NextLocked, this,
+                          grpc_combiner_scheduler(transport_->combiner)),
        GRPC_ERROR_NONE);
    return false;
   }
@@ -3085,7 +3106,7 @@ static void post_benign_reclaimer(grpc_chttp2_transport* t) {
     t->benign_reclaimer_registered = true;
     GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
-                                      false, &t->benign_reclaimer);
+                                      false, &t->benign_reclaimer_locked);
   }
 }
@@ -3094,14 +3115,12 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
     t->destructive_reclaimer_registered = true;
     GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
-                                      true, &t->destructive_reclaimer);
+                                      true, &t->destructive_reclaimer_locked);
   }
 }
-static void benign_reclaimer(void* arg, grpc_error* error) {
+static void benign_reclaimer_locked(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
-  {
-    grpc_core::MutexLock lock(&t->mu);
   if (error == GRPC_ERROR_NONE &&
       grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
     /* Channel with no active streams: send a goaway to try and make it
@@ -3110,8 +3129,8 @@ static void benign_reclaimer(void* arg, grpc_error* error) {
       gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
               t->peer_string);
     }
-    send_goaway(
-        t, grpc_error_set_int(
+    send_goaway(t,
+                grpc_error_set_int(
                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
   } else if (error == GRPC_ERROR_NONE &&
@@ -3126,14 +3145,11 @@ static void benign_reclaimer(void* arg, grpc_error* error) {
     grpc_resource_user_finish_reclamation(
         grpc_endpoint_get_resource_user(t->ep));
   }
-  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
 }
-static void destructive_reclaimer(void* arg, grpc_error* error) {
+static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
-  {
-    grpc_core::MutexLock lock(&t->mu);
   size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
   t->destructive_reclaimer_registered = false;
   if (error == GRPC_ERROR_NONE && n > 0) {
@@ -3145,9 +3161,9 @@ static void destructive_reclaimer(void* arg, grpc_error* error) {
     }
     grpc_chttp2_cancel_stream(
         t, s,
-        grpc_error_set_int(
-            GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
-            GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
+        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
+                           GRPC_ERROR_INT_HTTP2_ERROR,
+                           GRPC_HTTP2_ENHANCE_YOUR_CALM));
     if (n > 1) {
       /* Since we cancel one stream per destructive reclamation, if
          there are more streams left, we can immediately post a new
@@ -3160,7 +3176,6 @@ static void destructive_reclaimer(void* arg, grpc_error* error) {
     grpc_resource_user_finish_reclamation(
         grpc_endpoint_get_resource_user(t->ep));
   }
-  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
 }
@@ -3259,5 +3274,5 @@ void grpc_chttp2_transport_start_reading(
     gpr_free(read_buffer);
   }
   t->notify_on_receive_settings = notify_on_receive_settings;
-  GRPC_CLOSURE_SCHED(&t->read_action, GRPC_ERROR_NONE);
+  GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE);
 }
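Note on the pattern being restored in this file: instead of guarding transport state with the removed grpc_core::Mutex (t->mu) and scheduling callbacks on the ExecCtx, every callback is re-bound to the transport's grpc_combiner, which runs its closures one at a time. What follows is a minimal, hypothetical sketch of that idiom and not part of the commit; my_transport, do_work_locked, my_transport_init and my_transport_destroy are invented names for illustration, while the macros and combiner functions are the gRPC-core APIs that appear in the diff above.

#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"

// Hypothetical illustration only -- not part of this commit.
struct my_transport {
  grpc_combiner* combiner;      // serialization domain owned by the transport
  grpc_closure do_work_locked;  // closure bound to that combiner below
};

static void do_work_locked(void* arg, grpc_error* error) {
  // Runs serialized with every other closure scheduled on the same combiner,
  // so transport state can be touched without an explicit mutex.
  my_transport* t = static_cast<my_transport*>(arg);
  (void)t;
  (void)error;
}

static void my_transport_init(my_transport* t) {
  t->combiner = grpc_combiner_create();
  GRPC_CLOSURE_INIT(&t->do_work_locked, do_work_locked, t,
                    grpc_combiner_scheduler(t->combiner));
  // May be scheduled from any thread; the combiner decides when it runs.
  GRPC_CLOSURE_SCHED(&t->do_work_locked, GRPC_ERROR_NONE);
}

static void my_transport_destroy(my_transport* t) {
  GRPC_COMBINER_UNREF(t->combiner, "my_transport");
}

grpc_combiner_finally_scheduler(t->combiner), used for write_action_begin_locked above, is the variant that defers a closure until everything already queued on the combiner has run, which is what lets the transport batch many pending writes into a single write_action.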

--- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc
@@ -1669,16 +1669,12 @@ static const maybe_complete_func_type maybe_complete_funcs[] = {
 static void force_client_rst_stream(void* sp, grpc_error* error) {
   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
   grpc_chttp2_transport* t = s->t;
-  {
-    grpc_core::MutexLock lock(&t->mu);
   if (!s->write_closed) {
     grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
                                              &s->stats.outgoing);
-    grpc_chttp2_initiate_write(t,
-                               GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
+    grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
     grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE);
   }
-  }
   GRPC_CHTTP2_STREAM_UNREF(s, "final_rst");
 }
@@ -1744,11 +1740,10 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser,
            the stream. Wait until the combiner lock is ready to be released
            however -- it might be that we receive a RST_STREAM following this
            and can avoid the extra write */
-        // TODO(yashykt) : When we were using combiners, we were using the
-        // finally version. Maybe do something similar?
         GRPC_CHTTP2_STREAM_REF(s, "final_rst");
-        GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(force_client_rst_stream, s,
-                                               grpc_schedule_on_exec_ctx),
-                           GRPC_ERROR_NONE);
+        GRPC_CLOSURE_SCHED(
+            GRPC_CLOSURE_CREATE(force_client_rst_stream, s,
+                                grpc_combiner_finally_scheduler(t->combiner)),
+            GRPC_ERROR_NONE);
       }
       grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);

--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -39,7 +39,6 @@
 #include "src/core/lib/channel/channelz.h"
 #include "src/core/lib/compression/stream_compression.h"
 #include "src/core/lib/gprpp/manual_constructor.h"
-#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/endpoint.h"
 #include "src/core/lib/iomgr/timer.h"
@@ -254,6 +253,7 @@ class Chttp2IncomingByteStream : public ByteStream {
  private:
   static void NextLocked(void* arg, grpc_error* error_ignored);
+  static void OrphanLocked(void* arg, grpc_error* error_ignored);
   void MaybeCreateStreamDecompressionCtx();
@@ -275,6 +275,7 @@ class Chttp2IncomingByteStream : public ByteStream {
     size_t max_size_hint;
     grpc_closure* on_complete;
   } next_action_;
+  grpc_closure destroy_action_;
 };
 }  // namespace grpc_core
@@ -293,13 +294,14 @@ struct grpc_chttp2_transport {
   ~grpc_chttp2_transport();
   grpc_transport base; /* must be first */
-  grpc_core::Mutex mu;
   grpc_core::RefCount refs;
   grpc_endpoint* ep;
   char* peer_string;
   grpc_resource_user* resource_user;
+  grpc_combiner* combiner;
   grpc_closure* notify_on_receive_settings = nullptr;
   /** write execution state of the transport */
@@ -325,11 +327,11 @@ struct grpc_chttp2_transport {
   /** maps stream id to grpc_chttp2_stream objects */
   grpc_chttp2_stream_map stream_map;
-  grpc_closure write_action_begin;
+  grpc_closure write_action_begin_locked;
   grpc_closure write_action;
-  grpc_closure write_action_end;
-  grpc_closure read_action;
+  grpc_closure write_action_end_locked;
+  grpc_closure read_action_locked;
   /** incoming read bytes */
   grpc_slice_buffer read_buffer;
@@ -390,7 +392,7 @@ struct grpc_chttp2_transport {
   grpc_chttp2_repeated_ping_policy ping_policy;
   grpc_chttp2_repeated_ping_state ping_state;
   uint64_t ping_ctr = 0; /* unique id for pings */
-  grpc_closure retry_initiate_ping;
+  grpc_closure retry_initiate_ping_locked;
   /** ping acks */
   size_t ping_ack_count = 0;
@@ -440,9 +442,9 @@ struct grpc_chttp2_transport {
   grpc_chttp2_write_cb* write_cb_pool = nullptr;
   /* bdp estimator */
-  grpc_closure next_bdp_ping_timer_expired;
+  grpc_closure next_bdp_ping_timer_expired_locked;
   grpc_closure start_bdp_ping_locked;
-  grpc_closure finish_bdp_ping;
+  grpc_closure finish_bdp_ping_locked;
   /* if non-NULL, close the transport with this error when writes are finished
    */
@@ -457,9 +459,9 @@ struct grpc_chttp2_transport {
   /** have we scheduled a destructive cleanup? */
   bool destructive_reclaimer_registered = false;
   /** benign cleanup closure */
-  grpc_closure benign_reclaimer;
+  grpc_closure benign_reclaimer_locked;
   /** destructive cleanup closure */
-  grpc_closure destructive_reclaimer;
+  grpc_closure destructive_reclaimer_locked;
   /* next bdp ping timer */
   bool have_next_bdp_ping_timer = false;
@@ -467,13 +469,13 @@ struct grpc_chttp2_transport {
   /* keep-alive ping support */
   /** Closure to initialize a keepalive ping */
-  grpc_closure init_keepalive_ping;
+  grpc_closure init_keepalive_ping_locked;
   /** Closure to run when the keepalive ping is sent */
   grpc_closure start_keepalive_ping_locked;
   /** Cousure to run when the keepalive ping ack is received */
-  grpc_closure finish_keepalive_ping;
+  grpc_closure finish_keepalive_ping_locked;
   /** Closrue to run when the keepalive ping timeouts */
-  grpc_closure keepalive_watchdog_fired;
+  grpc_closure keepalive_watchdog_fired_locked;
   /** timer to initiate ping events */
   grpc_timer keepalive_ping_timer;
   /** watchdog to kill the transport when waiting for the keepalive ping */
@@ -520,6 +522,7 @@ struct grpc_chttp2_stream {
     explicit Reffer(grpc_chttp2_stream* s);
   } reffer;
+  grpc_closure destroy_stream;
   grpc_closure* destroy_stream_arg;
   grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
@@ -540,7 +543,7 @@ struct grpc_chttp2_stream {
   int64_t next_message_end_offset;
   int64_t flow_controlled_bytes_written = 0;
   int64_t flow_controlled_bytes_flowed = 0;
-  grpc_closure complete_fetch;
+  grpc_closure complete_fetch_locked;
   grpc_closure* fetching_send_message_finished = nullptr;
   grpc_metadata_batch* recv_initial_metadata;

--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -97,14 +97,14 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
       t->ping_state.is_delayed_ping_timer_set = true;
       GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
       grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
-                      &t->retry_initiate_ping);
+                      &t->retry_initiate_ping_locked);
     }
     return;
   }
   pq->inflight_id = t->ping_ctr;
   t->ping_ctr++;
-  GRPC_CLOSURE_LIST_RUN(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
+  GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
   grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
                          &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
   grpc_slice_buffer_add(&t->outbuf,
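For context on the GRPC_CLOSURE_LIST_RUN to GRPC_CLOSURE_LIST_SCHED change above: callbacks interested in a ping are parked on a grpc_closure_list inside the ping queue and flushed as a batch once the ping frame is actually emitted. Below is a small, hypothetical sketch of that list API, not part of the commit; on_ping_initiated and flush_pending_example are invented names, while grpc_closure_list_init, grpc_closure_list_append and GRPC_CLOSURE_LIST_SCHED are the helpers used in maybe_initiate_ping above.

#include "src/core/lib/iomgr/closure.h"

// Hypothetical illustration only -- not part of this commit.
static void on_ping_initiated(void* arg, grpc_error* error) {
  (void)arg;
  (void)error;  // a real callback would record that the ping went out
}

static void flush_pending_example() {
  // In the transport the closure lives in a long-lived struct (e.g.
  // t->finish_keepalive_ping_locked); a static is used here only so the
  // closure outlives this function until the ExecCtx runs it.
  static grpc_closure cb;
  grpc_closure_list pending;
  grpc_closure_list_init(&pending);

  // Interested parties append their callbacks while the ping is pending...
  GRPC_CLOSURE_INIT(&cb, on_ping_initiated, nullptr, grpc_schedule_on_exec_ctx);
  grpc_closure_list_append(&pending, &cb, GRPC_ERROR_NONE);

  // ...and when the ping frame is written the whole list is handed to the
  // scheduler in one go, which is what GRPC_CLOSURE_LIST_SCHED does above.
  GRPC_CLOSURE_LIST_SCHED(&pending);
}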

--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -355,43 +355,4 @@ inline void grpc_closure_list_sched(grpc_closure_list* list) {
   grpc_closure_list_sched(closure_list)
 #endif

-#ifndef NDEBUG
-inline void grpc_closure_list_run(const char* file, int line,
-                                  grpc_closure_list* list) {
-#else
-inline void grpc_closure_list_run(grpc_closure_list* list) {
-#endif
-  grpc_closure* c = list->head;
-  while (c != nullptr) {
-    grpc_closure* next = c->next_data.next;
-#ifndef NDEBUG
-    if (c->scheduled) {
-      gpr_log(GPR_ERROR,
-              "Closure already scheduled. (closure: %p, created: [%s:%d], "
-              "previously scheduled at: [%s: %d] run?: %s",
-              c, c->file_created, c->line_created, c->file_initiated,
-              c->line_initiated, c->run ? "true" : "false");
-      abort();
-    }
-    c->scheduled = true;
-    c->file_initiated = file;
-    c->line_initiated = line;
-    c->run = false;
-    GPR_ASSERT(c->cb != nullptr);
-#endif
-    c->scheduler->vtable->run(c, c->error_data.error);
-    c = next;
-  }
-  list->head = list->tail = nullptr;
-}
-
-/** Schedule all closures in a list to be run. Does not need to be run from a
- * safe point. */
-#ifndef NDEBUG
-#define GRPC_CLOSURE_LIST_RUN(closure_list) \
-  grpc_closure_list_run(__FILE__, __LINE__, closure_list)
-#else
-#define GRPC_CLOSURE_LIST_RUN(closure_list) grpc_closure_list_run(closure_list)
-#endif
-
 #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */

--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -465,10 +465,6 @@ void Executor::ShutdownAll() {
 bool Executor::IsThreaded(ExecutorType executor_type) {
   GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS);
-  Executor* executor = executors[static_cast<size_t>(executor_type)];
-  if (executor == nullptr) {
-    return false;
-  }
   return executors[static_cast<size_t>(executor_type)]->IsThreaded();
 }
