[EventEngine] Add more granular trace flags (#32376)

The set of trace flags is now:
* event_engine
* event_engine_endpoint
* event_engine_endpoint_data: additionally log all sent/received data,
similar to what the shims do.
* event_engine_poller

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/32394/head
AJ Heller 2 years ago committed by GitHub
parent 9dd4b7757d
commit dd07fd8669
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/lib/event_engine/trace.cc
  2. 13
      src/core/lib/event_engine/trace.h
  3. 34
      src/core/lib/event_engine/windows/iocp.cc
  4. 31
      src/core/lib/event_engine/windows/win_socket.cc
  5. 14
      src/core/lib/event_engine/windows/windows_endpoint.cc
  6. 3
      src/core/lib/event_engine/windows/windows_engine.cc
  7. 6
      src/core/lib/event_engine/windows/windows_engine.h
  8. 2
      src/core/lib/iomgr/tcp_server_windows.cc

@ -16,3 +16,9 @@
#include "src/core/lib/debug/trace.h"
grpc_core::TraceFlag grpc_event_engine_trace(false, "event_engine");
grpc_core::TraceFlag grpc_event_engine_endpoint_trace(false,
"event_engine_endpoint");
grpc_core::TraceFlag grpc_event_engine_endpoint_data_trace(
false, "event_engine_endpoint_data");
grpc_core::TraceFlag grpc_event_engine_poller_trace(false,
"event_engine_poller");

@ -21,10 +21,23 @@
#include "src/core/lib/debug/trace.h"
extern grpc_core::TraceFlag grpc_event_engine_trace;
extern grpc_core::TraceFlag grpc_event_engine_endpoint_data_trace;
extern grpc_core::TraceFlag grpc_event_engine_poller_trace;
extern grpc_core::TraceFlag grpc_event_engine_endpoint_trace;
#define GRPC_EVENT_ENGINE_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { \
gpr_log(GPR_DEBUG, "(event_engine) " format, __VA_ARGS__); \
}
#define GRPC_EVENT_ENGINE_ENDPOINT_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_endpoint_trace)) { \
gpr_log(GPR_DEBUG, "(event_engine endpoint) " format, __VA_ARGS__); \
}
#define GRPC_EVENT_ENGINE_POLLER_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_poller_trace)) { \
gpr_log(GPR_DEBUG, "(event_engine poller) " format, __VA_ARGS__); \
}
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_TRACE_H

@ -27,6 +27,7 @@
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/win_socket.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/iomgr/error.h"
namespace grpc_event_engine {
namespace experimental {
@ -48,21 +49,18 @@ std::unique_ptr<WinSocket> IOCP::Watch(SOCKET socket) {
reinterpret_cast<HANDLE>(socket), iocp_handle_,
reinterpret_cast<uintptr_t>(wrapped_socket.get()), 0);
if (!ret) {
char* utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "Unable to add socket to iocp: %s", utf8_message);
gpr_free(utf8_message);
__debugbreak();
abort();
grpc_core::Crash(
GRPC_WSA_ERROR(WSAGetLastError(), "Unable to add socket to iocp")
.ToString());
}
GPR_ASSERT(ret == iocp_handle_);
return wrapped_socket;
}
void IOCP::Shutdown() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "IOCP::%p shutting down. Outstanding kicks: %d", this,
outstanding_kicks_.load());
}
GRPC_EVENT_ENGINE_POLLER_TRACE(
"IOCP::%p shutting down. Outstanding kicks: %d", this,
outstanding_kicks_.load());
while (outstanding_kicks_.load() > 0) {
Work(std::chrono::hours(42), []() {});
}
@ -74,23 +72,17 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout,
DWORD bytes = 0;
ULONG_PTR completion_key;
LPOVERLAPPED overlapped;
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "IOCP::%p doing work", this);
}
GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p doing work", this);
BOOL success = GetQueuedCompletionStatus(
iocp_handle_, &bytes, &completion_key, &overlapped,
static_cast<DWORD>(Milliseconds(timeout)));
if (success == 0 && overlapped == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "IOCP::%p deadline exceeded", this);
}
GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p deadline exceeded", this);
return Poller::WorkResult::kDeadlineExceeded;
}
GPR_ASSERT(completion_key && overlapped);
if (overlapped == &kick_overlap_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "IOCP::%p kicked", this);
}
GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p kicked", this);
outstanding_kicks_.fetch_sub(1);
if (completion_key == (ULONG_PTR)&kick_token_) {
return Poller::WorkResult::kKicked;
@ -98,10 +90,8 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout,
grpc_core::Crash(
absl::StrFormat("Unknown custom completion key: %lu", completion_key));
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "IOCP::%p got event on OVERLAPPED::%p", this,
overlapped);
}
GRPC_EVENT_ENGINE_POLLER_TRACE("IOCP::%p got event on OVERLAPPED::%p", this,
overlapped);
WinSocket* socket = reinterpret_cast<WinSocket*>(completion_key);
// TODO(hork): move the following logic into the WinSocket impl.
WinSocket::OpState* info = socket->GetOpInfoForOverlapped(overlapped);

