Merge pull request #20331 from yashykt/chttp2combiner

Chttp2combiner
Authored by Yash Tibrewal, committed by GitHub, 5 years ago
commit c888722f1d
6 files changed (changed lines, file):

  739  src/core/ext/transport/chttp2/transport/chttp2_transport.cc
   23  src/core/ext/transport/chttp2/transport/hpack_parser.cc
   31  src/core/ext/transport/chttp2/transport/internal.h
    4  src/core/ext/transport/chttp2/transport/writing.cc
   39  src/core/lib/iomgr/closure.h
    4  src/core/lib/iomgr/executor.cc

src/core/ext/transport/chttp2/transport/chttp2_transport.cc: file diff suppressed because it is too large (739 changed lines).

src/core/ext/transport/chttp2/transport/hpack_parser.cc

@@ -1669,11 +1669,15 @@ static const maybe_complete_func_type maybe_complete_funcs[] = {
 static void force_client_rst_stream(void* sp, grpc_error* error) {
   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
   grpc_chttp2_transport* t = s->t;
-  if (!s->write_closed) {
-    grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
-                                             &s->stats.outgoing);
-    grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
-    grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE);
+  {
+    grpc_core::MutexLock lock(&t->mu);
+    if (!s->write_closed) {
+      grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
+                                               &s->stats.outgoing);
+      grpc_chttp2_initiate_write(t,
+                                 GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
+      grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE);
+    }
   }
   GRPC_CHTTP2_STREAM_UNREF(s, "final_rst");
 }
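The shape of this hunk recurs throughout the PR: work that was previously serialized by running on t->combiner is now guarded explicitly by the new t->mu, with the lock scope ending before code that must run unlocked. A minimal standalone sketch of that RAII pattern, using std::mutex in place of grpc_core::Mutex (names here are illustrative, not the gRPC API):

#include <iostream>
#include <mutex>

struct Transport {
  std::mutex mu;  // stands in for the new grpc_core::Mutex t->mu
  bool write_closed = false;
};

// Hold the lock only around the state check and write bookkeeping; scope
// exit releases it before work that must run unlocked (the stream unref in
// the real patch).
void ForceRstStream(Transport* t) {
  {
    std::lock_guard<std::mutex> lock(t->mu);  // ~ grpc_core::MutexLock
    if (!t->write_closed) {
      std::cout << "queue RST_STREAM and initiate write\n";
    }
  }  // lock released here
  std::cout << "unref stream, outside the lock\n";
}

int main() {
  Transport t;
  ForceRstStream(&t);
}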
@@ -1740,11 +1744,12 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser,
        the stream. Wait until the combiner lock is ready to be released
        however -- it might be that we receive a RST_STREAM following this
        and can avoid the extra write */
+    // TODO(yashykt) : When we were using combiners, we were using the
+    // finally version. Maybe do something similar?
     GRPC_CHTTP2_STREAM_REF(s, "final_rst");
-    GRPC_CLOSURE_SCHED(
-        GRPC_CLOSURE_CREATE(force_client_rst_stream, s,
-                            grpc_combiner_finally_scheduler(t->combiner)),
-        GRPC_ERROR_NONE);
+    GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(force_client_rst_stream, s,
+                                           grpc_schedule_on_exec_ctx),
+                       GRPC_ERROR_NONE);
   }
   grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);
 }
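The scheduler changes here as well: the closure moves from the combiner's "finally" scheduler to grpc_schedule_on_exec_ctx, so it now runs when the current execution context flushes rather than after the combiner drains its queue (hence the TODO). A rough standalone model of that deferred-flush behavior, with hypothetical names and no claim to match the real ExecCtx API:

#include <functional>
#include <iostream>
#include <queue>

// Toy model of "schedule on exec_ctx": callbacks queue up and run when the
// current call stack reaches a safe point and flushes, not inline and not
// under any transport lock.
class ToyExecCtx {
 public:
  void Schedule(std::function<void()> fn) { queue_.push(std::move(fn)); }
  void Flush() {  // called at a safe point, e.g. while unwinding
    while (!queue_.empty()) {
      std::function<void()> fn = std::move(queue_.front());
      queue_.pop();
      fn();  // a callback may Schedule() more work; the loop drains it too
    }
  }

 private:
  std::queue<std::function<void()>> queue_;
};

int main() {
  ToyExecCtx ctx;
  ctx.Schedule([] { std::cout << "force_client_rst_stream analogue runs\n"; });
  std::cout << "parsing finishes first\n";
  ctx.Flush();
}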

src/core/ext/transport/chttp2/transport/internal.h

@@ -39,6 +39,7 @@
 #include "src/core/lib/channel/channelz.h"
 #include "src/core/lib/compression/stream_compression.h"
 #include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/endpoint.h"
 #include "src/core/lib/iomgr/timer.h"
@@ -253,7 +254,6 @@ class Chttp2IncomingByteStream : public ByteStream {
  private:
   static void NextLocked(void* arg, grpc_error* error_ignored);
-  static void OrphanLocked(void* arg, grpc_error* error_ignored);
   void MaybeCreateStreamDecompressionCtx();
@@ -275,7 +275,6 @@ class Chttp2IncomingByteStream : public ByteStream {
     size_t max_size_hint;
     grpc_closure* on_complete;
   } next_action_;
-  grpc_closure destroy_action_;
 };
 
 }  // namespace grpc_core
@@ -294,14 +293,13 @@ struct grpc_chttp2_transport {
   ~grpc_chttp2_transport();
 
   grpc_transport base; /* must be first */
+  grpc_core::Mutex mu;
   grpc_core::RefCount refs;
   grpc_endpoint* ep;
   char* peer_string;
 
   grpc_resource_user* resource_user;
 
-  grpc_combiner* combiner;
-
   grpc_closure* notify_on_receive_settings = nullptr;
 
   /** write execution state of the transport */
@@ -327,11 +325,11 @@ struct grpc_chttp2_transport {
   /** maps stream id to grpc_chttp2_stream objects */
   grpc_chttp2_stream_map stream_map;
 
-  grpc_closure write_action_begin_locked;
+  grpc_closure write_action_begin;
   grpc_closure write_action;
-  grpc_closure write_action_end_locked;
+  grpc_closure write_action_end;
 
-  grpc_closure read_action_locked;
+  grpc_closure read_action;
 
   /** incoming read bytes */
   grpc_slice_buffer read_buffer;
@@ -392,7 +390,7 @@ struct grpc_chttp2_transport {
   grpc_chttp2_repeated_ping_policy ping_policy;
   grpc_chttp2_repeated_ping_state ping_state;
   uint64_t ping_ctr = 0; /* unique id for pings */
-  grpc_closure retry_initiate_ping_locked;
+  grpc_closure retry_initiate_ping;
 
   /** ping acks */
   size_t ping_ack_count = 0;
@@ -442,9 +440,9 @@ struct grpc_chttp2_transport {
   grpc_chttp2_write_cb* write_cb_pool = nullptr;
 
   /* bdp estimator */
-  grpc_closure next_bdp_ping_timer_expired_locked;
+  grpc_closure next_bdp_ping_timer_expired;
   grpc_closure start_bdp_ping_locked;
-  grpc_closure finish_bdp_ping_locked;
+  grpc_closure finish_bdp_ping;
 
   /* if non-NULL, close the transport with this error when writes are finished
    */
@@ -459,9 +457,9 @@ struct grpc_chttp2_transport {
   /** have we scheduled a destructive cleanup? */
   bool destructive_reclaimer_registered = false;
   /** benign cleanup closure */
-  grpc_closure benign_reclaimer_locked;
+  grpc_closure benign_reclaimer;
   /** destructive cleanup closure */
-  grpc_closure destructive_reclaimer_locked;
+  grpc_closure destructive_reclaimer;
 
   /* next bdp ping timer */
   bool have_next_bdp_ping_timer = false;
@@ -469,13 +467,13 @@ struct grpc_chttp2_transport {
 
   /* keep-alive ping support */
   /** Closure to initialize a keepalive ping */
-  grpc_closure init_keepalive_ping_locked;
+  grpc_closure init_keepalive_ping;
   /** Closure to run when the keepalive ping is sent */
   grpc_closure start_keepalive_ping_locked;
   /** Closure to run when the keepalive ping ack is received */
-  grpc_closure finish_keepalive_ping_locked;
+  grpc_closure finish_keepalive_ping;
   /** Closure to run when the keepalive ping times out */
-  grpc_closure keepalive_watchdog_fired_locked;
+  grpc_closure keepalive_watchdog_fired;
   /** timer to initiate ping events */
   grpc_timer keepalive_ping_timer;
   /** watchdog to kill the transport when waiting for the keepalive ping */
@@ -522,7 +520,6 @@ struct grpc_chttp2_stream {
     explicit Reffer(grpc_chttp2_stream* s);
   } reffer;
 
-  grpc_closure destroy_stream;
   grpc_closure* destroy_stream_arg;
 
   grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
@@ -543,7 +540,7 @@ struct grpc_chttp2_stream {
   int64_t next_message_end_offset;
   int64_t flow_controlled_bytes_written = 0;
   int64_t flow_controlled_bytes_flowed = 0;
-  grpc_closure complete_fetch_locked;
+  grpc_closure complete_fetch;
   grpc_closure* fetching_send_message_finished = nullptr;
 
   grpc_metadata_batch* recv_initial_metadata;

src/core/ext/transport/chttp2/transport/writing.cc

@@ -97,14 +97,14 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
       t->ping_state.is_delayed_ping_timer_set = true;
       GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
       grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
-                      &t->retry_initiate_ping_locked);
+                      &t->retry_initiate_ping);
     }
     return;
   }
 
   pq->inflight_id = t->ping_ctr;
   t->ping_ctr++;
-  GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
+  GRPC_CLOSURE_LIST_RUN(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
   grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
                          &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
   grpc_slice_buffer_add(&t->outbuf,

src/core/lib/iomgr/closure.h

@@ -355,4 +355,43 @@ inline void grpc_closure_list_sched(grpc_closure_list* list) {
   grpc_closure_list_sched(closure_list)
 #endif
 
+#ifndef NDEBUG
+inline void grpc_closure_list_run(const char* file, int line,
+                                  grpc_closure_list* list) {
+#else
+inline void grpc_closure_list_run(grpc_closure_list* list) {
+#endif
+  grpc_closure* c = list->head;
+  while (c != nullptr) {
+    grpc_closure* next = c->next_data.next;
+#ifndef NDEBUG
+    if (c->scheduled) {
+      gpr_log(GPR_ERROR,
+              "Closure already scheduled. (closure: %p, created: [%s:%d], "
+              "previously scheduled at: [%s:%d], run?: %s",
+              c, c->file_created, c->line_created, c->file_initiated,
+              c->line_initiated, c->run ? "true" : "false");
+      abort();
+    }
+    c->scheduled = true;
+    c->file_initiated = file;
+    c->line_initiated = line;
+    c->run = false;
+    GPR_ASSERT(c->cb != nullptr);
+#endif
+    c->scheduler->vtable->run(c, c->error_data.error);
+    c = next;
+  }
+  list->head = list->tail = nullptr;
+}
+
+/** Run all closures in a list immediately. Does not need to be run from a
+ * safe point. */
+#ifndef NDEBUG
+#define GRPC_CLOSURE_LIST_RUN(closure_list) \
+  grpc_closure_list_run(__FILE__, __LINE__, closure_list)
+#else
+#define GRPC_CLOSURE_LIST_RUN(closure_list) grpc_closure_list_run(closure_list)
+#endif
+
 #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
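The subtle point in grpc_closure_list_run above is the iteration order: next_data.next must be read before the callback fires, because running a closure may re-initialize, re-enqueue, or free it. A self-contained sketch of the same drain pattern, with deliberately simplified types (not the real grpc_closure, which also carries its scheduler, error, and debug bookkeeping):

#include <cstdio>
#include <functional>

// Simplified intrusive closure list.
struct Closure {
  Closure* next = nullptr;
  std::function<void()> cb;
};

struct ClosureList {
  Closure* head = nullptr;
  Closure* tail = nullptr;
};

void RunClosureList(ClosureList* list) {
  Closure* c = list->head;
  while (c != nullptr) {
    Closure* next = c->next;  // grab next first: cb() may reuse or free c
    c->cb();
    c = next;
  }
  list->head = list->tail = nullptr;  // leave the list empty, as the patch does
}

int main() {
  Closure a{nullptr, [] { std::puts("ping initiated"); }};
  Closure b{nullptr, [] { std::puts("ping initiated (2)"); }};
  a.next = &b;
  ClosureList list{&a, &b};
  RunClosureList(&list);
}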

src/core/lib/iomgr/executor.cc

@@ -465,6 +465,10 @@ void Executor::ShutdownAll() {
 bool Executor::IsThreaded(ExecutorType executor_type) {
   GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS);
+  Executor* executor = executors[static_cast<size_t>(executor_type)];
+  if (executor == nullptr) {
+    return false;
+  }
   return executors[static_cast<size_t>(executor_type)]->IsThreaded();
 }
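The executor fix guards against IsThreaded() being called while the executor slot still holds nullptr (before the executors are created, or after shutdown), reporting "not threaded" instead of dereferencing a null pointer. The same pattern in miniature, with illustrative names rather than the real Executor class:

#include <cstddef>
#include <iostream>

enum class ExecutorType { DEFAULT, RESOLVER, NUM_EXECUTORS };

struct ToyExecutor {
  bool IsThreaded() const { return threaded; }
  bool threaded = true;
};

// Slots are legitimately null until an InitAll()-style setup runs.
static ToyExecutor* executors[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)];

bool IsThreaded(ExecutorType type) {
  ToyExecutor* executor = executors[static_cast<size_t>(type)];
  if (executor == nullptr) {
    return false;  // the early-out added by the patch
  }
  return executor->IsThreaded();
}

int main() {
  std::cout << IsThreaded(ExecutorType::DEFAULT) << "\n";  // 0: not yet created
  ToyExecutor e;
  executors[0] = &e;
  std::cout << IsThreaded(ExecutorType::DEFAULT) << "\n";  // 1
}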
