diff --git a/src/core/lib/event_engine/trace.cc b/src/core/lib/event_engine/trace.cc index 52cbd35f4f5..991cd2d4775 100644 --- a/src/core/lib/event_engine/trace.cc +++ b/src/core/lib/event_engine/trace.cc @@ -16,3 +16,9 @@ #include "src/core/lib/debug/trace.h" grpc_core::TraceFlag grpc_event_engine_trace(false, "event_engine"); +grpc_core::TraceFlag grpc_event_engine_endpoint_trace(false, + "event_engine_endpoint"); +grpc_core::TraceFlag grpc_event_engine_endpoint_data_trace( + false, "event_engine_endpoint_data"); +grpc_core::TraceFlag grpc_event_engine_poller_trace(false, + "event_engine_poller"); diff --git a/src/core/lib/event_engine/trace.h b/src/core/lib/event_engine/trace.h index edf8fcf2275..63511d161fc 100644 --- a/src/core/lib/event_engine/trace.h +++ b/src/core/lib/event_engine/trace.h @@ -21,10 +21,23 @@ #include "src/core/lib/debug/trace.h" extern grpc_core::TraceFlag grpc_event_engine_trace; +extern grpc_core::TraceFlag grpc_event_engine_endpoint_data_trace; +extern grpc_core::TraceFlag grpc_event_engine_poller_trace; +extern grpc_core::TraceFlag grpc_event_engine_endpoint_trace; #define GRPC_EVENT_ENGINE_TRACE(format, ...) \ if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { \ gpr_log(GPR_DEBUG, "(event_engine) " format, __VA_ARGS__); \ } +#define GRPC_EVENT_ENGINE_ENDPOINT_TRACE(format, ...) \ + if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_endpoint_trace)) { \ + gpr_log(GPR_DEBUG, "(event_engine endpoint) " format, __VA_ARGS__); \ + } + +#define GRPC_EVENT_ENGINE_POLLER_TRACE(format, ...) \ + if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_poller_trace)) { \ + gpr_log(GPR_DEBUG, "(event_engine poller) " format, __VA_ARGS__); \ + } + #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_TRACE_H diff --git a/src/core/lib/event_engine/windows/iocp.cc b/src/core/lib/event_engine/windows/iocp.cc index 96277b2f62e..9677156fb66 100644 --- a/src/core/lib/event_engine/windows/iocp.cc +++ b/src/core/lib/event_engine/windows/iocp.cc @@ -27,6 +27,7 @@ #include "src/core/lib/event_engine/windows/iocp.h" #include "src/core/lib/event_engine/windows/win_socket.h" #include "src/core/lib/gprpp/crash.h" +#include "src/core/lib/iomgr/error.h" namespace grpc_event_engine { namespace experimental { @@ -48,21 +49,18 @@ std::unique_ptr IOCP::Watch(SOCKET socket) { reinterpret_cast(socket), iocp_handle_, reinterpret_cast(wrapped_socket.get()), 0); if (!ret) { - char* utf8_message = gpr_format_message(WSAGetLastError()); - gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message); - gpr_free(utf8_message); - __debugbreak(); - abort(); + grpc_core::Crash( + GRPC_WSA_ERROR(WSAGetLastError(), "Unable to add socket to iocp") + .ToString()); } GPR_ASSERT(ret == iocp_handle_); return wrapped_socket; } void IOCP::Shutdown() { - if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { - gpr_log(GPR_DEBUG, "IOCP::%p shutting down. Outstanding kicks: %d", this, - outstanding_kicks_.load()); - } + GRPC_EVENT_ENGINE_POLLER_TRACE( + "IOCP::%p shutting down. Outstanding kicks: %d", this, + outstanding_kicks_.load()); while (outstanding_kicks_.load() > 0) { Work(std::chrono::hours(42), []() {}); } @@ -74,23 +72,17 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout, DWORD bytes = 0; ULONG_PTR completion_key; LPOVERLAPPED overlapped; - if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { - gpr_log(GPR_DEBUG, "IOCP::%p doing work", this); - } + GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p doing work", this); BOOL success = GetQueuedCompletionStatus( iocp_handle_, &bytes, &completion_key, &overlapped, static_cast(Milliseconds(timeout))); if (success == 0 && overlapped == nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { - gpr_log(GPR_DEBUG, "IOCP::%p deadline exceeded", this); - } + GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p deadline exceeded", this); return Poller::WorkResult::kDeadlineExceeded; } GPR_ASSERT(completion_key && overlapped); if (overlapped == &kick_overlap_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { - gpr_log(GPR_DEBUG, "IOCP::%p kicked", this); - } + GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p kicked", this); outstanding_kicks_.fetch_sub(1); if (completion_key == (ULONG_PTR)&kick_token_) { return Poller::WorkResult::kKicked; @@ -98,10 +90,8 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout, grpc_core::Crash( absl::StrFormat("Unknown custom completion key: %lu", completion_key)); } - if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { - gpr_log(GPR_DEBUG, "IOCP::%p got event on OVERLAPPED::%p", this, - overlapped); - } + GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p got event on OVERLAPPED::%p", this, + overlapped); WinSocket* socket = reinterpret_cast(completion_key); // TODO(hork): move the following logic into the WinSocket impl. WinSocket::OpState* info = socket->GetOpInfoForOverlapped(overlapped); diff --git a/src/core/lib/event_engine/windows/win_socket.cc b/src/core/lib/event_engine/windows/win_socket.cc index c6a9f3c7423..583f8ee2e8b 100644 --- a/src/core/lib/event_engine/windows/win_socket.cc +++ b/src/core/lib/event_engine/windows/win_socket.cc @@ -40,22 +40,23 @@ WinSocket::WinSocket(SOCKET socket, Executor* executor) noexcept read_info_(OpState(this)), write_info_(OpState(this)) {} -WinSocket::~WinSocket() { GPR_ASSERT(is_shutdown_.load()); } +WinSocket::~WinSocket() { + GPR_ASSERT(is_shutdown_.load()); + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p destroyed", this); +} SOCKET WinSocket::socket() { return socket_; } void WinSocket::MaybeShutdown(absl::Status why) { // if already shutdown, return early. Otherwise, set the shutdown flag. if (is_shutdown_.exchange(true)) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { - gpr_log(GPR_DEBUG, "WinSocket::%p already shutting down", this); - } + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p already shutting down", + this); return; } - if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { - gpr_log(GPR_DEBUG, "WinSocket::%p shutting down now. Reason: %s", this, - why.ToString().c_str()); - } + GRPC_EVENT_ENGINE_ENDPOINT_TRACE( + "WinSocket::%p shutting down now. Reason: %s", this, + why.ToString().c_str()); // Grab the function pointer for DisconnectEx for that specific socket. // It may change depending on the interface. GUID guid = WSAID_DISCONNECTEX; @@ -74,6 +75,7 @@ void WinSocket::MaybeShutdown(absl::Status why) { gpr_free(utf8_message); } closesocket(socket_); + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p socket closed", this); } void WinSocket::NotifyOnReady(OpState& info, EventEngine::Closure* closure) { @@ -97,6 +99,8 @@ void WinSocket::NotifyOnWrite(EventEngine::Closure* on_write) { NotifyOnReady(write_info_, on_write); } +// ---- WinSocket::OpState ---- + WinSocket::OpState::OpState(WinSocket* win_socket) noexcept : win_socket_(win_socket), closure_(nullptr) { memset(&overlapped_, 0, sizeof(OVERLAPPED)); @@ -132,13 +136,10 @@ void WinSocket::SetWritable() { write_info_.SetReady(); } bool WinSocket::IsShutdown() { return is_shutdown_.load(); } WinSocket::OpState* WinSocket::GetOpInfoForOverlapped(OVERLAPPED* overlapped) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { - gpr_log(GPR_DEBUG, - "WinSocket::%p looking for matching OVERLAPPED::%p. " - "read(%p) write(%p)", - this, overlapped, &read_info_.overlapped_, - &write_info_.overlapped_); - } + GRPC_EVENT_ENGINE_POLLER_TRACE( + "WinSocket::%p looking for matching OVERLAPPED::%p. " + "read(%p) write(%p)", + this, overlapped, &read_info_.overlapped_, &write_info_.overlapped_); if (overlapped == &read_info_.overlapped_) return &read_info_; if (overlapped == &write_info_.overlapped_) return &write_info_; return nullptr; diff --git a/src/core/lib/event_engine/windows/windows_endpoint.cc b/src/core/lib/event_engine/windows/windows_endpoint.cc index 9c12f27fdb4..478f8e17264 100644 --- a/src/core/lib/event_engine/windows/windows_endpoint.cc +++ b/src/core/lib/event_engine/windows/windows_endpoint.cc @@ -81,7 +81,7 @@ WindowsEndpoint::~WindowsEndpoint() { void WindowsEndpoint::Read(absl::AnyInvocable on_read, SliceBuffer* buffer, const ReadArgs* args) { // TODO(hork): last_read_buffer from iomgr: Is it only garbage, or optimized? - GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p reading", this); + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p reading", this); // Prepare the WSABUF struct WSABUF wsa_buffers[kMaxWSABUFCount]; int min_read_size = kDefaultTargetReadSize; @@ -136,7 +136,8 @@ void WindowsEndpoint::Read(absl::AnyInvocable on_read, void WindowsEndpoint::Write(absl::AnyInvocable on_writable, SliceBuffer* data, const WriteArgs* /* args */) { - if (grpc_event_engine_trace.enabled()) { + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p writing", this); + if (grpc_event_engine_endpoint_data_trace.enabled()) { for (int i = 0; i < data->Count(); i++) { auto str = data->RefSlice(i).as_string_view(); gpr_log(GPR_INFO, "WindowsEndpoint::%p WRITE (peer=%s): %.*s", this, @@ -219,7 +220,8 @@ WindowsEndpoint::BaseEventClosure::BaseEventClosure(WindowsEndpoint* endpoint) : cb_(&AbortOnEvent), endpoint_(endpoint) {} void WindowsEndpoint::HandleReadClosure::Run() { - GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p Handling Read Event", endpoint_); + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Read Event", + endpoint_); absl::Status status; auto* read_info = endpoint_->socket_->read_info(); auto cb_cleanup = absl::MakeCleanup([this, &status]() { @@ -239,7 +241,7 @@ void WindowsEndpoint::HandleReadClosure::Run() { read_info->bytes_transferred()); } GPR_ASSERT(read_info->bytes_transferred() == buffer_->Length()); - if (grpc_event_engine_trace.enabled()) { + if (grpc_event_engine_endpoint_data_trace.enabled()) { for (int i = 0; i < buffer_->Count(); i++) { auto str = buffer_->RefSlice(i).as_string_view(); gpr_log(GPR_INFO, "WindowsEndpoint::%p READ (peer=%s): %.*s", this, @@ -256,8 +258,8 @@ void WindowsEndpoint::HandleReadClosure::Run() { } void WindowsEndpoint::HandleWriteClosure::Run() { - GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p Handling Write Event", - endpoint_); + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Write Event", + endpoint_); auto* write_info = endpoint_->socket_->write_info(); auto cb = std::move(cb_); cb_ = &AbortOnEvent; diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc index 7391598c66f..45720c05c36 100644 --- a/src/core/lib/event_engine/windows/windows_engine.cc +++ b/src/core/lib/event_engine/windows/windows_engine.cc @@ -128,6 +128,9 @@ WindowsEventEngine::~WindowsEventEngine() { bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) { grpc_core::MutexLock lock(&task_mu_); if (!known_handles_.contains(handle)) return false; + GRPC_EVENT_ENGINE_TRACE( + "WindowsEventEngine::%p cancelling %s", this, + HandleToString(handle).c_str()); auto* cd = reinterpret_cast(handle.keys[0]); bool r = timer_manager_.TimerCancel(&cd->timer); known_handles_.erase(handle); diff --git a/src/core/lib/event_engine/windows/windows_engine.h b/src/core/lib/event_engine/windows/windows_engine.h index 66dd486dfea..6d31084f4bd 100644 --- a/src/core/lib/event_engine/windows/windows_engine.h +++ b/src/core/lib/event_engine/windows/windows_engine.h @@ -45,12 +45,6 @@ namespace experimental { class WindowsEventEngine : public EventEngine, public grpc_core::KeepsGrpcInitialized { public: - class WindowsListener : public EventEngine::Listener { - public: - ~WindowsListener() override; - absl::StatusOr Bind(const ResolvedAddress& addr) override; - absl::Status Start() override; - }; class WindowsDNSResolver : public EventEngine::DNSResolver { public: ~WindowsDNSResolver() override; diff --git a/src/core/lib/iomgr/tcp_server_windows.cc b/src/core/lib/iomgr/tcp_server_windows.cc index bf5e24b4bdf..67d5f9adb7e 100644 --- a/src/core/lib/iomgr/tcp_server_windows.cc +++ b/src/core/lib/iomgr/tcp_server_windows.cc @@ -456,7 +456,6 @@ static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s, grpc_resolved_address addr6_v4mapped; grpc_resolved_address wildcard; grpc_resolved_address* allocated_addr = NULL; - grpc_resolved_address sockname_temp; unsigned port_index = 0; grpc_error_handle error; @@ -468,6 +467,7 @@ static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s, // as some previously created listener. if (grpc_sockaddr_get_port(addr) == 0) { for (sp = s->head; sp; sp = sp->next) { + grpc_resolved_address sockname_temp; int sockname_temp_len = sizeof(struct sockaddr_storage); if (0 == getsockname(sp->socket->socket, (grpc_sockaddr*)sockname_temp.addr,