EventEngine::RunAfter: handshaker (#31564)

* EventEngine::RunAfter migration for handshaker

* Fix build and add execution contexts to the top of the timer function
stack

* Add event_engine_ member object, remove OnTimeoutFn and self.reset()
before goes out of scope

* Run iwyu and fix_build_deps.py

* fix: more cleanup

* fix: restore unrelated files

* fix: run tools/distrib/clang_format_code.sh

* re: pass EventEngine as shared_ptr to HandshakeManager

* fix: ran tools/distrib/sanitize.sh

* fix: resolve review comment to initialize event_engine_ from the channel
args passed in DoHandshake instead of passing through constructor

* sanitize

* fix: resolve comments

* fix: one more
pull/31627/head
Yijie Ma 2 years ago committed by GitHub
parent 916a325b6c
commit 5a131bd94b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      BUILD
  2. 6
      include/grpc/event_engine/event_engine.h
  3. 35
      src/core/lib/transport/handshaker.cc
  4. 43
      src/core/lib/transport/handshaker.h
  5. 2
      test/core/security/BUILD
  6. 15
      test/core/security/ssl_server_fuzzer.cc

@ -2266,6 +2266,7 @@ grpc_cc_library(
"//src/core:lib/transport/handshaker.cc",
],
external_deps = [
"absl/base:core_headers",
"absl/container:inlined_vector",
"absl/status",
"absl/strings:str_format",
@ -2277,12 +2278,12 @@ grpc_cc_library(
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [
"debug_location",
"event_engine_base_hdrs",
"exec_ctx",
"gpr",
"grpc_base",
"grpc_public_hdrs",
"grpc_trace",
"iomgr_timer",
"ref_counted_ptr",
"//src/core:channel_args",
"//src/core:closure",

@ -400,7 +400,7 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
///
/// The \a closure will execute when time \a when arrives unless it has been
/// cancelled via the \a Cancel method. If cancelled, the closure will not be
/// run. Unilke the overloaded \a Closure alternative, the absl::AnyInvocable
/// run. Unlike the overloaded \a Closure alternative, the absl::AnyInvocable
/// version's \a closure will be deleted by the EventEngine after the closure
/// has been run, or upon cancellation.
///
@ -414,12 +414,12 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// If the associated closure has already been scheduled to run, it will not
/// be cancelled, and this function will return false.
///
/// If the associated callback has not been scheduled to run, it will be
/// If the associated closure has not been scheduled to run, it will be
/// cancelled, and the associated absl::AnyInvocable or \a Closure* will not
/// be executed. In this case, Cancel will return true.
///
/// Implementation note: closures should be destroyed in a timely manner after
/// execution or cancelliation (milliseconds), since any state bound to the
/// execution or cancellation (milliseconds), since any state bound to the
/// closure may need to be destroyed for things to progress (e.g., if a
/// closure holds a ref to some ref-counted object).
virtual bool Cancel(TaskHandle handle) = 0;

@ -28,6 +28,7 @@
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
@ -38,7 +39,6 @@
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
namespace grpc_core {
@ -46,6 +46,8 @@ TraceFlag grpc_handshaker_trace(false, "handshaker");
namespace {
using ::grpc_event_engine::experimental::EventEngine;
std::string HandshakerArgsString(HandshakerArgs* args) {
size_t read_buffer_length =
args->read_buffer != nullptr ? args->read_buffer->length : 0;
@ -58,16 +60,19 @@ std::string HandshakerArgsString(HandshakerArgs* args) {
} // namespace
HandshakeManager::HandshakeManager() {}
HandshakeManager::HandshakeManager()
: RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)
? "HandshakeManager"
: nullptr) {}
void HandshakeManager::Add(RefCountedPtr<Handshaker> handshaker) {
MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_handshaker_trace)) {
gpr_log(
GPR_INFO,
"handshake_manager %p: adding handshaker %s [%p] at index %" PRIuPTR,
this, handshaker->name(), handshaker.get(), handshakers_.size());
}
MutexLock lock(&mu_);
handshakers_.push_back(std::move(handshaker));
}
@ -128,7 +133,7 @@ bool HandshakeManager::CallNextHandshakerLocked(grpc_error_handle error) {
}
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
grpc_timer_cancel(&deadline_timer_);
event_engine_->Cancel(deadline_timer_handle_);
ExecCtx::Run(DEBUG_LOCATION, &on_handshake_done_, error);
is_shutdown_ = true;
} else {
@ -161,14 +166,6 @@ void HandshakeManager::CallNextHandshakerFn(void* arg,
}
}
void HandshakeManager::OnTimeoutFn(void* arg, grpc_error_handle error) {
auto* mgr = static_cast<HandshakeManager*>(arg);
if (error.ok()) { // Timer fired, rather than being cancelled
mgr->Shutdown(GRPC_ERROR_CREATE("Handshake timed out"));
}
mgr->Unref();
}
void HandshakeManager::DoHandshake(grpc_endpoint* endpoint,
const ChannelArgs& channel_args,
Timestamp deadline,
@ -201,10 +198,16 @@ void HandshakeManager::DoHandshake(grpc_endpoint* endpoint,
GRPC_CLOSURE_INIT(&on_handshake_done_, on_handshake_done, &args_,
grpc_schedule_on_exec_ctx);
// Start deadline timer, which owns a ref.
Ref().release();
GRPC_CLOSURE_INIT(&on_timeout_, &HandshakeManager::OnTimeoutFn, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&deadline_timer_, deadline, &on_timeout_);
const Duration time_to_deadline = deadline - Timestamp::Now();
event_engine_ = args_.args.GetObjectRef<EventEngine>();
deadline_timer_handle_ =
event_engine_->RunAfter(time_to_deadline, [self = Ref()]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->Shutdown(GRPC_ERROR_CREATE("Handshake timed out"));
// HandshakeManager deletion might require an active ExecCtx.
self.reset();
});
// Start first handshaker, which also owns a ref.
Ref().release();
done = CallNextHandshakerLocked(absl::OkStatus());

@ -23,8 +23,12 @@
#include <stddef.h>
#include <memory>
#include "absl/base/thread_annotations.h"
#include "absl/container/inlined_vector.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include "src/core/lib/channel/channel_args.h"
@ -36,7 +40,6 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/timer.h"
namespace grpc_core {
@ -102,11 +105,11 @@ class HandshakeManager : public RefCounted<HandshakeManager> {
/// Adds a handshaker to the handshake manager.
/// Takes ownership of \a handshaker.
void Add(RefCountedPtr<Handshaker> handshaker);
void Add(RefCountedPtr<Handshaker> handshaker) ABSL_LOCKS_EXCLUDED(mu_);
/// Shuts down the handshake manager (e.g., to clean up when the operation is
/// aborted in the middle).
void Shutdown(grpc_error_handle why);
void Shutdown(grpc_error_handle why) ABSL_LOCKS_EXCLUDED(mu_);
/// Invokes handshakers in the order they were added.
/// Takes ownership of \a endpoint, and then passes that ownership to
@ -122,37 +125,39 @@ class HandshakeManager : public RefCounted<HandshakeManager> {
/// the arguments.
void DoHandshake(grpc_endpoint* endpoint, const ChannelArgs& channel_args,
Timestamp deadline, grpc_tcp_server_acceptor* acceptor,
grpc_iomgr_cb_func on_handshake_done, void* user_data);
grpc_iomgr_cb_func on_handshake_done, void* user_data)
ABSL_LOCKS_EXCLUDED(mu_);
private:
bool CallNextHandshakerLocked(grpc_error_handle error);
bool CallNextHandshakerLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// A function used as the handshaker-done callback when chaining
// handshakers together.
static void CallNextHandshakerFn(void* arg, grpc_error_handle error);
// Callback invoked when deadline is exceeded.
static void OnTimeoutFn(void* arg, grpc_error_handle error);
static void CallNextHandshakerFn(void* arg, grpc_error_handle error)
ABSL_LOCKS_EXCLUDED(mu_);
static const size_t HANDSHAKERS_INIT_SIZE = 2;
Mutex mu_;
bool is_shutdown_ = false;
bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
// An array of handshakers added via grpc_handshake_manager_add().
absl::InlinedVector<RefCountedPtr<Handshaker>, HANDSHAKERS_INIT_SIZE>
handshakers_;
handshakers_ ABSL_GUARDED_BY(mu_);
// The index of the handshaker to invoke next and closure to invoke it.
size_t index_ = 0;
grpc_closure call_next_handshaker_;
size_t index_ ABSL_GUARDED_BY(mu_) = 0;
grpc_closure call_next_handshaker_ ABSL_GUARDED_BY(mu_);
// The acceptor to call the handshakers with.
grpc_tcp_server_acceptor* acceptor_;
// Deadline timer across all handshakers.
grpc_timer deadline_timer_;
grpc_closure on_timeout_;
grpc_tcp_server_acceptor* acceptor_ ABSL_GUARDED_BY(mu_);
// The final callback and user_data to invoke after the last handshaker.
grpc_closure on_handshake_done_;
grpc_closure on_handshake_done_ ABSL_GUARDED_BY(mu_);
// Handshaker args.
HandshakerArgs args_;
HandshakerArgs args_ ABSL_GUARDED_BY(mu_);
// Deadline timer across all handshakers.
grpc_event_engine::experimental::EventEngine::TaskHandle
deadline_timer_handle_ ABSL_GUARDED_BY(mu_);
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_
ABSL_GUARDED_BY(mu_);
};
} // namespace grpc_core

@ -45,8 +45,10 @@ grpc_fuzzer(
language = "C++",
tags = ["no_windows"],
deps = [
"//:event_engine_base_hdrs",
"//:gpr",
"//:grpc",
"//src/core:default_event_engine",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
],

@ -16,10 +16,12 @@
*
*/
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/security_connector/security_connector.h"
@ -29,6 +31,9 @@
#define SERVER_CERT_PATH "src/core/tsi/test_creds/server1.pem"
#define SERVER_KEY_PATH "src/core/tsi/test_creds/server1.key"
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::GetDefaultEventEngine;
bool squelch = true;
// ssl has an array of global gpr_mu's that are never released.
// Turning this on will fail the leak check.
@ -96,10 +101,12 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
state.done_callback_called = false;
auto handshake_mgr =
grpc_core::MakeRefCounted<grpc_core::HandshakeManager>();
sc->add_handshakers(grpc_core::ChannelArgs(), nullptr, handshake_mgr.get());
handshake_mgr->DoHandshake(mock_endpoint, grpc_core::ChannelArgs(),
deadline, nullptr /* acceptor */,
on_handshake_done, &state);
auto channel_args = grpc_core::ChannelArgs().SetObject<EventEngine>(
GetDefaultEventEngine());
sc->add_handshakers(channel_args, nullptr, handshake_mgr.get());
handshake_mgr->DoHandshake(mock_endpoint, channel_args, deadline,
nullptr /* acceptor */, on_handshake_done,
&state);
grpc_core::ExecCtx::Get()->Flush();
// If the given string happens to be part of the correct client hello, the

Loading…
Cancel
Save