Use Mutex and MutexLock instead of gpr_mu

Branch: pull/20331/head
Author: Yash Tibrewal (5 years ago)
Parent: ac82cc422f
Commit: 0d2558d05f
Changed files:
  469  src/core/ext/transport/chttp2/transport/chttp2_transport.cc
   16  src/core/ext/transport/chttp2/transport/hpack_parser.cc
    2  src/core/ext/transport/chttp2/transport/internal.h
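This commit replaces the transport's raw gpr_mu member with grpc_core::Mutex and swaps explicit gpr_mu_lock / gpr_mu_unlock pairs for the RAII guards grpc_core::MutexLock and grpc_core::ReleasableMutexLock, so the lock is released automatically when the guard goes out of scope. A minimal sketch of the before/after pattern follows; the struct and function names are invented for illustration, and only the mutex types and gpr_mu calls are the real APIs exercised in the diff. Functions that must drop the lock before their final statement use ReleasableMutexLock instead; see the note after the write_action_begin hunk below.

// Hypothetical sketch: TransportBefore/TransportAfter and poke_* are invented names.
#include <grpc/support/sync.h>        // gpr_mu
#include "src/core/lib/gprpp/sync.h"  // grpc_core::Mutex, MutexLock (header path assumed)

// Before: a raw gpr_mu that must be initialized, destroyed, and unlocked by hand.
struct TransportBefore {
  gpr_mu mu;
  TransportBefore() { gpr_mu_init(&mu); }
  ~TransportBefore() { gpr_mu_destroy(&mu); }
};
void poke_before(TransportBefore* t) {
  gpr_mu_lock(&t->mu);
  // ... mutate transport state under the lock ...
  gpr_mu_unlock(&t->mu);  // every exit path has to remember this
}

// After: grpc_core::Mutex manages its own lifetime; MutexLock releases the
// lock automatically when it goes out of scope, even on early return.
struct TransportAfter {
  grpc_core::Mutex mu;
};
void poke_after(TransportAfter* t) {
  grpc_core::MutexLock lock(&t->mu);
  // ... mutate transport state under the lock ...
}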

src/core/ext/transport/chttp2/transport/chttp2_transport.cc

@@ -212,7 +212,6 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
   GRPC_ERROR_UNREF(closed_with_error);
   gpr_free(ping_acks);
   gpr_free(peer_string);
-  gpr_mu_destroy(&mu);
 }
 
 static const grpc_transport_vtable* get_vtable(void);
@@ -473,7 +472,6 @@ grpc_chttp2_transport::grpc_chttp2_transport(
   GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
              GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
   base.vtable = get_vtable();
-  gpr_mu_init(&mu);
   /* 8 is a random stab in the dark as to a good initial size: it's small enough
      that it shouldn't waste memory for infrequently used connections, yet
      large enough that the exponential growth should happen nicely when it's
@@ -554,13 +552,14 @@ grpc_chttp2_transport::grpc_chttp2_transport(
 
 static void destroy_transport(grpc_transport* gt) {
   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
-  gpr_mu_lock(&t->mu);
-  t->destroying = 1;
-  close_transport_locked(
-      t, grpc_error_set_int(
-             GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
-             GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
-  gpr_mu_unlock(&t->mu);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    t->destroying = 1;
+    close_transport_locked(
+        t, grpc_error_set_int(
+               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
+               GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
+  }
   // Must be the last line.
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
 }
@@ -681,58 +680,60 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
 }
 
 grpc_chttp2_stream::~grpc_chttp2_stream() {
-  gpr_mu_lock(&t->mu);
-  if (t->channelz_socket != nullptr) {
-    if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
-      t->channelz_socket->RecordStreamSucceeded();
-    } else {
-      t->channelz_socket->RecordStreamFailed();
-    }
-  }
-  GPR_ASSERT((write_closed && read_closed) || id == 0);
-  if (id != 0) {
-    GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
-  }
-  grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
-  grpc_slice_buffer_destroy_internal(&frame_storage);
-  if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
-    grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
-  }
-  if (stream_decompression_method !=
-      GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
-    grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
-  }
-  grpc_chttp2_list_remove_stalled_by_transport(t, this);
-  grpc_chttp2_list_remove_stalled_by_stream(t, this);
-  for (int i = 0; i < STREAM_LIST_COUNT; i++) {
-    if (GPR_UNLIKELY(included[i])) {
-      gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
-              t->is_client ? "client" : "server", id, i);
-      abort();
-    }
-  }
-  GPR_ASSERT(send_initial_metadata_finished == nullptr);
-  GPR_ASSERT(fetching_send_message == nullptr);
-  GPR_ASSERT(send_trailing_metadata_finished == nullptr);
-  GPR_ASSERT(recv_initial_metadata_ready == nullptr);
-  GPR_ASSERT(recv_message_ready == nullptr);
-  GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
-  grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
-  GRPC_ERROR_UNREF(read_closed_error);
-  GRPC_ERROR_UNREF(write_closed_error);
-  GRPC_ERROR_UNREF(byte_stream_error);
-  flow_control.Destroy();
-  if (t->resource_user != nullptr) {
-    grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
-  }
-  gpr_mu_unlock(&t->mu);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    if (t->channelz_socket != nullptr) {
+      if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
+        t->channelz_socket->RecordStreamSucceeded();
+      } else {
+        t->channelz_socket->RecordStreamFailed();
+      }
+    }
+    GPR_ASSERT((write_closed && read_closed) || id == 0);
+    if (id != 0) {
+      GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
+    }
+    grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
+    grpc_slice_buffer_destroy_internal(&frame_storage);
+    if (stream_compression_method !=
+        GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
+      grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
+    }
+    if (stream_decompression_method !=
+        GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
+      grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
+    }
+    grpc_chttp2_list_remove_stalled_by_transport(t, this);
+    grpc_chttp2_list_remove_stalled_by_stream(t, this);
+    for (int i = 0; i < STREAM_LIST_COUNT; i++) {
+      if (GPR_UNLIKELY(included[i])) {
+        gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
+                t->is_client ? "client" : "server", id, i);
+        abort();
+      }
+    }
+    GPR_ASSERT(send_initial_metadata_finished == nullptr);
+    GPR_ASSERT(fetching_send_message == nullptr);
+    GPR_ASSERT(send_trailing_metadata_finished == nullptr);
+    GPR_ASSERT(recv_initial_metadata_ready == nullptr);
+    GPR_ASSERT(recv_message_ready == nullptr);
+    GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
+    grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
+    GRPC_ERROR_UNREF(read_closed_error);
+    GRPC_ERROR_UNREF(write_closed_error);
+    GRPC_ERROR_UNREF(byte_stream_error);
+    flow_control.Destroy();
+    if (t->resource_user != nullptr) {
+      grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
+    }
+  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
   GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE);
 }
@@ -1010,7 +1011,7 @@ static const char* begin_writing_desc(bool partial, bool inlined) {
 static void write_action_begin(void* gt, grpc_error* error_ignored) {
   GPR_TIMER_SCOPE("write_action_begin", 0);
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
-  gpr_mu_lock(&t->mu);
+  grpc_core::ReleasableMutexLock lock(&t->mu);
   GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
   grpc_chttp2_begin_write_result r;
   if (t->closed_with_error != GRPC_ERROR_NONE) {
@@ -1051,11 +1052,11 @@ static void write_action_begin(void* gt, grpc_error* error_ignored) {
       t->reading_paused_on_pending_induced_frames = false;
       continue_read_action_locked(t);
     }
-    gpr_mu_unlock(&t->mu);
+    lock.Unlock();
   } else {
     GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
     set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
-    gpr_mu_unlock(&t->mu);
+    lock.Unlock();
     GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
   }
 }
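In functions that sometimes have to give up the lock before their final statement (typically just before GRPC_CHTTP2_UNREF_TRANSPORT, which may drop the last reference and destroy the transport along with its mutex), the diff uses grpc_core::ReleasableMutexLock and calls lock.Unlock() explicitly on those paths. A rough sketch of that pattern, with invented names for everything except ReleasableMutexLock itself:

// Illustrative only: 'Transport', 'do_locked_work' and 'transport_unref' are
// made-up stand-ins for the real chttp2 helpers.
void on_io_event(Transport* t, bool last_use) {
  grpc_core::ReleasableMutexLock lock(&t->mu);
  do_locked_work(t);
  if (last_use) {
    lock.Unlock();       // must not hold t->mu while the last ref is dropped,
    transport_unref(t);  // since destroying t also destroys t->mu
    return;
  }
  // On the other path the lock is released when 'lock' goes out of scope.
}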
@@ -1076,54 +1077,56 @@ static void write_action(void* gt, grpc_error* error) {
 
 static void write_action_end(void* tp, grpc_error* error) {
   GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  gpr_mu_lock(&t->mu);
-  bool closed = false;
-  if (error != GRPC_ERROR_NONE) {
-    close_transport_locked(t, GRPC_ERROR_REF(error));
-    closed = true;
-  }
-  if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
-    t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
-    closed = true;
-    if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
-      close_transport_locked(
-          t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
-    }
-  }
-  switch (t->write_state) {
-    case GRPC_CHTTP2_WRITE_STATE_IDLE:
-      GPR_UNREACHABLE_CODE(break);
-    case GRPC_CHTTP2_WRITE_STATE_WRITING:
-      GPR_TIMER_MARK("state=writing", 0);
-      set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
-      break;
-    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
-      GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
-      set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
-      t->is_first_write_in_batch = false;
-      GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
-      // If the transport is closed, we will retry writing on the endpoint
-      // and next write may contain part of the currently serialized frames.
-      // So, we should only call the run_after_write callbacks when the next
-      // write finishes, or the callbacks will be invoked when the stream is
-      // closed.
-      if (!closed) {
-        GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
-      }
-      // TODO(yashykt) : When we were using combiners, we were using the finally
-      // version, so that the write action would happen after we were done
-      // queueing up all the writes that we wanted. Maybe do something similar?
-      GRPC_CLOSURE_SCHED(
-          GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t,
-                            grpc_schedule_on_exec_ctx),
-          GRPC_ERROR_NONE);
-      break;
-  }
-  grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
-  gpr_mu_unlock(&t->mu);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    bool closed = false;
+    if (error != GRPC_ERROR_NONE) {
+      close_transport_locked(t, GRPC_ERROR_REF(error));
+      closed = true;
+    }
+    if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
+      t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
+      closed = true;
+      if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+        close_transport_locked(
+            t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
+      }
+    }
+    switch (t->write_state) {
+      case GRPC_CHTTP2_WRITE_STATE_IDLE:
+        GPR_UNREACHABLE_CODE(break);
+      case GRPC_CHTTP2_WRITE_STATE_WRITING:
+        GPR_TIMER_MARK("state=writing", 0);
+        set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
+        break;
+      case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
+        GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
+        set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
+        t->is_first_write_in_batch = false;
+        GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
+        // If the transport is closed, we will retry writing on the endpoint
+        // and next write may contain part of the currently serialized frames.
+        // So, we should only call the run_after_write callbacks when the next
+        // write finishes, or the callbacks will be invoked when the stream is
+        // closed.
+        if (!closed) {
+          GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
+        }
+        // TODO(yashykt) : When we were using combiners, we were using the
+        // finally version, so that the write action would happen after we were
+        // done queueing up all the writes that we wanted. Maybe do something
+        // similar?
+        GRPC_CLOSURE_SCHED(
+            GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t,
+                              grpc_schedule_on_exec_ctx),
+            GRPC_ERROR_NONE);
+        break;
+    }
+    grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
+  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
 }
@@ -1384,7 +1387,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
 static void complete_fetch(void* gs, grpc_error* error) {
   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
   grpc_chttp2_transport* t = s->t;
-  gpr_mu_lock(&t->mu);
+  grpc_core::MutexLock lock(&t->mu);
   if (error == GRPC_ERROR_NONE) {
     error = s->fetching_send_message->Pull(&s->fetching_slice);
     if (error == GRPC_ERROR_NONE) {
@@ -1396,7 +1399,6 @@ static void complete_fetch(void* gs, grpc_error* error) {
     s->fetching_send_message.reset();
     grpc_chttp2_cancel_stream(t, s, error);
   }
-  gpr_mu_unlock(&t->mu);
 }
 
 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
@@ -1438,8 +1440,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
     gpr_free(str);
   }
 
-  gpr_mu_lock(&t->mu);
-
+  grpc_core::MutexLock lock(&t->mu);
   GRPC_STATS_INC_HTTP2_OP_BATCHES();
 
   s->context = op_payload->context;
@@ -1719,7 +1720,6 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
     grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
                                       "op->on_complete");
   }
-  gpr_mu_unlock(&t->mu);
 }
 
 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
@@ -1783,12 +1783,13 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
 
 static void retry_initiate_ping(void* tp, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  gpr_mu_lock(&t->mu);
-  t->ping_state.is_delayed_ping_timer_set = false;
-  if (error == GRPC_ERROR_NONE) {
-    grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    t->ping_state.is_delayed_ping_timer_set = false;
+    if (error == GRPC_ERROR_NONE) {
+      grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
+    }
   }
-  gpr_mu_unlock(&t->mu);
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping");
 }
@@ -1845,8 +1846,7 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
     gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg);
     gpr_free(msg);
   }
-  op->handler_private.extra_arg = gt;
-  gpr_mu_lock(&t->mu);
+  grpc_core::MutexLock lock(&t->mu);
   if (op->goaway_error) {
     send_goaway(t, op->goaway_error);
   }
@ -1881,7 +1881,6 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
} }
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
gpr_mu_unlock(&t->mu);
} }
/******************************************************************************* /*******************************************************************************
@@ -2526,7 +2525,7 @@ static void read_action(void* tp, grpc_error* error) {
   GPR_TIMER_SCOPE("reading_action_locked", 0);
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  gpr_mu_lock(&t->mu);
+  grpc_core::ReleasableMutexLock lock(&t->mu);
   GRPC_ERROR_REF(error);
 
   grpc_error* err = error;
@@ -2610,9 +2609,9 @@ static void read_action(void* tp, grpc_error* error) {
     } else {
       continue_read_action_locked(t);
     }
-    gpr_mu_unlock(&t->mu);
+    lock.Unlock();
   } else {
-    gpr_mu_unlock(&t->mu);
+    lock.Unlock();
     GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
   }
@@ -2652,13 +2651,13 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) {
 
 static void finish_bdp_ping(void* tp, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  gpr_mu_lock(&t->mu);
+  grpc_core::ReleasableMutexLock lock(&t->mu);
   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
     gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
             grpc_error_string(error));
   }
   if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
-    gpr_mu_unlock(&t->mu);
+    lock.Unlock();
     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
     return;
   }
@@ -2669,21 +2668,19 @@ static void finish_bdp_ping(void* tp, grpc_error* error) {
   t->have_next_bdp_ping_timer = true;
   grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
                   &t->next_bdp_ping_timer_expired);
-  gpr_mu_unlock(&t->mu);
 }
 
 static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
-  gpr_mu_lock(&t->mu);
+  grpc_core::ReleasableMutexLock lock(&t->mu);
   GPR_ASSERT(t->have_next_bdp_ping_timer);
   t->have_next_bdp_ping_timer = false;
   if (error != GRPC_ERROR_NONE) {
-    gpr_mu_unlock(&t->mu);
+    lock.Unlock();
     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
     return;
   }
   schedule_bdp_ping_locked(t);
-  gpr_mu_unlock(&t->mu);
 }
 
 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
@@ -2755,32 +2752,34 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
 
 static void init_keepalive_ping(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
-  gpr_mu_lock(&t->mu);
-  GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
-  if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
-    t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
-  } else if (error == GRPC_ERROR_NONE) {
-    if (t->keepalive_permit_without_calls ||
-        grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
-      t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
-      GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
-      grpc_timer_init_unset(&t->keepalive_watchdog_timer);
-      send_keepalive_ping_locked(t);
-      grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
-    } else {
-      GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
-      grpc_timer_init(&t->keepalive_ping_timer,
-                      grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
-                      &t->init_keepalive_ping);
-    }
-  } else if (error == GRPC_ERROR_CANCELLED) {
-    /* The keepalive ping timer may be cancelled by bdp */
-    GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
-    grpc_timer_init(&t->keepalive_ping_timer,
-                    grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
-                    &t->init_keepalive_ping);
-  }
-  gpr_mu_unlock(&t->mu);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
+    if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
+      t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
+    } else if (error == GRPC_ERROR_NONE) {
+      if (t->keepalive_permit_without_calls ||
+          grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
+        t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
+        GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
+        grpc_timer_init_unset(&t->keepalive_watchdog_timer);
+        send_keepalive_ping_locked(t);
+        grpc_chttp2_initiate_write(t,
+                                   GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
+      } else {
+        GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
+        grpc_timer_init(&t->keepalive_ping_timer,
+                        grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+                        &t->init_keepalive_ping);
+      }
+    } else if (error == GRPC_ERROR_CANCELLED) {
+      /* The keepalive ping timer may be cancelled by bdp */
+      GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
+      grpc_timer_init(&t->keepalive_ping_timer,
+                      grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+                      &t->init_keepalive_ping);
+    }
+  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
 }
@@ -2805,47 +2804,49 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
 
 static void finish_keepalive_ping(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
-  gpr_mu_lock(&t->mu);
-  if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
-    if (error == GRPC_ERROR_NONE) {
-      if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
-        gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string);
-      }
-      t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
-      grpc_timer_cancel(&t->keepalive_watchdog_timer);
-      GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
-      grpc_timer_init(&t->keepalive_ping_timer,
-                      grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
-                      &t->init_keepalive_ping);
-    }
-  }
-  gpr_mu_unlock(&t->mu);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
+      if (error == GRPC_ERROR_NONE) {
+        if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
+          gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string);
+        }
+        t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
+        grpc_timer_cancel(&t->keepalive_watchdog_timer);
+        GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
+        grpc_timer_init(&t->keepalive_ping_timer,
+                        grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+                        &t->init_keepalive_ping);
+      }
+    }
+  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
 }
 
 static void keepalive_watchdog_fired(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
-  gpr_mu_lock(&t->mu);
-  if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
-    if (error == GRPC_ERROR_NONE) {
-      gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.",
-              t->peer_string);
-      t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
-      close_transport_locked(
-          t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                                    "keepalive watchdog timeout"),
-                                GRPC_ERROR_INT_GRPC_STATUS,
-                                GRPC_STATUS_UNAVAILABLE));
-    }
-  } else {
-    /* The watchdog timer should have been cancelled by
-     * finish_keepalive_ping. */
-    if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
-      gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
-              t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
-    }
-  }
-  gpr_mu_unlock(&t->mu);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
+      if (error == GRPC_ERROR_NONE) {
+        gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.",
+                t->peer_string);
+        t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
+        close_transport_locked(
+            t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+                                      "keepalive watchdog timeout"),
+                                  GRPC_ERROR_INT_GRPC_STATUS,
+                                  GRPC_STATUS_UNAVAILABLE));
+      }
+    } else {
+      /* The watchdog timer should have been cancelled by
+       * finish_keepalive_ping. */
+      if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
+        gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
+                t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
+      }
+    }
+  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
 }
@@ -2884,7 +2885,7 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
 
 static void reset_byte_stream(void* arg, grpc_error* error) {
   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
-  gpr_mu_lock(&s->t->mu);
+  grpc_core::MutexLock lock(&s->t->mu);
   s->pending_byte_stream = false;
   if (error == GRPC_ERROR_NONE) {
     grpc_chttp2_maybe_complete_recv_message(s->t, s);
@@ -2898,7 +2899,6 @@ static void reset_byte_stream(void* arg, grpc_error* error) {
     grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
     s->byte_stream_error = GRPC_ERROR_REF(error);
   }
-  gpr_mu_unlock(&s->t->mu);
 }
 
 namespace grpc_core {
@@ -2917,13 +2917,13 @@ Chttp2IncomingByteStream::Chttp2IncomingByteStream(
 
 void Chttp2IncomingByteStream::Orphan() {
   GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
-  grpc_chttp2_transport* t = stream_->t;
-  gpr_mu_lock(&t->mu);
+  grpc_chttp2_stream* s = stream_;
+  grpc_chttp2_transport* t = s->t;
+  grpc_core::MutexLock lock(&t->mu);
   Unref();
-  stream_->pending_byte_stream = false;
-  grpc_chttp2_maybe_complete_recv_message(t, stream_);
-  grpc_chttp2_maybe_complete_recv_trailing_metadata(t, stream_);
-  gpr_mu_unlock(&t->mu);
+  s->pending_byte_stream = false;
+  grpc_chttp2_maybe_complete_recv_message(t, s);
+  grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
 }
 
 // TODO(yashykt) : Merge this with Next
@@ -2931,7 +2931,7 @@ void Chttp2IncomingByteStream::NextLocked(void* arg,
                                           grpc_error* error_ignored) {
   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
   grpc_chttp2_transport* t = bs->transport_;
-  gpr_mu_lock(&t->mu);
+  grpc_core::MutexLock lock(&t->mu);
   grpc_chttp2_stream* s = bs->stream_;
   size_t cur_length = s->frame_storage.length;
   if (!s->read_closed) {
@@ -2970,7 +2970,6 @@ void Chttp2IncomingByteStream::NextLocked(void* arg,
     s->on_next = bs->next_action_.on_complete;
   }
   bs->Unref();
-  gpr_mu_unlock(&t->mu);
 }
 
 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
@@ -3113,65 +3112,67 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
 
 static void benign_reclaimer(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
-  gpr_mu_lock(&t->mu);
-  if (error == GRPC_ERROR_NONE &&
-      grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
-    /* Channel with no active streams: send a goaway to try and make it
-     * disconnect cleanly */
-    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
-      gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
-              t->peer_string);
-    }
-    send_goaway(t,
-                grpc_error_set_int(
-                    GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
-                    GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
-  } else if (error == GRPC_ERROR_NONE &&
-             GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
-    gpr_log(GPR_INFO,
-            "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
-            " streams",
-            t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
-  }
-  t->benign_reclaimer_registered = false;
-  if (error != GRPC_ERROR_CANCELLED) {
-    grpc_resource_user_finish_reclamation(
-        grpc_endpoint_get_resource_user(t->ep));
-  }
-  gpr_mu_unlock(&t->mu);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    if (error == GRPC_ERROR_NONE &&
+        grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
+      /* Channel with no active streams: send a goaway to try and make it
+       * disconnect cleanly */
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+        gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
+                t->peer_string);
+      }
+      send_goaway(
+          t, grpc_error_set_int(
+                 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
+                 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
+    } else if (error == GRPC_ERROR_NONE &&
+               GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+      gpr_log(GPR_INFO,
+              "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
+              " streams",
+              t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
+    }
+    t->benign_reclaimer_registered = false;
+    if (error != GRPC_ERROR_CANCELLED) {
+      grpc_resource_user_finish_reclamation(
+          grpc_endpoint_get_resource_user(t->ep));
+    }
+  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
 }
 
 static void destructive_reclaimer(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
-  gpr_mu_lock(&t->mu);
-  size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
-  t->destructive_reclaimer_registered = false;
-  if (error == GRPC_ERROR_NONE && n > 0) {
-    grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
-        grpc_chttp2_stream_map_rand(&t->stream_map));
-    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
-      gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
-              s->id);
-    }
-    grpc_chttp2_cancel_stream(
-        t, s,
-        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
-                           GRPC_ERROR_INT_HTTP2_ERROR,
-                           GRPC_HTTP2_ENHANCE_YOUR_CALM));
-    if (n > 1) {
-      /* Since we cancel one stream per destructive reclamation, if
-         there are more streams left, we can immediately post a new
-         reclaimer in case the resource quota needs to free more
-         memory */
-      post_destructive_reclaimer(t);
-    }
-  }
-  if (error != GRPC_ERROR_CANCELLED) {
-    grpc_resource_user_finish_reclamation(
-        grpc_endpoint_get_resource_user(t->ep));
-  }
-  gpr_mu_unlock(&t->mu);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
+    t->destructive_reclaimer_registered = false;
+    if (error == GRPC_ERROR_NONE && n > 0) {
+      grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
+          grpc_chttp2_stream_map_rand(&t->stream_map));
+      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
+        gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
+                s->id);
+      }
+      grpc_chttp2_cancel_stream(
+          t, s,
+          grpc_error_set_int(
+              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
+              GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
+      if (n > 1) {
+        /* Since we cancel one stream per destructive reclamation, if
+           there are more streams left, we can immediately post a new
+           reclaimer in case the resource quota needs to free more
+           memory */
+        post_destructive_reclaimer(t);
+      }
+    }
+    if (error != GRPC_ERROR_CANCELLED) {
+      grpc_resource_user_finish_reclamation(
+          grpc_endpoint_get_resource_user(t->ep));
+    }
+  }
   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
 }

src/core/ext/transport/chttp2/transport/hpack_parser.cc

@@ -1669,14 +1669,16 @@ static const maybe_complete_func_type maybe_complete_funcs[] = {
 
 static void force_client_rst_stream(void* sp, grpc_error* error) {
   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
   grpc_chttp2_transport* t = s->t;
-  gpr_mu_lock(&t->mu);
-  if (!s->write_closed) {
-    grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
-                                             &s->stats.outgoing);
-    grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
-    grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    if (!s->write_closed) {
+      grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
+                                               &s->stats.outgoing);
+      grpc_chttp2_initiate_write(t,
+                                 GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
+      grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE);
+    }
   }
-  gpr_mu_unlock(&t->mu);
   GRPC_CHTTP2_STREAM_UNREF(s, "final_rst");
 }

src/core/ext/transport/chttp2/transport/internal.h

@@ -293,7 +293,7 @@ struct grpc_chttp2_transport {
   ~grpc_chttp2_transport();
 
   grpc_transport base; /* must be first */
-  gpr_mu mu;
+  grpc_core::Mutex mu;
   grpc_core::RefCount refs;
   grpc_endpoint* ep;
   char* peer_string;
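Because grpc_core::Mutex is a C++ class whose constructor and destructor manage the underlying lock, this member-type change is what lets the first two hunks above drop the explicit gpr_mu_init call from the transport constructor and the gpr_mu_destroy call from its destructor. Presumably that is the whole point of the header edit: the mutex now follows the transport's lifetime automatically, with no manual setup or teardown.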