@ -40,22 +40,23 @@ WinSocket::WinSocket(SOCKET socket, Executor* executor) noexcept
read_info_(OpState(this)),
write_info_(OpState(this)) {}
WinSocket::~WinSocket() { GPR_ASSERT(is_shutdown_.load()); }
WinSocket::~WinSocket() {
GPR_ASSERT(is_shutdown_.load());
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p destroyed", this);
}
SOCKET WinSocket::socket() { return socket_; }
void WinSocket::MaybeShutdown(absl::Status why) {
// if already shutdown, return early. Otherwise, set the shutdown flag.
if (is_shutdown_.exchange(true)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "WinSocket::%p already shutting down", this);
}
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p already shutting down",
this);
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "WinSocket::%p shutting down now. Reason: %s", this,
why.ToString().c_str());
}
GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
"WinSocket::%p shutting down now. Reason: %s", this,
why.ToString().c_str());
// Grab the function pointer for DisconnectEx for that specific socket.
// It may change depending on the interface.
GUID guid = WSAID_DISCONNECTEX;
@ -74,6 +75,7 @@ void WinSocket::MaybeShutdown(absl::Status why) {
gpr_free(utf8_message);
}
closesocket(socket_);
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WinSocket::%p socket closed", this);
}
void WinSocket::NotifyOnReady(OpState& info, EventEngine::Closure* closure) {
@ -97,6 +99,8 @@ void WinSocket::NotifyOnWrite(EventEngine::Closure* on_write) {
NotifyOnReady(write_info_, on_write);
}
// ---- WinSocket::OpState ----
WinSocket::OpState::OpState(WinSocket* win_socket) noexcept
: win_socket_(win_socket), closure_(nullptr) {
memset(&overlapped_, 0, sizeof(OVERLAPPED));
@ -132,13 +136,10 @@ void WinSocket::SetWritable() { write_info_.SetReady(); }
bool WinSocket::IsShutdown() { return is_shutdown_.load(); }
WinSocket::OpState* WinSocket::GetOpInfoForOverlapped(OVERLAPPED* overlapped) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG,
"WinSocket::%p looking for matching OVERLAPPED::%p. "
"read(%p) write(%p)",
this, overlapped, &read_info_.overlapped_,
&write_info_.overlapped_);
}
GRPC_EVENT_ENGINE_POLLER_TRACE(
"WinSocket::%p looking for matching OVERLAPPED::%p. "
"read(%p) write(%p)",
this, overlapped, &read_info_.overlapped_, &write_info_.overlapped_);
if (overlapped == &read_info_.overlapped_) return &read_info_;
if (overlapped == &write_info_.overlapped_) return &write_info_;
return nullptr;

