chttp2_transport changes

reviewable/pr20542/r1
Yash Tibrewal 5 years ago
parent 7a3388b077
commit 54dcbd170d
  1. src/core/ext/filters/client_channel/client_channel.cc (2)
  2. src/core/ext/filters/client_channel/lb_policy.h (6)
  3. src/core/ext/filters/client_channel/resolver.cc (3)
  4. src/core/ext/filters/client_channel/resolver.h (6)
  5. src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc (4)
  6. src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h (6)
  7. src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc (8)
  8. src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc (4)
  9. src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc (16)
  10. src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc (8)
  11. src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h (2)
  12. src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc (4)
  13. src/core/ext/filters/client_channel/resolver_factory.h (2)
  14. src/core/ext/filters/client_channel/resolver_registry.cc (2)
  15. src/core/ext/filters/client_channel/resolver_registry.h (2)
  16. src/core/ext/filters/client_channel/xds/xds_client.cc (3)
  17. src/core/ext/filters/client_channel/xds/xds_client.h (6)
  18. src/core/ext/transport/chttp2/transport/chttp2_transport.cc (197)
  19. src/core/ext/transport/chttp2/transport/hpack_parser.cc (5)
  20. src/core/ext/transport/chttp2/transport/internal.h (4)
  21. src/core/ext/transport/chttp2/transport/writing.cc (5)
  22. src/core/lib/iomgr/combiner.cc (27)
  23. src/core/lib/iomgr/combiner.h (52)
  24. src/core/lib/iomgr/exec_ctx.h (5)
  25. src/core/lib/iomgr/resource_quota.cc (2)
  26. src/core/lib/transport/connectivity_state.cc (1)
  27. src/core/lib/transport/connectivity_state.h (5)

@ -282,7 +282,7 @@ class ChannelData {
//
// Fields used in the control plane. Guarded by combiner.
//
grpc_combiner* combiner_;
Combiner* combiner_;
grpc_pollset_set* interested_parties_;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;

@ -313,7 +313,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
// TODO(roth): Once we have a C++-like interface for combiners, this
// API should change to take a smart pointer that does pass ownership
// of a reference.
grpc_combiner* combiner = nullptr;
Combiner* combiner = nullptr;
/// Channel control helper.
/// Note: LB policies MUST NOT call any method on the helper from
/// their constructor.
@ -383,7 +383,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
};
protected:
grpc_combiner* combiner() const { return combiner_; }
Combiner* combiner() const { return combiner_; }
// Note: LB policies MUST NOT call any method on the helper from their
// constructor.
@ -396,7 +396,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
private:
/// Combiner under which LB policy actions take place.
grpc_combiner* combiner_;
Combiner* combiner_;
/// Owned pointer to interested parties in load balancing decisions.
grpc_pollset_set* interested_parties_;
/// Channel control helper.

@ -30,8 +30,7 @@ namespace grpc_core {
// Resolver
//
Resolver::Resolver(grpc_combiner* combiner,
UniquePtr<ResultHandler> result_handler)
Resolver::Resolver(Combiner* combiner, UniquePtr<ResultHandler> result_handler)
: InternallyRefCounted(&grpc_trace_resolver_refcount),
result_handler_(std::move(result_handler)),
combiner_(GRPC_COMBINER_REF(combiner, "resolver")) {}

@ -126,19 +126,19 @@ class Resolver : public InternallyRefCounted<Resolver> {
// TODO(roth): Once we have a C++-like interface for combiners, this
// API should change to take a RefCountedPtr<>, so that we always take
// ownership of a new ref.
explicit Resolver(grpc_combiner* combiner,
explicit Resolver(Combiner* combiner,
UniquePtr<ResultHandler> result_handler);
/// Shuts down the resolver.
virtual void ShutdownLocked() = 0;
grpc_combiner* combiner() const { return combiner_; }
Combiner* combiner() const { return combiner_; }
ResultHandler* result_handler() const { return result_handler_.get(); }
private:
UniquePtr<ResultHandler> result_handler_;
grpc_combiner* combiner_;
Combiner* combiner_;
};
} // namespace grpc_core

@ -67,7 +67,7 @@ struct grpc_ares_ev_driver {
gpr_refcount refs;
/** combiner to synchronize c-ares and I/O callbacks on */
grpc_combiner* combiner;
Combiner* combiner;
/** a list of grpc_fd that this event driver is currently using. */
fd_node* fds;
/** is this event driver currently working? */
@ -146,7 +146,7 @@ void (*grpc_ares_test_only_inject_config)(ares_channel channel) =
grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set,
int query_timeout_ms,
grpc_combiner* combiner,
Combiner* combiner,
grpc_ares_request* request) {
*ev_driver = grpc_core::New<grpc_ares_ev_driver>();
ares_options opts;

@ -43,7 +43,7 @@ ares_channel* grpc_ares_ev_driver_get_channel_locked(
grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set,
int query_timeout_ms,
grpc_combiner* combiner,
Combiner* combiner,
grpc_ares_request* request);
/* Called back when all DNS lookups have completed. */
@ -90,12 +90,12 @@ class GrpcPolledFdFactory {
/* Creates a new wrapped fd for the current platform */
virtual GrpcPolledFd* NewGrpcPolledFdLocked(
ares_socket_t as, grpc_pollset_set* driver_pollset_set,
grpc_combiner* combiner) = 0;
Combiner* combiner) = 0;
/* Optionally configures the ares channel after creation */
virtual void ConfigureAresChannelLocked(ares_channel channel) = 0;
};
UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner);
UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Combiner* combiner);
} // namespace grpc_core

@ -41,7 +41,7 @@ void ares_uv_poll_close_cb(uv_handle_t* handle) { Delete(handle); }
class GrpcPolledFdLibuv : public GrpcPolledFd {
public:
GrpcPolledFdLibuv(ares_socket_t as, grpc_combiner* combiner)
GrpcPolledFdLibuv(ares_socket_t as, Combiner* combiner)
: as_(as), combiner_(combiner) {
gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, (intptr_t)as);
handle_ = New<uv_poll_t>();
@ -107,7 +107,7 @@ class GrpcPolledFdLibuv : public GrpcPolledFd {
grpc_closure* read_closure_ = nullptr;
grpc_closure* write_closure_ = nullptr;
int poll_events_ = 0;
grpc_combiner* combiner_;
Combiner* combiner_;
};
struct AresUvPollCbArg {
@ -162,14 +162,14 @@ class GrpcPolledFdFactoryLibuv : public GrpcPolledFdFactory {
public:
GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
grpc_pollset_set* driver_pollset_set,
grpc_combiner* combiner) override {
Combiner* combiner) override {
return New<GrpcPolledFdLibuv>(as, combiner);
}
void ConfigureAresChannelLocked(ares_channel channel) override {}
};
UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Combiner* combiner) {
return MakeUnique<GrpcPolledFdFactoryLibuv>();
}

@ -90,14 +90,14 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
public:
GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
grpc_pollset_set* driver_pollset_set,
grpc_combiner* combiner) override {
Combiner* combiner) override {
return New<GrpcPolledFdPosix>(as, driver_pollset_set);
}
void ConfigureAresChannelLocked(ares_channel channel) override {}
};
UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Combiner* combiner) {
return MakeUnique<GrpcPolledFdFactoryPosix>();
}

@ -97,8 +97,8 @@ class GrpcPolledFdWindows {
WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
};
GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner,
int address_family, int socket_type)
GrpcPolledFdWindows(ares_socket_t as, Combiner* combiner, int address_family,
int socket_type)
: read_buf_(grpc_empty_slice()),
write_buf_(grpc_empty_slice()),
tcp_write_state_(WRITE_IDLE),
@ -698,7 +698,7 @@ class GrpcPolledFdWindows {
bool gotten_into_driver_list() const { return gotten_into_driver_list_; }
void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
grpc_combiner* combiner_;
Combiner* combiner_;
char recv_from_source_addr_[200];
ares_socklen_t recv_from_source_addr_len_;
grpc_slice read_buf_;
@ -742,7 +742,7 @@ struct SockToPolledFdEntry {
* with a GrpcPolledFdWindows factory and event driver */
class SockToPolledFdMap {
public:
SockToPolledFdMap(grpc_combiner* combiner) {
SockToPolledFdMap(Combiner* combiner) {
combiner_ = GRPC_COMBINER_REF(combiner, "sock to polled fd map");
}
@ -861,7 +861,7 @@ class SockToPolledFdMap {
private:
SockToPolledFdEntry* head_ = nullptr;
grpc_combiner* combiner_;
Combiner* combiner_;
};
const struct ares_socket_functions custom_ares_sock_funcs = {
@ -910,12 +910,12 @@ class GrpcPolledFdWindowsWrapper : public GrpcPolledFd {
class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
public:
GrpcPolledFdFactoryWindows(grpc_combiner* combiner)
GrpcPolledFdFactoryWindows(Combiner* combiner)
: sock_to_polled_fd_map_(combiner) {}
GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
grpc_pollset_set* driver_pollset_set,
grpc_combiner* combiner) override {
Combiner* combiner) override {
GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
// Set a flag so that the virtual socket "close" method knows it
// doesn't need to call ShutdownLocked, since now the driver will.
@ -932,7 +932,7 @@ class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
SockToPolledFdMap sock_to_polled_fd_map_;
};
UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(grpc_combiner* combiner) {
UniquePtr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Combiner* combiner) {
return MakeUnique<GrpcPolledFdFactoryWindows>(combiner);
}

@ -350,7 +350,7 @@ done:
void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
grpc_ares_request* r, const char* dns_server, const char* name,
const char* default_port, grpc_pollset_set* interested_parties,
bool check_grpclb, int query_timeout_ms, grpc_combiner* combiner) {
bool check_grpclb, int query_timeout_ms, Combiner* combiner) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_ares_hostbyname_request* hr = nullptr;
ares_channel* channel = nullptr;
@ -590,7 +590,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs,
bool check_grpclb, char** service_config_json, int query_timeout_ms,
grpc_combiner* combiner) {
Combiner* combiner) {
grpc_ares_request* r =
static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
r->ev_driver = nullptr;
@ -633,7 +633,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs,
bool check_grpclb, char** service_config_json, int query_timeout_ms,
grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
Combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {
GPR_ASSERT(r != nullptr);
@ -674,7 +674,7 @@ void grpc_ares_cleanup(void) {}
typedef struct grpc_resolve_address_ares_request {
/* combiner that queries and related callbacks run under */
grpc_combiner* combiner;
Combiner* combiner;
/** the pointer to receive the resolved addresses */
grpc_resolved_addresses** addrs_out;
/** currently resolving addresses */

@ -65,7 +65,7 @@ extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses,
bool check_grpclb, char** service_config_json, int query_timeout_ms,
grpc_combiner* combiner);
Combiner* combiner);
/* Cancel the pending grpc_ares_request \a request */
extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request);

@ -31,7 +31,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs,
bool check_grpclb, char** service_config_json, int query_timeout_ms,
grpc_combiner* combiner) {
Combiner* combiner) {
return NULL;
}
@ -40,7 +40,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addrs,
bool check_grpclb, char** service_config_json, int query_timeout_ms,
grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
Combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {}

@ -39,7 +39,7 @@ struct ResolverArgs {
/// Used to drive I/O in the name resolution process.
grpc_pollset_set* pollset_set = nullptr;
/// The combiner under which all resolver calls will be run.
grpc_combiner* combiner = nullptr;
Combiner* combiner = nullptr;
/// The result handler to be used by the resolver.
UniquePtr<Resolver::ResultHandler> result_handler;
};

@ -145,7 +145,7 @@ bool ResolverRegistry::IsValidTarget(const char* target) {
OrphanablePtr<Resolver> ResolverRegistry::CreateResolver(
const char* target, const grpc_channel_args* args,
grpc_pollset_set* pollset_set, grpc_combiner* combiner,
grpc_pollset_set* pollset_set, Combiner* combiner,
UniquePtr<Resolver::ResultHandler> result_handler) {
GPR_ASSERT(g_state != nullptr);
grpc_uri* uri = nullptr;

@ -68,7 +68,7 @@ class ResolverRegistry {
/// \a result_handler is used to return results from the resolver.
static OrphanablePtr<Resolver> CreateResolver(
const char* target, const grpc_channel_args* args,
grpc_pollset_set* pollset_set, grpc_combiner* combiner,
grpc_pollset_set* pollset_set, Combiner* combiner,
UniquePtr<Resolver::ResultHandler> result_handler);
/// Returns the default authority to pass from a client for \a target.

@ -1260,8 +1260,7 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
// XdsClient
//
XdsClient::XdsClient(grpc_combiner* combiner,
grpc_pollset_set* interested_parties,
XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
const char* balancer_name, StringView server_name,
UniquePtr<ServiceConfigWatcherInterface> watcher,
const grpc_channel_args& channel_args)

@ -68,7 +68,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
virtual void OnError(grpc_error* error) = 0;
};
XdsClient(grpc_combiner* combiner, grpc_pollset_set* interested_parties,
XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
const char* balancer_name, StringView server_name,
UniquePtr<ServiceConfigWatcherInterface> watcher,
const grpc_channel_args& channel_args);
@ -108,7 +108,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
static RefCountedPtr<XdsClient> GetFromChannelArgs(
const grpc_channel_args& args);
grpc_combiner* combiner() { return combiner_; }
Combiner* combiner() { return combiner_; }
private:
class ChannelState;
@ -133,7 +133,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
static const grpc_arg_pointer_vtable kXdsClientVtable;
grpc_combiner* combiner_;
Combiner* combiner_;
grpc_pollset_set* interested_parties_;
UniquePtr<char> server_name_;

@ -104,11 +104,14 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
/* forward declarations of various callbacks that we'll build closures around */
static void write_action_begin_locked(void* t, grpc_error* error);
static void write_action(void* t, grpc_error* error);
static void write_action_end(void* t, grpc_error* error);
static void write_action_end_locked(void* t, grpc_error* error);
static void read_action(void* t, grpc_error* error);
static void read_action_locked(void* t, grpc_error* error);
static void continue_read_action_locked(grpc_chttp2_transport* t);
static void complete_fetch(void* gs, grpc_error* error);
static void complete_fetch_locked(void* gs, grpc_error* error);
/** Set a transport level setting, and push it to our peer */
static void queue_setting_update(grpc_chttp2_transport* t,
@ -124,6 +127,8 @@ static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state,
const char* reason);
static void benign_reclaimer(void* t, grpc_error* error);
static void destructive_reclaimer(void* t, grpc_error* error);
static void benign_reclaimer_locked(void* t, grpc_error* error);
static void destructive_reclaimer_locked(void* t, grpc_error* error);
@ -136,6 +141,7 @@ static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
static void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
static void start_bdp_ping_locked(void* tp, grpc_error* error);
static void finish_bdp_ping_locked(void* tp, grpc_error* error);
static void next_bdp_ping_timer_expired(void* tp, grpc_error* error);
static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
@ -145,9 +151,11 @@ static void send_ping_locked(grpc_chttp2_transport* t,
static void retry_initiate_ping_locked(void* tp, grpc_error* error);
/** keepalive-relevant functions */
static void init_keepalive_ping(void* arg, grpc_error* error);
static void init_keepalive_ping_locked(void* arg, grpc_error* error);
static void start_keepalive_ping_locked(void* arg, grpc_error* error);
static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
static void keepalive_watchdog_fired(void* arg, grpc_error* error);
static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
static void reset_byte_stream(void* arg, grpc_error* error);
@ -378,33 +386,15 @@ static bool read_channel_args(grpc_chttp2_transport* t,
}
static void init_transport_closures(grpc_chttp2_transport* t) {
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
t, grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
grpc_combiner_scheduler(t->combiner));
nullptr);
GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
next_bdp_ping_timer_expired_locked, t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
t, grpc_combiner_scheduler(t->combiner));
nullptr);
GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
start_keepalive_ping_locked, t,
grpc_combiner_scheduler(t->combiner));
start_keepalive_ping_locked, t, nullptr);
GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
finish_keepalive_ping_locked, t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner));
finish_keepalive_ping_locked, t, nullptr);
}
static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
@ -442,6 +432,8 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
@ -556,9 +548,8 @@ static void destroy_transport_locked(void* tp, grpc_error* error) {
static void destroy_transport(grpc_transport* gt) {
grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t,
grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr),
GRPC_ERROR_NONE);
}
static void close_transport_locked(grpc_chttp2_transport* t,
@ -669,11 +660,7 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
grpc_slice_buffer_init(&frame_storage);
grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
grpc_slice_buffer_init(&flow_controlled_buffer);
GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, nullptr);
}
grpc_chttp2_stream::~grpc_chttp2_stream() {
@ -766,9 +753,8 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
}
s->destroy_stream_arg = then_schedule_closure;
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
grpc_combiner_scheduler(t->combiner)),
t->combiner->Run(
GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
GRPC_ERROR_NONE);
}
@ -929,10 +915,9 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
* Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
* It does not call the endpoint to write the bytes. That is done by the
* 'write_action' (which is scheduled by 'write_action_begin_locked') */
GRPC_CLOSURE_SCHED(
t->combiner->FinallyRun(
GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
write_action_begin_locked, t,
grpc_combiner_finally_scheduler(t->combiner)),
write_action_begin_locked, t, nullptr),
GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
@ -1006,11 +991,18 @@ static void write_action(void* gt, grpc_error* error) {
t->cl = nullptr;
grpc_endpoint_write(
t->ep, &t->outbuf,
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
grpc_combiner_scheduler(t->combiner)),
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t,
grpc_schedule_on_exec_ctx),
cl);
}
static void write_action_end(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked,
write_action_end_locked, t, nullptr),
GRPC_ERROR_REF(error));
}
/* Callback from the grpc_endpoint after bytes have been written by calling
* sendmsg */
static void write_action_end_locked(void* tp, grpc_error* error) {
@ -1051,10 +1043,9 @@ static void write_action_end_locked(void* tp, grpc_error* error) {
if (!closed) {
GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
}
GRPC_CLOSURE_RUN(
t->combiner->FinallyRun(
GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
write_action_begin_locked, t,
grpc_combiner_finally_scheduler(t->combiner)),
write_action_begin_locked, t, nullptr),
GRPC_ERROR_NONE);
break;
}
@ -1305,8 +1296,10 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
}
s->fetching_send_message.reset();
return; /* early out */
} else if (s->fetching_send_message->Next(UINT32_MAX,
&s->complete_fetch_locked)) {
} else if (s->fetching_send_message->Next(
UINT32_MAX, GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
::complete_fetch, s,
grpc_schedule_on_exec_ctx))) {
grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
if (error != GRPC_ERROR_NONE) {
s->fetching_send_message.reset();
@ -1318,6 +1311,13 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
}
}
static void complete_fetch(void* gs, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
s->t->combiner->Run(GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
::complete_fetch_locked, s, nullptr),
GRPC_ERROR_REF(error));
}
static void complete_fetch_locked(void* gs, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
grpc_chttp2_transport* t = s->t;
@ -1668,10 +1668,9 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
op->handler_private.extra_arg = gs;
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked,
op, grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
perform_stream_op_locked, op, nullptr),
GRPC_ERROR_NONE);
}
static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
@ -1681,7 +1680,7 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]);
t->combiner->Run(&pq->lists[j]);
}
GRPC_ERROR_UNREF(error);
}
@ -1727,6 +1726,13 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
&t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
}
void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
retry_initiate_ping_locked, t, nullptr),
GRPC_ERROR_REF(error));
}
static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
t->ping_state.is_delayed_ping_timer_set = false;
@ -1744,7 +1750,7 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
gpr_free(from);
return;
}
GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
t->combiner->Run(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
}
@ -1835,10 +1841,9 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
}
op->handler_private.extra_arg = gt;
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure,
perform_transport_op_locked, op,
grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
perform_transport_op_locked, op, nullptr),
GRPC_ERROR_NONE);
}
/*******************************************************************************
@ -2479,6 +2484,13 @@ static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
return error;
}
static void read_action(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
t->combiner->Run(
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
GRPC_ERROR_REF(error));
}
static void read_action_locked(void* tp, grpc_error* error) {
GPR_TIMER_SCOPE("reading_action_locked", 0);
@ -2576,6 +2588,8 @@ static void read_action_locked(void* tp, grpc_error* error) {
static void continue_read_action_locked(grpc_chttp2_transport* t) {
const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
grpc_schedule_on_exec_ctx);
grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
}
@ -2618,10 +2632,20 @@ static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
nullptr);
GPR_ASSERT(!t->have_next_bdp_ping_timer);
t->have_next_bdp_ping_timer = true;
GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
&t->next_bdp_ping_timer_expired_locked);
}
static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
t->combiner->Run(
GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
next_bdp_ping_timer_expired_locked, t, nullptr),
GRPC_ERROR_REF(error));
}
static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
GPR_ASSERT(t->have_next_bdp_ping_timer);
@ -2700,6 +2724,13 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
}
}
static void init_keepalive_ping(void* arg, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
init_keepalive_ping_locked, t, nullptr),
GRPC_ERROR_REF(error));
}
static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
@ -2715,6 +2746,8 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
} else {
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
@ -2722,6 +2755,8 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
} else if (error == GRPC_ERROR_CANCELLED) {
/* The keepalive ping timer may be cancelled by bdp */
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
@ -2741,6 +2776,8 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string);
}
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_watchdog_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
&t->keepalive_watchdog_fired_locked);
@ -2756,6 +2793,8 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
grpc_timer_cancel(&t->keepalive_watchdog_timer);
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
@ -2764,6 +2803,14 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
}
static void keepalive_watchdog_fired(void* arg, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
t->combiner->Run(
GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired_locked, t, nullptr),
GRPC_ERROR_REF(error));
}
static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
@ -2864,10 +2911,9 @@ void Chttp2IncomingByteStream::OrphanLocked(void* arg,
void Chttp2IncomingByteStream::Orphan() {
GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
GRPC_CLOSURE_SCHED(
transport_->combiner->Run(
GRPC_CLOSURE_INIT(&destroy_action_,
&Chttp2IncomingByteStream::OrphanLocked, this,
grpc_combiner_scheduler(transport_->combiner)),
&Chttp2IncomingByteStream::OrphanLocked, this, nullptr),
GRPC_ERROR_NONE);
}
@ -2924,10 +2970,9 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
Ref();
next_action_.max_size_hint = max_size_hint;
next_action_.on_complete = on_complete;
GRPC_CLOSURE_SCHED(
transport_->combiner->Run(
GRPC_CLOSURE_INIT(&next_action_.closure,
&Chttp2IncomingByteStream::NextLocked, this,
grpc_combiner_scheduler(transport_->combiner)),
&Chttp2IncomingByteStream::NextLocked, this, nullptr),
GRPC_ERROR_NONE);
return false;
}
@ -2980,7 +3025,8 @@ grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
}
} else {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
stream_->t->combiner->Run(&stream_->reset_byte_stream,
GRPC_ERROR_REF(error));
return error;
}
return GRPC_ERROR_NONE;
@ -3000,7 +3046,8 @@ grpc_error* Chttp2IncomingByteStream::Push(const grpc_slice& slice,
if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
transport_->combiner->Run(&stream_->reset_byte_stream,
GRPC_ERROR_REF(error));
grpc_slice_unref_internal(slice);
return error;
} else {
@ -3020,7 +3067,8 @@ grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
}
}
if (error != GRPC_ERROR_NONE && reset_on_error) {
GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
transport_->combiner->Run(&stream_->reset_byte_stream,
GRPC_ERROR_REF(error));
}
Unref();
return error;
@ -3040,6 +3088,8 @@ static void post_benign_reclaimer(grpc_chttp2_transport* t) {
if (!t->benign_reclaimer_registered) {
t->benign_reclaimer_registered = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer, t,
grpc_schedule_on_exec_ctx);
grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
false, &t->benign_reclaimer_locked);
}
@ -3049,11 +3099,21 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
if (!t->destructive_reclaimer_registered) {
t->destructive_reclaimer_registered = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t,
grpc_schedule_on_exec_ctx);
grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
true, &t->destructive_reclaimer_locked);
}
}
static void benign_reclaimer(void* arg, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
t->combiner->Run(GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked,
benign_reclaimer_locked, t, nullptr),
GRPC_ERROR_REF(error));
}
static void benign_reclaimer_locked(void* arg, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
if (error == GRPC_ERROR_NONE &&
@ -3083,6 +3143,13 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
}
static void destructive_reclaimer(void* arg, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
t->combiner->Run(GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t, nullptr),
GRPC_ERROR_REF(error));
}
static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
@ -3209,5 +3276,7 @@ void grpc_chttp2_transport_start_reading(
gpr_free(read_buffer);
}
t->notify_on_receive_settings = notify_on_receive_settings;
GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE);
t->combiner->Run(
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
GRPC_ERROR_NONE);
}
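
The chttp2_transport.cc changes above all follow one pattern: closures handed to timers, the endpoint, and resource-quota reclaimers are no longer created with grpc_combiner_scheduler(t->combiner); instead each external callback gets a thin non-"_locked" wrapper that hops back onto the transport combiner with t->combiner->Run(), and the closure is (re)initialized with grpc_schedule_on_exec_ctx at the point where the timer or read is armed. A minimal sketch of that pattern, with hypothetical names (my_event, my_event_closure, my_timer are illustrative, not part of this commit):

// Sketch only; assumes the usual chttp2 headers and a grpc_chttp2_transport
// that owns a grpc_core::Combiner* combiner plus a grpc_closure
// my_event_closure and a grpc_timer my_timer (hypothetical members).

// Runs under the combiner: safe to touch transport state.
static void my_event_locked(void* tp, grpc_error* error) {
  grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
  (void)t;  // ... mutate state guarded by t->combiner ...
}

// Runs on the bare exec_ctx when the timer fires; re-posts onto the combiner.
static void my_event(void* tp, grpc_error* error) {
  grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
  t->combiner->Run(GRPC_CLOSURE_INIT(&t->my_event_closure, my_event_locked, t,
                                     nullptr),
                   GRPC_ERROR_REF(error));
}

// Arming the timer: note grpc_schedule_on_exec_ctx, not a combiner scheduler.
static void arm_my_event(grpc_chttp2_transport* t, grpc_millis deadline) {
  GRPC_CLOSURE_INIT(&t->my_event_closure, my_event, t,
                    grpc_schedule_on_exec_ctx);
  grpc_timer_init(&t->my_timer, deadline, &t->my_event_closure);
}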

@ -1741,9 +1741,8 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser,
however -- it might be that we receive a RST_STREAM following this
and can avoid the extra write */
GRPC_CHTTP2_STREAM_REF(s, "final_rst");
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(force_client_rst_stream, s,
grpc_combiner_finally_scheduler(t->combiner)),
t->combiner->FinallyRun(
GRPC_CLOSURE_CREATE(force_client_rst_stream, s, nullptr),
GRPC_ERROR_NONE);
}
grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);

@ -300,7 +300,7 @@ struct grpc_chttp2_transport {
grpc_resource_user* resource_user;
grpc_combiner* combiner;
grpc_core::Combiner* combiner;
grpc_closure* notify_on_receive_settings = nullptr;
@ -862,4 +862,6 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
bool is_client);
void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error* error);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */

@ -96,6 +96,9 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
if (!t->ping_state.is_delayed_ping_timer_set) {
t->ping_state.is_delayed_ping_timer_set = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
grpc_chttp2_retry_initiate_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
&t->retry_initiate_ping_locked);
}
@ -104,7 +107,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
pq->inflight_id = t->ping_ctr;
t->ping_ctr++;
GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
t->combiner->Run(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
grpc_slice_buffer_add(&t->outbuf,

@ -309,8 +309,10 @@ static void combiner_finally_exec(Combiner* lock, grpc_closure* closure,
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner));
if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
grpc_core::Combiner::Exec(
lock, GRPC_CLOSURE_CREATE(enqueue_finally, closure, nullptr), error);
lock->Run(
GRPC_CLOSURE_CREATE(enqueue_finally, closure,
reinterpret_cast<grpc_closure_scheduler*>(lock)),
error);
return;
}
@ -337,15 +339,26 @@ static void combiner_run(Combiner* lock, grpc_closure* closure,
}
static void enqueue_finally(void* closure, grpc_error* error) {
combiner_finally_exec(static_cast<grpc_closure*>(closure),
grpc_closure* cl = static_cast<grpc_closure*>(closure);
combiner_finally_exec(reinterpret_cast<Combiner*>(cl->scheduler), cl,
GRPC_ERROR_REF(error));
}
static void Combiner::Run(grpc_closure* closure, grpc_error* error) {
combiner_exec(combiner, closure, error);
void Combiner::Run(grpc_closure* closure, grpc_error* error) {
combiner_exec(this, closure, error);
}
static void Combiner::FinallyRun(grpc_closure* closure, grpc_error* error) {
combiner_finally_exec(combiner, closure, exec);
void Combiner::Run(grpc_closure_list* list) {
grpc_closure* c = list->head;
while (c != nullptr) {
grpc_closure* next = c->next_data.next;
Run(c, c->error_data.error);
c = next;
}
list->head = list->tail = nullptr;
}
void Combiner::FinallyRun(grpc_closure* closure, grpc_error* error) {
combiner_finally_exec(this, closure, error);
}
} // namespace grpc_core
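
The new Combiner::Run(grpc_closure_list*) overload above is what lets chttp2 replace GRPC_CLOSURE_LIST_SCHED for the ping callback lists (cancel_pings, grpc_chttp2_ack_ping, maybe_initiate_ping): each closure in the list is re-queued on the combiner with the error already stored in its error_data, and the list is emptied. A hedged usage sketch (flush_ping_callbacks is an illustrative helper, not from this commit):

// Sketch: hand a whole closure list back to the transport combiner.
// Assumes a grpc_chttp2_transport* t with a grpc_core::Combiner* combiner.
static void flush_ping_callbacks(grpc_chttp2_transport* t,
                                 grpc_closure_list* pending) {
  // Before: GRPC_CLOSURE_LIST_SCHED(pending) ran these on the exec_ctx.
  // After: each closure runs serialized under t->combiner; *pending is
  // emptied by the call.
  t->combiner->Run(pending);
}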

@ -27,6 +27,32 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
class Combiner {
public:
void Run(grpc_closure* closure, grpc_error* error);
void Run(grpc_closure_list* list);
void FinallyRun(grpc_closure* closure, grpc_error* error);
Combiner* next_combiner_on_this_exec_ctx = nullptr;
grpc_closure_scheduler scheduler;
grpc_closure_scheduler finally_scheduler;
grpc_core::MultiProducerSingleConsumerQueue queue;
// either:
// a pointer to the initiating exec ctx if that is the only exec_ctx that has
// ever queued to this combiner, or NULL. If this is non-null, it's not
// dereferencable (since the initiating exec_ctx may have gone out of scope)
gpr_atm initiating_exec_ctx_or_null;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
bool time_to_execute_final_list = false;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
};
} // namespace grpc_core
// Provides serialized access to some resource.
// Each action queued on a combiner is executed serially in a borrowed thread.
// The actual thread executing actions may change over time (but there will only
@ -59,30 +85,4 @@ bool grpc_combiner_continue_exec_ctx();
extern grpc_core::DebugOnlyTraceFlag grpc_combiner_trace;
namespace grpc_core {
class Combiner {
public:
static void Run(grpc_closure* closure, grpc_error* error);
static void FinallyRun(grpc_closure* closure, grpc_error* error);
Combiner* next_combiner_on_this_exec_ctx = nullptr;
grpc_closure_scheduler scheduler;
grpc_closure_scheduler finally_scheduler;
grpc_core::MultiProducerSingleConsumerQueue queue;
// either:
// a pointer to the initiating exec ctx if that is the only exec_ctx that has
// ever queued to this combiner, or NULL. If this is non-null, it's not
// dereferencable (since the initiating exec_ctx may have gone out of scope)
gpr_atm initiating_exec_ctx_or_null;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
bool time_to_execute_final_list = false;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */
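
With Combiner now declared ahead of the C-style grpc_combiner_* helpers, callers can invoke its methods directly rather than baking a scheduler into each closure. A minimal sketch of the three entry points, assuming the caller already holds a ref on the combiner (names are illustrative):

// Sketch of typical call sites after this change.
void queue_work(grpc_core::Combiner* combiner, grpc_closure* step1,
                grpc_closure* step2, grpc_closure_list* batch) {
  // Serialized with everything else queued on this combiner.
  combiner->Run(step1, GRPC_ERROR_NONE);
  // Re-queues every closure in the list (each keeps its stored error).
  combiner->Run(batch);
  // Deferred until the combiner's primary queue has drained.
  combiner->FinallyRun(step2, GRPC_ERROR_NONE);
}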

@ -63,6 +63,7 @@ grpc_millis grpc_cycle_counter_to_millis_round_down(gpr_cycle_counter cycles);
grpc_millis grpc_cycle_counter_to_millis_round_up(gpr_cycle_counter cycles);
namespace grpc_core {
class Combiner;
/** Execution context.
* A bag of data that collects information along a callstack.
* It is created on the stack at core entry points (public API or iomgr), and
@ -136,9 +137,9 @@ class ExecCtx {
struct CombinerData {
/* currently active combiner: updated only via combiner.c */
grpc_combiner* active_combiner;
Combiner* active_combiner;
/* last active combiner in the active combiner list */
grpc_combiner* last_combiner;
Combiner* last_combiner;
};
/** Only to be used by grpc-combiner code */

@ -132,7 +132,7 @@ struct grpc_resource_quota {
/* Master combiner lock: all activity on a quota executes under this combiner
* (so no mutex is needed for this data structure) */
grpc_combiner* combiner;
grpc_core::Combiner* combiner;
/* Size of the resource quota */
int64_t size;
/* Amount of free memory in the resource quota */

@ -26,6 +26,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {

@ -68,15 +68,14 @@ class AsyncConnectivityStateWatcherInterface
protected:
class Notifier;
explicit AsyncConnectivityStateWatcherInterface(
grpc_core::Combiner* combiner = nullptr)
explicit AsyncConnectivityStateWatcherInterface(Combiner* combiner = nullptr)
: combiner_(combiner) {}
// Invoked asynchronously when Notify() is called.
virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0;
private:
grpc_closure_scheduler* scheduler_;
Combiner* combiner_;
};
// Tracks connectivity state. Maintains a list of watchers that are
