[EventEngine] RunAfter migration: Chttp2Connector (#31877)

* initial commit

* fix bug

* review

* use raw pointer

* remove unused header

* better comments

* use self->args_

* review
pull/31900/head
Yijie Ma 2 years ago committed by GitHub
parent 5100b33e0e
commit f5584c2c02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      src/core/BUILD
  2. 62
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  3. 14
      src/core/ext/transport/chttp2/client/chttp2_connector.h

@ -4791,6 +4791,7 @@ grpc_cc_library(
"ext/transport/chttp2/client/chttp2_connector.h",
],
external_deps = [
"absl/base:core_headers",
"absl/status",
"absl/status:statusor",
"absl/strings:str_format",
@ -4808,6 +4809,7 @@ grpc_cc_library(
"resolved_address",
"status_helper",
"tcp_connect_handshaker",
"time",
"transport_fwd",
"unique_type_name",
"//:config",
@ -4822,7 +4824,6 @@ grpc_cc_library(
"//:grpc_trace",
"//:grpc_transport_chttp2",
"//:handshaker",
"//:iomgr_timer",
"//:orphanable",
"//:ref_counted_ptr",
"//:sockaddr_utils",

@ -54,6 +54,7 @@
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -83,6 +84,8 @@
namespace grpc_core {
using ::grpc_event_engine::experimental::EventEngine;
namespace {
void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
grpc_error_handle error) {
@ -107,6 +110,7 @@ void Chttp2Connector::Connect(const Args& args, Result* result,
result_ = result;
notify_ = notify;
GPR_ASSERT(endpoint_ == nullptr);
event_engine_ = args_.channel_args.GetObject<EventEngine>();
}
absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(args.address);
if (!address.ok()) {
@ -171,13 +175,16 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
self->Ref().release(); // Ref held by OnReceiveSettings()
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
grpc_schedule_on_exec_ctx);
self->Ref().release(); // Ref held by OnTimeout()
grpc_chttp2_transport_start_reading(self->result_->transport,
args->read_buffer,
&self->on_receive_settings_, nullptr);
GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&self->timer_, self->args_.deadline, &self->on_timeout_);
RefCountedPtr<Chttp2Connector> cc = self->Ref();
self->timer_handle_ = self->event_engine_->RunAfter(
self->args_.deadline - Timestamp::Now(), [self = std::move(cc)] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnTimeout();
});
} else {
// If the handshaking succeeded but there is no endpoint, then the
// handshaker may have handed off the connection to some external
@ -205,7 +212,14 @@ void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
self->result_->Reset();
}
self->MaybeNotify(error);
grpc_timer_cancel(&self->timer_);
if (self->timer_handle_.has_value()) {
if (self->event_engine_->Cancel(*self->timer_handle_)) {
// If we have cancelled the timer successfully, call Notify() again
// since the timer callback will not be called now.
self->MaybeNotify(absl::OkStatus());
}
self->timer_handle_.reset();
}
} else {
// OnTimeout() was already invoked. Call Notify() again so that notify_
// can be invoked.
@ -215,28 +229,24 @@ void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
self->Unref();
}
void Chttp2Connector::OnTimeout(void* arg, grpc_error_handle /*error*/) {
Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
{
MutexLock lock(&self->mu_);
if (!self->notify_error_.has_value()) {
// The transport did not receive the settings frame in time. Destroy the
// transport.
grpc_endpoint_delete_from_pollset_set(self->endpoint_,
self->args_.interested_parties);
// TODO(yashykt): The following two lines should be moved to
// SubchannelConnector::Result::Reset()
grpc_transport_destroy(self->result_->transport);
self->result_->Reset();
self->MaybeNotify(GRPC_ERROR_CREATE(
"connection attempt timed out before receiving SETTINGS frame"));
} else {
// OnReceiveSettings() was already invoked. Call Notify() again so that
// notify_ can be invoked.
self->MaybeNotify(absl::OkStatus());
}
void Chttp2Connector::OnTimeout() {
MutexLock lock(&mu_);
timer_handle_.reset();
if (!notify_error_.has_value()) {
// The transport did not receive the settings frame in time. Destroy the
// transport.
grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties);
// TODO(yashykt): The following two lines should be moved to
// SubchannelConnector::Result::Reset()
grpc_transport_destroy(result_->transport);
result_->Reset();
MaybeNotify(GRPC_ERROR_CREATE(
"connection attempt timed out before receiving SETTINGS frame"));
} else {
// OnReceiveSettings() was already invoked. Call Notify() again so that
// notify_ can be invoked.
MaybeNotify(absl::OkStatus());
}
self->Unref();
}
void Chttp2Connector::MaybeNotify(grpc_error_handle error) {

@ -21,15 +21,17 @@
#include <grpc/support/port_platform.h>
#include "absl/base/thread_annotations.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/transport/handshaker.h"
namespace grpc_core {
@ -44,7 +46,7 @@ class Chttp2Connector : public SubchannelConnector {
private:
static void OnHandshakeDone(void* arg, grpc_error_handle error);
static void OnReceiveSettings(void* arg, grpc_error_handle error);
static void OnTimeout(void* arg, grpc_error_handle error);
void OnTimeout() ABSL_LOCKS_EXCLUDED(mu_);
// We cannot invoke notify_ until both OnTimeout() and OnReceiveSettings()
// have been called since that is an indicator to the upper layer that we are
@ -66,8 +68,12 @@ class Chttp2Connector : public SubchannelConnector {
// the handshake manager, and then again after handshake is done.
grpc_endpoint* endpoint_ = nullptr;
grpc_closure on_receive_settings_;
grpc_timer timer_;
grpc_closure on_timeout_;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
timer_handle_ ABSL_GUARDED_BY(mu_);
// A raw pointer will suffice since args_ holds a copy of the ChannelArgs
// which holds an std::shared_ptr of the EventEngine.
grpc_event_engine::experimental::EventEngine* event_engine_
ABSL_GUARDED_BY(mu_);
absl::optional<grpc_error_handle> notify_error_;
RefCountedPtr<HandshakeManager> handshake_mgr_;
};

Loading…
Cancel
Save