Merge pull request #20435 from grpc/revert-20331-chttp2combiner

Revert "Chttp2combiner"
pull/20451/head
Karthik Ravi Shankar 5 years ago committed by GitHub
commit eb07a377cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 739
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 23
      src/core/ext/transport/chttp2/transport/hpack_parser.cc
  3. 31
      src/core/ext/transport/chttp2/transport/internal.h
  4. 4
      src/core/ext/transport/chttp2/transport/writing.cc
  5. 39
      src/core/lib/iomgr/closure.h
  6. 4
      src/core/lib/iomgr/executor.cc

File diff suppressed because it is too large Load Diff

@ -1669,15 +1669,11 @@ static const maybe_complete_func_type maybe_complete_funcs[] = {
static void force_client_rst_stream(void* sp, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
grpc_chttp2_transport* t = s->t;
{
grpc_core::MutexLock lock(&t->mu);
if (!s->write_closed) {
grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing);
grpc_chttp2_initiate_write(t,
GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE);
}
if (!s->write_closed) {
grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE);
}
GRPC_CHTTP2_STREAM_UNREF(s, "final_rst");
}
@ -1744,12 +1740,11 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser,
the stream. Wait until the combiner lock is ready to be released
however -- it might be that we receive a RST_STREAM following this
and can avoid the extra write */
// TODO(yashykt) : When we were using combiners, we were using the
// finally version. Maybe do something similar?
GRPC_CHTTP2_STREAM_REF(s, "final_rst");
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(force_client_rst_stream, s,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(force_client_rst_stream, s,
grpc_combiner_finally_scheduler(t->combiner)),
GRPC_ERROR_NONE);
}
grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);
}