@ -81,7 +81,7 @@ WindowsEndpoint::~WindowsEndpoint() {
void WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* args) {
// TODO(hork): last_read_buffer from iomgr: Is it only garbage, or optimized?
GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p reading", this);
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p reading", this);
// Prepare the WSABUF struct
WSABUF wsa_buffers[kMaxWSABUFCount];
int min_read_size = kDefaultTargetReadSize;
@ -136,7 +136,8 @@ void WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* /* args */) {
if (grpc_event_engine_trace.enabled()) {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p writing", this);
if (grpc_event_engine_endpoint_data_trace.enabled()) {
for (int i = 0; i < data->Count(); i++) {
auto str = data->RefSlice(i).as_string_view();
gpr_log(GPR_INFO, "WindowsEndpoint::%p WRITE (peer=%s): %.*s", this,
@ -219,7 +220,8 @@ WindowsEndpoint::BaseEventClosure::BaseEventClosure(WindowsEndpoint* endpoint)
: cb_(&AbortOnEvent), endpoint_(endpoint) {}
void WindowsEndpoint::HandleReadClosure::Run() {
GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p Handling Read Event", endpoint_);
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Read Event",
endpoint_);
absl::Status status;
auto* read_info = endpoint_->socket_->read_info();
auto cb_cleanup = absl::MakeCleanup([this, &status]() {
@ -239,7 +241,7 @@ void WindowsEndpoint::HandleReadClosure::Run() {
read_info->bytes_transferred());
}
GPR_ASSERT(read_info->bytes_transferred() == buffer_->Length());
if (grpc_event_engine_trace.enabled()) {
if (grpc_event_engine_endpoint_data_trace.enabled()) {
for (int i = 0; i < buffer_->Count(); i++) {
auto str = buffer_->RefSlice(i).as_string_view();
gpr_log(GPR_INFO, "WindowsEndpoint::%p READ (peer=%s): %.*s", this,
@ -256,8 +258,8 @@ void WindowsEndpoint::HandleReadClosure::Run() {
}
void WindowsEndpoint::HandleWriteClosure::Run() {
GRPC_EVENT_ENGINE_TRACE("WindowsEndpoint::%p Handling Write Event",
endpoint_);
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p Handling Write Event",
endpoint_);
auto* write_info = endpoint_->socket_->write_info();
auto cb = std::move(cb_);
cb_ = &AbortOnEvent;

@ -128,6 +128,9 @@ WindowsEventEngine::~WindowsEventEngine() {
bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) {
grpc_core::MutexLock lock(&task_mu_);
if (!known_handles_.contains(handle)) return false;
GRPC_EVENT_ENGINE_TRACE(
"WindowsEventEngine::%p cancelling %s", this,
HandleToString<EventEngine::TaskHandle>(handle).c_str());
auto* cd = reinterpret_cast<TimerClosure*>(handle.keys[0]);
bool r = timer_manager_.TimerCancel(&cd->timer);
known_handles_.erase(handle);

@ -45,12 +45,6 @@ namespace experimental {
class WindowsEventEngine : public EventEngine,
public grpc_core::KeepsGrpcInitialized {
public:
class WindowsListener : public EventEngine::Listener {
public:
~WindowsListener() override;
absl::StatusOr<int> Bind(const ResolvedAddress& addr) override;
absl::Status Start() override;
};
class WindowsDNSResolver : public EventEngine::DNSResolver {
public:
~WindowsDNSResolver() override;

@ -456,7 +456,6 @@ static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
grpc_resolved_address addr6_v4mapped;
grpc_resolved_address wildcard;
grpc_resolved_address* allocated_addr = NULL;
grpc_resolved_address sockname_temp;
unsigned port_index = 0;
grpc_error_handle error;
@ -468,6 +467,7 @@ static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
// as some previously created listener.
if (grpc_sockaddr_get_port(addr) == 0) {
for (sp = s->head; sp; sp = sp->next) {
grpc_resolved_address sockname_temp;
int sockname_temp_len = sizeof(struct sockaddr_storage);
if (0 == getsockname(sp->socket->socket,
(grpc_sockaddr*)sockname_temp.addr,

Loading…
Cancel
Save