From 54dcbd170d08e91b20a83cc0c9e23d01bc1e05a7 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 9 Oct 2019 13:16:08 -0700 Subject: [PATCH] chttp2_transport changes --- .../filters/client_channel/client_channel.cc | 2 +- .../ext/filters/client_channel/lb_policy.h | 6 +- .../ext/filters/client_channel/resolver.cc | 3 +- .../ext/filters/client_channel/resolver.h | 6 +- .../dns/c_ares/grpc_ares_ev_driver.cc | 4 +- .../resolver/dns/c_ares/grpc_ares_ev_driver.h | 6 +- .../dns/c_ares/grpc_ares_ev_driver_libuv.cc | 8 +- .../dns/c_ares/grpc_ares_ev_driver_posix.cc | 4 +- .../dns/c_ares/grpc_ares_ev_driver_windows.cc | 16 +- .../resolver/dns/c_ares/grpc_ares_wrapper.cc | 8 +- .../resolver/dns/c_ares/grpc_ares_wrapper.h | 2 +- .../dns/c_ares/grpc_ares_wrapper_fallback.cc | 4 +- .../filters/client_channel/resolver_factory.h | 2 +- .../client_channel/resolver_registry.cc | 2 +- .../client_channel/resolver_registry.h | 2 +- .../filters/client_channel/xds/xds_client.cc | 3 +- .../filters/client_channel/xds/xds_client.h | 6 +- .../chttp2/transport/chttp2_transport.cc | 197 ++++++++++++------ .../chttp2/transport/hpack_parser.cc | 5 +- .../ext/transport/chttp2/transport/internal.h | 4 +- .../ext/transport/chttp2/transport/writing.cc | 5 +- src/core/lib/iomgr/combiner.cc | 27 ++- src/core/lib/iomgr/combiner.h | 52 ++--- src/core/lib/iomgr/exec_ctx.h | 5 +- src/core/lib/iomgr/resource_quota.cc | 2 +- src/core/lib/transport/connectivity_state.cc | 1 + src/core/lib/transport/connectivity_state.h | 5 +- 27 files changed, 236 insertions(+), 151 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index a1952999f85..033816a0b43 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -282,7 +282,7 @@ class ChannelData { // // Fields used in the control plane. Guarded by combiner. // - grpc_combiner* combiner_; + Combiner* combiner_; grpc_pollset_set* interested_parties_; RefCountedPtr subchannel_pool_; OrphanablePtr resolving_lb_policy_; diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 3b7c604d4fa..1a5ea06615c 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -313,7 +313,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { // TODO(roth): Once we have a C++-like interface for combiners, this // API should change to take a smart pointer that does pass ownership // of a reference. - grpc_combiner* combiner = nullptr; + Combiner* combiner = nullptr; /// Channel control helper. /// Note: LB policies MUST NOT call any method on the helper from /// their constructor. @@ -383,7 +383,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { }; protected: - grpc_combiner* combiner() const { return combiner_; } + Combiner* combiner() const { return combiner_; } // Note: LB policies MUST NOT call any method on the helper from their // constructor. @@ -396,7 +396,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { private: /// Combiner under which LB policy actions take place. - grpc_combiner* combiner_; + Combiner* combiner_; /// Owned pointer to interested parties in load balancing decisions. grpc_pollset_set* interested_parties_; /// Channel control helper. diff --git a/src/core/ext/filters/client_channel/resolver.cc b/src/core/ext/filters/client_channel/resolver.cc index b50c42f6a1c..fd20def0d85 100644 --- a/src/core/ext/filters/client_channel/resolver.cc +++ b/src/core/ext/filters/client_channel/resolver.cc @@ -30,8 +30,7 @@ namespace grpc_core { // Resolver // -Resolver::Resolver(grpc_combiner* combiner, - UniquePtr result_handler) +Resolver::Resolver(Combiner* combiner, UniquePtr result_handler) : InternallyRefCounted(&grpc_trace_resolver_refcount), result_handler_(std::move(result_handler)), combiner_(GRPC_COMBINER_REF(combiner, "resolver")) {} diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index ebeb3912032..e57ca12a786 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -126,19 +126,19 @@ class Resolver : public InternallyRefCounted { // TODO(roth): Once we have a C++-like interface for combiners, this // API should change to take a RefCountedPtr<>, so that we always take // ownership of a new ref. - explicit Resolver(grpc_combiner* combiner, + explicit Resolver(Combiner* combiner, UniquePtr result_handler); /// Shuts down the resolver. virtual void ShutdownLocked() = 0; - grpc_combiner* combiner() const { return combiner_; } + Combiner* combiner() const { return combiner_; } ResultHandler* result_handler() const { return result_handler_.get(); } private: UniquePtr result_handler_; - grpc_combiner* combiner_; + Combiner* combiner_; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc index e345ed73db8..25706551c8f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -67,7 +67,7 @@ struct grpc_ares_ev_driver { gpr_refcount refs; /** combiner to synchronize c-ares and I/O callbacks on */ - grpc_combiner* combiner; + Combiner* combiner; /** a list of grpc_fd that this event driver is currently using. */ fd_node* fds; /** is this event driver currently working? */ @@ -146,7 +146,7 @@ void (*grpc_ares_test_only_inject_config)(ares_channel channel) = grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, int query_timeout_ms, - grpc_combiner* combiner, + Combiner* combiner, grpc_ares_request* request) { *ev_driver = grpc_core::New(); ares_options opts; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index fe51c34bc3e..4bbbb6c3d3b 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -43,7 +43,7 @@ ares_channel* grpc_ares_ev_driver_get_channel_locked( grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set, int query_timeout_ms, - grpc_combiner* combiner, + Combiner* combiner, grpc_ares_request* request); /* Called back when all DNS lookups have completed. */ @@ -90,12 +90,12 @@ class GrpcPolledFdFactory { /* Creates a new wrapped fd for the current platform */ virtual GrpcPolledFd* NewGrpcPolledFdLocked( ares_socket_t as, grpc_pollset_set* driver_pollset_set, - grpc_combiner* combiner) = 0; + Combiner* combiner) = 0; /* Optionally configures the ares channel after creation */ virtual void ConfigureAresChannelLocked(ares_channel channel) = 0; }; -UniquePtr NewGrpcPolledFdFactory(grpc_combiner* combiner); +UniquePtr NewGrpcPolledFdFactory(Combiner* combiner); } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc index a65b03e19dc..f5acec4fda7 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc @@ -41,7 +41,7 @@ void ares_uv_poll_close_cb(uv_handle_t* handle) { Delete(handle); } class GrpcPolledFdLibuv : public GrpcPolledFd { public: - GrpcPolledFdLibuv(ares_socket_t as, grpc_combiner* combiner) + GrpcPolledFdLibuv(ares_socket_t as, Combiner* combiner) : as_(as), combiner_(combiner) { gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, (intptr_t)as); handle_ = New(); @@ -107,7 +107,7 @@ class GrpcPolledFdLibuv : public GrpcPolledFd { grpc_closure* read_closure_ = nullptr; grpc_closure* write_closure_ = nullptr; int poll_events_ = 0; - grpc_combiner* combiner_; + Combiner* combiner_; }; struct AresUvPollCbArg { @@ -162,14 +162,14 @@ class GrpcPolledFdFactoryLibuv : public GrpcPolledFdFactory { public: GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as, grpc_pollset_set* driver_pollset_set, - grpc_combiner* combiner) override { + Combiner* combiner) override { return New(as, combiner); } void ConfigureAresChannelLocked(ares_channel channel) override {} }; -UniquePtr NewGrpcPolledFdFactory(grpc_combiner* combiner) { +UniquePtr NewGrpcPolledFdFactory(Combiner* combiner) { return MakeUnique(); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index f167047ebc9..e588010badb 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -90,14 +90,14 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory { public: GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as, grpc_pollset_set* driver_pollset_set, - grpc_combiner* combiner) override { + Combiner* combiner) override { return New(as, driver_pollset_set); } void ConfigureAresChannelLocked(ares_channel channel) override {} }; -UniquePtr NewGrpcPolledFdFactory(grpc_combiner* combiner) { +UniquePtr NewGrpcPolledFdFactory(Combiner* combiner) { return MakeUnique(); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index 13b3c590f93..2fb972a82ef 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -97,8 +97,8 @@ class GrpcPolledFdWindows { WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY, }; - GrpcPolledFdWindows(ares_socket_t as, grpc_combiner* combiner, - int address_family, int socket_type) + GrpcPolledFdWindows(ares_socket_t as, Combiner* combiner, int address_family, + int socket_type) : read_buf_(grpc_empty_slice()), write_buf_(grpc_empty_slice()), tcp_write_state_(WRITE_IDLE), @@ -698,7 +698,7 @@ class GrpcPolledFdWindows { bool gotten_into_driver_list() const { return gotten_into_driver_list_; } void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; } - grpc_combiner* combiner_; + Combiner* combiner_; char recv_from_source_addr_[200]; ares_socklen_t recv_from_source_addr_len_; grpc_slice read_buf_; @@ -742,7 +742,7 @@ struct SockToPolledFdEntry { * with a GrpcPolledFdWindows factory and event driver */ class SockToPolledFdMap { public: - SockToPolledFdMap(grpc_combiner* combiner) { + SockToPolledFdMap(Combiner* combiner) { combiner_ = GRPC_COMBINER_REF(combiner, "sock to polled fd map"); } @@ -861,7 +861,7 @@ class SockToPolledFdMap { private: SockToPolledFdEntry* head_ = nullptr; - grpc_combiner* combiner_; + Combiner* combiner_; }; const struct ares_socket_functions custom_ares_sock_funcs = { @@ -910,12 +910,12 @@ class GrpcPolledFdWindowsWrapper : public GrpcPolledFd { class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory { public: - GrpcPolledFdFactoryWindows(grpc_combiner* combiner) + GrpcPolledFdFactoryWindows(Combiner* combiner) : sock_to_polled_fd_map_(combiner) {} GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as, grpc_pollset_set* driver_pollset_set, - grpc_combiner* combiner) override { + Combiner* combiner) override { GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as); // Set a flag so that the virtual socket "close" method knows it // doesn't need to call ShutdownLocked, since now the driver will. @@ -932,7 +932,7 @@ class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory { SockToPolledFdMap sock_to_polled_fd_map_; }; -UniquePtr NewGrpcPolledFdFactory(grpc_combiner* combiner) { +UniquePtr NewGrpcPolledFdFactory(Combiner* combiner) { return MakeUnique(combiner); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index ad7677e63a9..05b0b989fb6 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -350,7 +350,7 @@ done: void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( grpc_ares_request* r, const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, - bool check_grpclb, int query_timeout_ms, grpc_combiner* combiner) { + bool check_grpclb, int query_timeout_ms, Combiner* combiner) { grpc_error* error = GRPC_ERROR_NONE; grpc_ares_hostbyname_request* hr = nullptr; ares_channel* channel = nullptr; @@ -590,7 +590,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr* addrs, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_combiner* combiner) { + Combiner* combiner) { grpc_ares_request* r = static_cast(gpr_zalloc(sizeof(grpc_ares_request))); r->ev_driver = nullptr; @@ -633,7 +633,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)( grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr* addrs, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; + Combiner* combiner) = grpc_dns_lookup_ares_locked_impl; static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) { GPR_ASSERT(r != nullptr); @@ -674,7 +674,7 @@ void grpc_ares_cleanup(void) {} typedef struct grpc_resolve_address_ares_request { /* combiner that queries and related callbacks run under */ - grpc_combiner* combiner; + Combiner* combiner; /** the pointer to receive the resolved addresses */ grpc_resolved_addresses** addrs_out; /** currently resolving addresses */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index cc977c06b25..e236de0b21a 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -65,7 +65,7 @@ extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)( grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_combiner* combiner); + Combiner* combiner); /* Cancel the pending grpc_ares_request \a request */ extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc index d2de88eaa1e..c731a7891b1 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc @@ -31,7 +31,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl( grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr* addrs, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_combiner* combiner) { + Combiner* combiner) { return NULL; } @@ -40,7 +40,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)( grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr* addrs, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl; + Combiner* combiner) = grpc_dns_lookup_ares_locked_impl; static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {} diff --git a/src/core/ext/filters/client_channel/resolver_factory.h b/src/core/ext/filters/client_channel/resolver_factory.h index 7cee4879814..0349c4745ff 100644 --- a/src/core/ext/filters/client_channel/resolver_factory.h +++ b/src/core/ext/filters/client_channel/resolver_factory.h @@ -39,7 +39,7 @@ struct ResolverArgs { /// Used to drive I/O in the name resolution process. grpc_pollset_set* pollset_set = nullptr; /// The combiner under which all resolver calls will be run. - grpc_combiner* combiner = nullptr; + Combiner* combiner = nullptr; /// The result handler to be used by the resolver. UniquePtr result_handler; }; diff --git a/src/core/ext/filters/client_channel/resolver_registry.cc b/src/core/ext/filters/client_channel/resolver_registry.cc index 509c4ef38fa..0776551a9b6 100644 --- a/src/core/ext/filters/client_channel/resolver_registry.cc +++ b/src/core/ext/filters/client_channel/resolver_registry.cc @@ -145,7 +145,7 @@ bool ResolverRegistry::IsValidTarget(const char* target) { OrphanablePtr ResolverRegistry::CreateResolver( const char* target, const grpc_channel_args* args, - grpc_pollset_set* pollset_set, grpc_combiner* combiner, + grpc_pollset_set* pollset_set, Combiner* combiner, UniquePtr result_handler) { GPR_ASSERT(g_state != nullptr); grpc_uri* uri = nullptr; diff --git a/src/core/ext/filters/client_channel/resolver_registry.h b/src/core/ext/filters/client_channel/resolver_registry.h index 4248a065439..dd3ac4713e9 100644 --- a/src/core/ext/filters/client_channel/resolver_registry.h +++ b/src/core/ext/filters/client_channel/resolver_registry.h @@ -68,7 +68,7 @@ class ResolverRegistry { /// \a result_handler is used to return results from the resolver. static OrphanablePtr CreateResolver( const char* target, const grpc_channel_args* args, - grpc_pollset_set* pollset_set, grpc_combiner* combiner, + grpc_pollset_set* pollset_set, Combiner* combiner, UniquePtr result_handler); /// Returns the default authority to pass from a client for \a target. diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 458b03902c2..0192fcfb690 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -1260,8 +1260,7 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const { // XdsClient // -XdsClient::XdsClient(grpc_combiner* combiner, - grpc_pollset_set* interested_parties, +XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties, const char* balancer_name, StringView server_name, UniquePtr watcher, const grpc_channel_args& channel_args) diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index 5de6c37ad18..40bc2992036 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -68,7 +68,7 @@ class XdsClient : public InternallyRefCounted { virtual void OnError(grpc_error* error) = 0; }; - XdsClient(grpc_combiner* combiner, grpc_pollset_set* interested_parties, + XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties, const char* balancer_name, StringView server_name, UniquePtr watcher, const grpc_channel_args& channel_args); @@ -108,7 +108,7 @@ class XdsClient : public InternallyRefCounted { static RefCountedPtr GetFromChannelArgs( const grpc_channel_args& args); - grpc_combiner* combiner() { return combiner_; } + Combiner* combiner() { return combiner_; } private: class ChannelState; @@ -133,7 +133,7 @@ class XdsClient : public InternallyRefCounted { static const grpc_arg_pointer_vtable kXdsClientVtable; - grpc_combiner* combiner_; + Combiner* combiner_; grpc_pollset_set* interested_parties_; UniquePtr server_name_; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 2879d571d22..cf0fe3f26f5 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -104,11 +104,14 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false, /* forward declarations of various callbacks that we'll build closures around */ static void write_action_begin_locked(void* t, grpc_error* error); static void write_action(void* t, grpc_error* error); +static void write_action_end(void* t, grpc_error* error); static void write_action_end_locked(void* t, grpc_error* error); +static void read_action(void* t, grpc_error* error); static void read_action_locked(void* t, grpc_error* error); static void continue_read_action_locked(grpc_chttp2_transport* t); +static void complete_fetch(void* gs, grpc_error* error); static void complete_fetch_locked(void* gs, grpc_error* error); /** Set a transport level setting, and push it to our peer */ static void queue_setting_update(grpc_chttp2_transport* t, @@ -124,6 +127,8 @@ static void connectivity_state_set(grpc_chttp2_transport* t, grpc_connectivity_state state, const char* reason); +static void benign_reclaimer(void* t, grpc_error* error); +static void destructive_reclaimer(void* t, grpc_error* error); static void benign_reclaimer_locked(void* t, grpc_error* error); static void destructive_reclaimer_locked(void* t, grpc_error* error); @@ -136,6 +141,7 @@ static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error); static void schedule_bdp_ping_locked(grpc_chttp2_transport* t); static void start_bdp_ping_locked(void* tp, grpc_error* error); static void finish_bdp_ping_locked(void* tp, grpc_error* error); +static void next_bdp_ping_timer_expired(void* tp, grpc_error* error); static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error); static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error); @@ -145,9 +151,11 @@ static void send_ping_locked(grpc_chttp2_transport* t, static void retry_initiate_ping_locked(void* tp, grpc_error* error); /** keepalive-relevant functions */ +static void init_keepalive_ping(void* arg, grpc_error* error); static void init_keepalive_ping_locked(void* arg, grpc_error* error); static void start_keepalive_ping_locked(void* arg, grpc_error* error); static void finish_keepalive_ping_locked(void* arg, grpc_error* error); +static void keepalive_watchdog_fired(void* arg, grpc_error* error); static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error); static void reset_byte_stream(void* arg, grpc_error* error); @@ -378,33 +386,15 @@ static bool read_channel_args(grpc_chttp2_transport* t, } static void init_transport_closures(grpc_chttp2_transport* t) { - GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, - destructive_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked, - t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); + nullptr); GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, - next_bdp_ping_timer_expired_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, - t, grpc_combiner_scheduler(t->combiner)); + nullptr); + GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, - start_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); + start_keepalive_ping_locked, t, nullptr); GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, - finish_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, - keepalive_watchdog_fired_locked, t, - grpc_combiner_scheduler(t->combiner)); + finish_keepalive_ping_locked, t, nullptr); } static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { @@ -442,6 +432,8 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, + grpc_schedule_on_exec_ctx); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping_locked); @@ -556,9 +548,8 @@ static void destroy_transport_locked(void* tp, grpc_error* error) { static void destroy_transport(grpc_transport* gt) { grpc_chttp2_transport* t = reinterpret_cast(gt); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, - grpc_combiner_scheduler(t->combiner)), - GRPC_ERROR_NONE); + t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr), + GRPC_ERROR_NONE); } static void close_transport_locked(grpc_chttp2_transport* t, @@ -669,11 +660,7 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_slice_buffer_init(&frame_storage); grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer); grpc_slice_buffer_init(&flow_controlled_buffer); - - GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, - grpc_combiner_scheduler(t->combiner)); - GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, - grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, nullptr); } grpc_chttp2_stream::~grpc_chttp2_stream() { @@ -766,9 +753,8 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, } s->destroy_stream_arg = then_schedule_closure; - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, - grpc_combiner_scheduler(t->combiner)), + t->combiner->Run( + GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr), GRPC_ERROR_NONE); } @@ -929,10 +915,9 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, * Also, 'write_action_begin_locked' only gathers the bytes into outbuf. * It does not call the endpoint to write the bytes. That is done by the * 'write_action' (which is scheduled by 'write_action_begin_locked') */ - GRPC_CLOSURE_SCHED( + t->combiner->FinallyRun( GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner)), + write_action_begin_locked, t, nullptr), GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: @@ -1006,11 +991,18 @@ static void write_action(void* gt, grpc_error* error) { t->cl = nullptr; grpc_endpoint_write( t->ep, &t->outbuf, - GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, - grpc_combiner_scheduler(t->combiner)), + GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t, + grpc_schedule_on_exec_ctx), cl); } +static void write_action_end(void* tp, grpc_error* error) { + grpc_chttp2_transport* t = static_cast(tp); + t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked, + write_action_end_locked, t, nullptr), + GRPC_ERROR_REF(error)); +} + /* Callback from the grpc_endpoint after bytes have been written by calling * sendmsg */ static void write_action_end_locked(void* tp, grpc_error* error) { @@ -1051,10 +1043,9 @@ static void write_action_end_locked(void* tp, grpc_error* error) { if (!closed) { GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); } - GRPC_CLOSURE_RUN( + t->combiner->FinallyRun( GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner)), + write_action_begin_locked, t, nullptr), GRPC_ERROR_NONE); break; } @@ -1305,8 +1296,10 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, } s->fetching_send_message.reset(); return; /* early out */ - } else if (s->fetching_send_message->Next(UINT32_MAX, - &s->complete_fetch_locked)) { + } else if (s->fetching_send_message->Next( + UINT32_MAX, GRPC_CLOSURE_INIT(&s->complete_fetch_locked, + ::complete_fetch, s, + grpc_schedule_on_exec_ctx))) { grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice); if (error != GRPC_ERROR_NONE) { s->fetching_send_message.reset(); @@ -1318,6 +1311,13 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, } } +static void complete_fetch(void* gs, grpc_error* error) { + grpc_chttp2_stream* s = static_cast(gs); + s->t->combiner->Run(GRPC_CLOSURE_INIT(&s->complete_fetch_locked, + ::complete_fetch_locked, s, nullptr), + GRPC_ERROR_REF(error)); +} + static void complete_fetch_locked(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast(gs); grpc_chttp2_transport* t = s->t; @@ -1668,10 +1668,9 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); op->handler_private.extra_arg = gs; - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, - op, grpc_combiner_scheduler(t->combiner)), - GRPC_ERROR_NONE); + t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure, + perform_stream_op_locked, op, nullptr), + GRPC_ERROR_NONE); } static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { @@ -1681,7 +1680,7 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); - GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); + t->combiner->Run(&pq->lists[j]); } GRPC_ERROR_UNREF(error); } @@ -1727,6 +1726,13 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); } +void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error* error) { + grpc_chttp2_transport* t = static_cast(tp); + t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, + retry_initiate_ping_locked, t, nullptr), + GRPC_ERROR_REF(error)); +} + static void retry_initiate_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); t->ping_state.is_delayed_ping_timer_set = false; @@ -1744,7 +1750,7 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { gpr_free(from); return; } - GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); + t->combiner->Run(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); } @@ -1835,10 +1841,9 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { } op->handler_private.extra_arg = gt; GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure, - perform_transport_op_locked, op, - grpc_combiner_scheduler(t->combiner)), - GRPC_ERROR_NONE); + t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure, + perform_transport_op_locked, op, nullptr), + GRPC_ERROR_NONE); } /******************************************************************************* @@ -2479,6 +2484,13 @@ static grpc_error* try_http_parsing(grpc_chttp2_transport* t) { return error; } +static void read_action(void* tp, grpc_error* error) { + grpc_chttp2_transport* t = static_cast(tp); + t->combiner->Run( + GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr), + GRPC_ERROR_REF(error)); +} + static void read_action_locked(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("reading_action_locked", 0); @@ -2576,6 +2588,8 @@ static void read_action_locked(void* tp, grpc_error* error) { static void continue_read_action_locked(grpc_chttp2_transport* t) { const bool urgent = t->goaway_error != GRPC_ERROR_NONE; + GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t, + grpc_schedule_on_exec_ctx); grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent); grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr); } @@ -2618,10 +2632,20 @@ static void finish_bdp_ping_locked(void* tp, grpc_error* error) { nullptr); GPR_ASSERT(!t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = true; + GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, + next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx); grpc_timer_init(&t->next_bdp_ping_timer, next_ping, &t->next_bdp_ping_timer_expired_locked); } +static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) { + grpc_chttp2_transport* t = static_cast(tp); + t->combiner->Run( + GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, + next_bdp_ping_timer_expired_locked, t, nullptr), + GRPC_ERROR_REF(error)); +} + static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); GPR_ASSERT(t->have_next_bdp_ping_timer); @@ -2700,6 +2724,13 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } } +static void init_keepalive_ping(void* arg, grpc_error* error) { + grpc_chttp2_transport* t = static_cast(arg); + t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, + init_keepalive_ping_locked, t, nullptr), + GRPC_ERROR_REF(error)); +} + static void init_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); @@ -2715,6 +2746,8 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, + grpc_schedule_on_exec_ctx); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping_locked); @@ -2722,6 +2755,8 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) { } else if (error == GRPC_ERROR_CANCELLED) { /* The keepalive ping timer may be cancelled by bdp */ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, + grpc_schedule_on_exec_ctx); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping_locked); @@ -2741,6 +2776,8 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string); } GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); + GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, + keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx); grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, &t->keepalive_watchdog_fired_locked); @@ -2756,6 +2793,8 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; grpc_timer_cancel(&t->keepalive_watchdog_timer); GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, + grpc_schedule_on_exec_ctx); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping_locked); @@ -2764,6 +2803,14 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); } +static void keepalive_watchdog_fired(void* arg, grpc_error* error) { + grpc_chttp2_transport* t = static_cast(arg); + t->combiner->Run( + GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, + keepalive_watchdog_fired_locked, t, nullptr), + GRPC_ERROR_REF(error)); +} + static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { @@ -2864,10 +2911,9 @@ void Chttp2IncomingByteStream::OrphanLocked(void* arg, void Chttp2IncomingByteStream::Orphan() { GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - GRPC_CLOSURE_SCHED( + transport_->combiner->Run( GRPC_CLOSURE_INIT(&destroy_action_, - &Chttp2IncomingByteStream::OrphanLocked, this, - grpc_combiner_scheduler(transport_->combiner)), + &Chttp2IncomingByteStream::OrphanLocked, this, nullptr), GRPC_ERROR_NONE); } @@ -2924,10 +2970,9 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, Ref(); next_action_.max_size_hint = max_size_hint; next_action_.on_complete = on_complete; - GRPC_CLOSURE_SCHED( + transport_->combiner->Run( GRPC_CLOSURE_INIT(&next_action_.closure, - &Chttp2IncomingByteStream::NextLocked, this, - grpc_combiner_scheduler(transport_->combiner)), + &Chttp2IncomingByteStream::NextLocked, this, nullptr), GRPC_ERROR_NONE); return false; } @@ -2980,7 +3025,8 @@ grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { } } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); + stream_->t->combiner->Run(&stream_->reset_byte_stream, + GRPC_ERROR_REF(error)); return error; } return GRPC_ERROR_NONE; @@ -3000,7 +3046,8 @@ grpc_error* Chttp2IncomingByteStream::Push(const grpc_slice& slice, if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); - GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); + transport_->combiner->Run(&stream_->reset_byte_stream, + GRPC_ERROR_REF(error)); grpc_slice_unref_internal(slice); return error; } else { @@ -3020,7 +3067,8 @@ grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error, } } if (error != GRPC_ERROR_NONE && reset_on_error) { - GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); + transport_->combiner->Run(&stream_->reset_byte_stream, + GRPC_ERROR_REF(error)); } Unref(); return error; @@ -3040,6 +3088,8 @@ static void post_benign_reclaimer(grpc_chttp2_transport* t) { if (!t->benign_reclaimer_registered) { t->benign_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); + GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer, t, + grpc_schedule_on_exec_ctx); grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), false, &t->benign_reclaimer_locked); } @@ -3049,11 +3099,21 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) { if (!t->destructive_reclaimer_registered) { t->destructive_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); + GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, + destructive_reclaimer_locked, t, + grpc_schedule_on_exec_ctx); grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), true, &t->destructive_reclaimer_locked); } } +static void benign_reclaimer(void* arg, grpc_error* error) { + grpc_chttp2_transport* t = static_cast(arg); + t->combiner->Run(GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, + benign_reclaimer_locked, t, nullptr), + GRPC_ERROR_REF(error)); +} + static void benign_reclaimer_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); if (error == GRPC_ERROR_NONE && @@ -3083,6 +3143,13 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) { GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); } +static void destructive_reclaimer(void* arg, grpc_error* error) { + grpc_chttp2_transport* t = static_cast(arg); + t->combiner->Run(GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, + destructive_reclaimer_locked, t, nullptr), + GRPC_ERROR_REF(error)); +} + static void destructive_reclaimer_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); size_t n = grpc_chttp2_stream_map_size(&t->stream_map); @@ -3209,5 +3276,7 @@ void grpc_chttp2_transport_start_reading( gpr_free(read_buffer); } t->notify_on_receive_settings = notify_on_receive_settings; - GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE); + t->combiner->Run( + GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr), + GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index a5142ffd96f..6c2496dc7ca 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1741,9 +1741,8 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser, however -- it might be that we receive a RST_STREAM following this and can avoid the extra write */ GRPC_CHTTP2_STREAM_REF(s, "final_rst"); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(force_client_rst_stream, s, - grpc_combiner_finally_scheduler(t->combiner)), + t->combiner->FinallyRun( + GRPC_CLOSURE_CREATE(force_client_rst_stream, s, nullptr), GRPC_ERROR_NONE); } grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 57411de5450..935ec85e472 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -300,7 +300,7 @@ struct grpc_chttp2_transport { grpc_resource_user* resource_user; - grpc_combiner* combiner; + grpc_core::Combiner* combiner; grpc_closure* notify_on_receive_settings = nullptr; @@ -862,4 +862,6 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t, void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, bool is_client); +void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error* error); + #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index d6d9e4521f6..fa71d17c42c 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -96,6 +96,9 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { if (!t->ping_state.is_delayed_ping_timer_set) { t->ping_state.is_delayed_ping_timer_set = true; GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked"); + GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, + grpc_chttp2_retry_initiate_ping, t, + grpc_schedule_on_exec_ctx); grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping, &t->retry_initiate_ping_locked); } @@ -104,7 +107,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { pq->inflight_id = t->ping_ctr; t->ping_ctr++; - GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); + t->combiner->Run(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); grpc_slice_buffer_add(&t->outbuf, diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 5bd19a3897c..def95613a9d 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -309,8 +309,10 @@ static void combiner_finally_exec(Combiner* lock, grpc_closure* closure, grpc_core::ExecCtx::Get()->combiner_data()->active_combiner)); if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); - grpc_core::Combiner::Exec( - lock, GRPC_CLOSURE_CREATE(enqueue_finally, closure, nullptr), error); + lock->Run( + GRPC_CLOSURE_CREATE(enqueue_finally, closure, + reinterpret_cast(lock)), + error); return; } @@ -337,15 +339,26 @@ static void combiner_run(Combiner* lock, grpc_closure* closure, } static void enqueue_finally(void* closure, grpc_error* error) { - combiner_finally_exec(static_cast(closure), + grpc_closure* cl = static_cast(cl); + combiner_finally_exec(reinterpret_cast(cl->scheduler), cl, GRPC_ERROR_REF(error)); } -static void Combiner::Run(grpc_closure* closure, grpc_error* error) { - combiner_exec(combiner, closure, error); +void Combiner::Run(grpc_closure* closure, grpc_error* error) { + combiner_exec(this, closure, error); } -static void Combiner::FinallyRun(grpc_closure* closure, grpc_error* error) { - combiner_finally_exec(combiner, closure, exec); +void Combiner::Run(grpc_closure_list* list) { + grpc_closure* c = list->head; + while (c != nullptr) { + grpc_closure* next = c->next_data.next; + Run(c, c->error_data.error); + c = next; + } + list->head = list->tail = nullptr; +} + +void Combiner::FinallyRun(grpc_closure* closure, grpc_error* error) { + combiner_finally_exec(this, closure, error); } } // namespace grpc_core diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 58d1e540872..51929b3e12c 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -27,6 +27,32 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/exec_ctx.h" +namespace grpc_core { +class Combiner { + public: + void Run(grpc_closure* closure, grpc_error* error); + void Run(grpc_closure_list* list); + void FinallyRun(grpc_closure* closure, grpc_error* error); + Combiner* next_combiner_on_this_exec_ctx = nullptr; + grpc_closure_scheduler scheduler; + grpc_closure_scheduler finally_scheduler; + grpc_core::MultiProducerSingleConsumerQueue queue; + // either: + // a pointer to the initiating exec ctx if that is the only exec_ctx that has + // ever queued to this combiner, or NULL. If this is non-null, it's not + // dereferencable (since the initiating exec_ctx may have gone out of scope) + gpr_atm initiating_exec_ctx_or_null; + // state is: + // lower bit - zero if orphaned (STATE_UNORPHANED) + // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT) + gpr_atm state; + bool time_to_execute_final_list = false; + grpc_closure_list final_list; + grpc_closure offload; + gpr_refcount refs; +}; +} // namespace grpc_core + // Provides serialized access to some resource. // Each action queued on a combiner is executed serially in a borrowed thread. // The actual thread executing actions may change over time (but there will only @@ -59,30 +85,4 @@ bool grpc_combiner_continue_exec_ctx(); extern grpc_core::DebugOnlyTraceFlag grpc_combiner_trace; -namespace grpc_core { -class Combiner { - public: - static void Run(grpc_closure* closure, grpc_error* error); - static void FinallyRun(grpc_closure* closure, grpc_error* error); - Combiner* next_combiner_on_this_exec_ctx = nullptr; - grpc_closure_scheduler scheduler; - grpc_closure_scheduler finally_scheduler; - grpc_core::MultiProducerSingleConsumerQueue queue; - // either: - // a pointer to the initiating exec ctx if that is the only exec_ctx that has - // ever queued to this combiner, or NULL. If this is non-null, it's not - // dereferencable (since the initiating exec_ctx may have gone out of scope) - gpr_atm initiating_exec_ctx_or_null; - // state is: - // lower bit - zero if orphaned (STATE_UNORPHANED) - // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT) - gpr_atm state; - bool time_to_execute_final_list = false; - grpc_closure_list final_list; - grpc_closure offload; - gpr_refcount refs; -}; - -} // namespace grpc_core - #endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 0ccf2a878bf..812d52a2ab4 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -63,6 +63,7 @@ grpc_millis grpc_cycle_counter_to_millis_round_down(gpr_cycle_counter cycles); grpc_millis grpc_cycle_counter_to_millis_round_up(gpr_cycle_counter cycles); namespace grpc_core { +class Combiner; /** Execution context. * A bag of data that collects information along a callstack. * It is created on the stack at core entry points (public API or iomgr), and @@ -136,9 +137,9 @@ class ExecCtx { struct CombinerData { /* currently active combiner: updated only via combiner.c */ - grpc_combiner* active_combiner; + Combiner* active_combiner; /* last active combiner in the active combiner list */ - grpc_combiner* last_combiner; + Combiner* last_combiner; }; /** Only to be used by grpc-combiner code */ diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index a9d980f1f48..ce982a64106 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -132,7 +132,7 @@ struct grpc_resource_quota { /* Master combiner lock: all activity on a quota executes under this combiner * (so no mutex is needed for this data structure) */ - grpc_combiner* combiner; + grpc_core::Combiner* combiner; /* Size of the resource quota */ int64_t size; /* Amount of free memory in the resource quota */ diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index 84e33c79be9..41a6ed30477 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -26,6 +26,7 @@ #include #include +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/exec_ctx.h" namespace grpc_core { diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 7aa8ef412d4..9f7ae21418d 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -68,15 +68,14 @@ class AsyncConnectivityStateWatcherInterface protected: class Notifier; - explicit AsyncConnectivityStateWatcherInterface( - grpc_core::Combiner* combiner = nullptr) + explicit AsyncConnectivityStateWatcherInterface(Combiner* combiner = nullptr) : combiner_(combiner) {} // Invoked asynchronously when Notify() is called. virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0; private: - grpc_closure_scheduler* scheduler_; + Combiner* combiner_; }; // Tracks connectivity state. Maintains a list of watchers that are