diff --git a/src/core/BUILD b/src/core/BUILD index ddd38f6c4f8..2a9a6c81cfa 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4791,6 +4791,7 @@ grpc_cc_library( "ext/transport/chttp2/client/chttp2_connector.h", ], external_deps = [ + "absl/base:core_headers", "absl/status", "absl/status:statusor", "absl/strings:str_format", @@ -4808,6 +4809,7 @@ grpc_cc_library( "resolved_address", "status_helper", "tcp_connect_handshaker", + "time", "transport_fwd", "unique_type_name", "//:config", @@ -4822,7 +4824,6 @@ grpc_cc_library( "//:grpc_trace", "//:grpc_transport_chttp2", "//:handshaker", - "//:iomgr_timer", "//:orphanable", "//:ref_counted_ptr", "//:sockaddr_utils", diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index b6c9ed938c3..36037811f27 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -54,6 +54,7 @@ #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -83,6 +84,8 @@ namespace grpc_core { +using ::grpc_event_engine::experimental::EventEngine; + namespace { void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure, grpc_error_handle error) { @@ -107,6 +110,7 @@ void Chttp2Connector::Connect(const Args& args, Result* result, result_ = result; notify_ = notify; GPR_ASSERT(endpoint_ == nullptr); + event_engine_ = args_.channel_args.GetObject(); } absl::StatusOr address = grpc_sockaddr_to_uri(args.address); if (!address.ok()) { @@ -171,13 +175,16 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) { self->Ref().release(); // Ref held by OnReceiveSettings() GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self, grpc_schedule_on_exec_ctx); - self->Ref().release(); // Ref held by OnTimeout() grpc_chttp2_transport_start_reading(self->result_->transport, args->read_buffer, &self->on_receive_settings_, nullptr); - GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self, - grpc_schedule_on_exec_ctx); - grpc_timer_init(&self->timer_, self->args_.deadline, &self->on_timeout_); + RefCountedPtr cc = self->Ref(); + self->timer_handle_ = self->event_engine_->RunAfter( + self->args_.deadline - Timestamp::Now(), [self = std::move(cc)] { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + self->OnTimeout(); + }); } else { // If the handshaking succeeded but there is no endpoint, then the // handshaker may have handed off the connection to some external @@ -205,7 +212,14 @@ void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) { self->result_->Reset(); } self->MaybeNotify(error); - grpc_timer_cancel(&self->timer_); + if (self->timer_handle_.has_value()) { + if (self->event_engine_->Cancel(*self->timer_handle_)) { + // If we have cancelled the timer successfully, call Notify() again + // since the timer callback will not be called now. + self->MaybeNotify(absl::OkStatus()); + } + self->timer_handle_.reset(); + } } else { // OnTimeout() was already invoked. Call Notify() again so that notify_ // can be invoked. @@ -215,28 +229,24 @@ void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) { self->Unref(); } -void Chttp2Connector::OnTimeout(void* arg, grpc_error_handle /*error*/) { - Chttp2Connector* self = static_cast(arg); - { - MutexLock lock(&self->mu_); - if (!self->notify_error_.has_value()) { - // The transport did not receive the settings frame in time. Destroy the - // transport. - grpc_endpoint_delete_from_pollset_set(self->endpoint_, - self->args_.interested_parties); - // TODO(yashykt): The following two lines should be moved to - // SubchannelConnector::Result::Reset() - grpc_transport_destroy(self->result_->transport); - self->result_->Reset(); - self->MaybeNotify(GRPC_ERROR_CREATE( - "connection attempt timed out before receiving SETTINGS frame")); - } else { - // OnReceiveSettings() was already invoked. Call Notify() again so that - // notify_ can be invoked. - self->MaybeNotify(absl::OkStatus()); - } +void Chttp2Connector::OnTimeout() { + MutexLock lock(&mu_); + timer_handle_.reset(); + if (!notify_error_.has_value()) { + // The transport did not receive the settings frame in time. Destroy the + // transport. + grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties); + // TODO(yashykt): The following two lines should be moved to + // SubchannelConnector::Result::Reset() + grpc_transport_destroy(result_->transport); + result_->Reset(); + MaybeNotify(GRPC_ERROR_CREATE( + "connection attempt timed out before receiving SETTINGS frame")); + } else { + // OnReceiveSettings() was already invoked. Call Notify() again so that + // notify_ can be invoked. + MaybeNotify(absl::OkStatus()); } - self->Unref(); } void Chttp2Connector::MaybeNotify(grpc_error_handle error) { diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h index 9ffb0fe556a..72587d53423 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.h +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -21,15 +21,17 @@ #include +#include "absl/base/thread_annotations.h" #include "absl/types/optional.h" +#include + #include "src/core/ext/filters/client_channel/connector.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/transport/handshaker.h" namespace grpc_core { @@ -44,7 +46,7 @@ class Chttp2Connector : public SubchannelConnector { private: static void OnHandshakeDone(void* arg, grpc_error_handle error); static void OnReceiveSettings(void* arg, grpc_error_handle error); - static void OnTimeout(void* arg, grpc_error_handle error); + void OnTimeout() ABSL_LOCKS_EXCLUDED(mu_); // We cannot invoke notify_ until both OnTimeout() and OnReceiveSettings() // have been called since that is an indicator to the upper layer that we are @@ -66,8 +68,12 @@ class Chttp2Connector : public SubchannelConnector { // the handshake manager, and then again after handshake is done. grpc_endpoint* endpoint_ = nullptr; grpc_closure on_receive_settings_; - grpc_timer timer_; - grpc_closure on_timeout_; + absl::optional + timer_handle_ ABSL_GUARDED_BY(mu_); + // A raw pointer will suffice since args_ holds a copy of the ChannelArgs + // which holds an std::shared_ptr of the EventEngine. + grpc_event_engine::experimental::EventEngine* event_engine_ + ABSL_GUARDED_BY(mu_); absl::optional notify_error_; RefCountedPtr handshake_mgr_; };