@ -39,7 +39,6 @@
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/compression/stream_compression.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/timer.h"
@ -254,6 +253,7 @@ class Chttp2IncomingByteStream : public ByteStream {
private:
static void NextLocked(void* arg, grpc_error* error_ignored);
static void OrphanLocked(void* arg, grpc_error* error_ignored);
void MaybeCreateStreamDecompressionCtx();
@ -275,6 +275,7 @@ class Chttp2IncomingByteStream : public ByteStream {
size_t max_size_hint;
grpc_closure* on_complete;
} next_action_;
grpc_closure destroy_action_;
};
} // namespace grpc_core
@ -293,13 +294,14 @@ struct grpc_chttp2_transport {
~grpc_chttp2_transport();
grpc_transport base; /* must be first */
grpc_core::Mutex mu;
grpc_core::RefCount refs;
grpc_endpoint* ep;
char* peer_string;
grpc_resource_user* resource_user;
grpc_combiner* combiner;
grpc_closure* notify_on_receive_settings = nullptr;
/** write execution state of the transport */
@ -325,11 +327,11 @@ struct grpc_chttp2_transport {
/** maps stream id to grpc_chttp2_stream objects */
grpc_chttp2_stream_map stream_map;
grpc_closure write_action_begin;
grpc_closure write_action_begin_locked;
grpc_closure write_action;
grpc_closure write_action_end;
grpc_closure write_action_end_locked;
grpc_closure read_action;
grpc_closure read_action_locked;
/** incoming read bytes */
grpc_slice_buffer read_buffer;
@ -390,7 +392,7 @@ struct grpc_chttp2_transport {
grpc_chttp2_repeated_ping_policy ping_policy;
grpc_chttp2_repeated_ping_state ping_state;
uint64_t ping_ctr = 0; /* unique id for pings */
grpc_closure retry_initiate_ping;
grpc_closure retry_initiate_ping_locked;
/** ping acks */
size_t ping_ack_count = 0;
@ -440,9 +442,9 @@ struct grpc_chttp2_transport {
grpc_chttp2_write_cb* write_cb_pool = nullptr;
/* bdp estimator */
grpc_closure next_bdp_ping_timer_expired;
grpc_closure next_bdp_ping_timer_expired_locked;
grpc_closure start_bdp_ping_locked;
grpc_closure finish_bdp_ping;
grpc_closure finish_bdp_ping_locked;
/* if non-NULL, close the transport with this error when writes are finished
*/
@ -457,9 +459,9 @@ struct grpc_chttp2_transport {
/** have we scheduled a destructive cleanup? */
bool destructive_reclaimer_registered = false;
/** benign cleanup closure */
grpc_closure benign_reclaimer;
grpc_closure benign_reclaimer_locked;
/** destructive cleanup closure */
grpc_closure destructive_reclaimer;
grpc_closure destructive_reclaimer_locked;
/* next bdp ping timer */
bool have_next_bdp_ping_timer = false;
@ -467,13 +469,13 @@ struct grpc_chttp2_transport {
/* keep-alive ping support */
/** Closure to initialize a keepalive ping */
grpc_closure init_keepalive_ping;
grpc_closure init_keepalive_ping_locked;
/** Closure to run when the keepalive ping is sent */
grpc_closure start_keepalive_ping_locked;
/** Cousure to run when the keepalive ping ack is received */
grpc_closure finish_keepalive_ping;
grpc_closure finish_keepalive_ping_locked;
/** Closrue to run when the keepalive ping timeouts */
grpc_closure keepalive_watchdog_fired;
grpc_closure keepalive_watchdog_fired_locked;
/** timer to initiate ping events */
grpc_timer keepalive_ping_timer;
/** watchdog to kill the transport when waiting for the keepalive ping */
@ -520,6 +522,7 @@ struct grpc_chttp2_stream {
explicit Reffer(grpc_chttp2_stream* s);
} reffer;
grpc_closure destroy_stream;
grpc_closure* destroy_stream_arg;
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
@ -540,7 +543,7 @@ struct grpc_chttp2_stream {
int64_t next_message_end_offset;
int64_t flow_controlled_bytes_written = 0;
int64_t flow_controlled_bytes_flowed = 0;
grpc_closure complete_fetch;
grpc_closure complete_fetch_locked;
grpc_closure* fetching_send_message_finished = nullptr;
grpc_metadata_batch* recv_initial_metadata;

@ -97,14 +97,14 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
t->ping_state.is_delayed_ping_timer_set = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
&t->retry_initiate_ping);
&t->retry_initiate_ping_locked);
}
return;
}
pq->inflight_id = t->ping_ctr;
t->ping_ctr++;
GRPC_CLOSURE_LIST_RUN(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
grpc_slice_buffer_add(&t->outbuf,

@ -355,43 +355,4 @@ inline void grpc_closure_list_sched(grpc_closure_list* list) {
grpc_closure_list_sched(closure_list)
#endif
#ifndef NDEBUG
inline void grpc_closure_list_run(const char* file, int line,
grpc_closure_list* list) {
#else
inline void grpc_closure_list_run(grpc_closure_list* list) {
#endif
grpc_closure* c = list->head;
while (c != nullptr) {
grpc_closure* next = c->next_data.next;
#ifndef NDEBUG
if (c->scheduled) {
gpr_log(GPR_ERROR,
"Closure already scheduled. (closure: %p, created: [%s:%d], "
"previously scheduled at: [%s: %d] run?: %s",
c, c->file_created, c->line_created, c->file_initiated,
c->line_initiated, c->run ? "true" : "false");
abort();
}
c->scheduled = true;
c->file_initiated = file;
c->line_initiated = line;
c->run = false;
GPR_ASSERT(c->cb != nullptr);
#endif
c->scheduler->vtable->run(c, c->error_data.error);
c = next;
}
list->head = list->tail = nullptr;
}
/** Schedule all closures in a list to be run. Does not need to be run from a
* safe point. */
#ifndef NDEBUG
#define GRPC_CLOSURE_LIST_RUN(closure_list) \
grpc_closure_list_run(__FILE__, __LINE__, closure_list)
#else
#define GRPC_CLOSURE_LIST_RUN(closure_list) grpc_closure_list_run(closure_list)
#endif
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */

@ -465,10 +465,6 @@ void Executor::ShutdownAll() {
bool Executor::IsThreaded(ExecutorType executor_type) {
GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS);
Executor* executor = executors[static_cast<size_t>(executor_type)];
if (executor == nullptr) {
return false;
}
return executors[static_cast<size_t>(executor_type)]->IsThreaded();
}

Loading…
Cancel
Save