|
|
|
@ -23,7 +23,6 @@ |
|
|
|
|
#include "absl/strings/str_format.h" |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h" |
|
|
|
|
#include "src/core/lib/event_engine/trace.h" |
|
|
|
|
#include "src/core/lib/gprpp/strerror.h" |
|
|
|
|
|
|
|
|
|
namespace grpc_event_engine { |
|
|
|
@ -68,9 +67,9 @@ absl::StatusOr<EventEngine::ResolvedAddress> CFReadStreamLocallAddress( |
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
bool CFStreamEndpointImpl::CancelConnect(absl::Status status) { |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE( |
|
|
|
|
"CFStreamEndpointImpl::CancelConnect: status: %s, this: %p", |
|
|
|
|
status.ToString().c_str(), this); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStreamEndpointImpl::CancelConnect: status: " << status |
|
|
|
|
<< ", this: " << this; |
|
|
|
|
|
|
|
|
|
return open_event_.SetShutdown(std::move(status)); |
|
|
|
|
} |
|
|
|
@ -85,8 +84,8 @@ void CFStreamEndpointImpl::Connect( |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Connect: %s", |
|
|
|
|
addr_uri.value().c_str()); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStreamEndpointImpl::Connect: " << addr_uri.value(); |
|
|
|
|
|
|
|
|
|
peer_address_ = std::move(addr); |
|
|
|
|
auto host_port = ResolvedAddressToNormalizedString(peer_address_); |
|
|
|
@ -96,8 +95,8 @@ void CFStreamEndpointImpl::Connect( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
peer_address_string_ = host_port.value(); |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE( |
|
|
|
|
"CFStreamEndpointImpl::Connect, host_port: %s", host_port->c_str()); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStreamEndpointImpl::Connect, host_port: " << peer_address_string_; |
|
|
|
|
|
|
|
|
|
std::string host_string; |
|
|
|
|
std::string port_string; |
|
|
|
@ -163,8 +162,9 @@ void CFStreamEndpointImpl::Connect( |
|
|
|
|
void* client_callback_info) { |
|
|
|
|
auto self = static_cast<CFStreamEndpointImpl*>(client_callback_info); |
|
|
|
|
|
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE( |
|
|
|
|
"CFStreamEndpointImpl::ReadCallback, type: %lu, this: %p", type, self); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStreamEndpointImpl::ReadCallback, type: " << type |
|
|
|
|
<< ", this: " << self; |
|
|
|
|
|
|
|
|
|
switch (type) { |
|
|
|
|
case kCFStreamEventOpenCompleted: |
|
|
|
@ -177,8 +177,8 @@ void CFStreamEndpointImpl::Connect( |
|
|
|
|
break; |
|
|
|
|
case kCFStreamEventErrorOccurred: { |
|
|
|
|
auto status = CFErrorToStatus(CFReadStreamCopyError(stream)); |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Read error: %s", |
|
|
|
|
status.ToString().c_str()); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStream Read error: " << status; |
|
|
|
|
|
|
|
|
|
self->open_event_.SetShutdown(status); |
|
|
|
|
self->read_event_.SetShutdown(status); |
|
|
|
@ -194,8 +194,9 @@ void CFStreamEndpointImpl::WriteCallback(CFWriteStreamRef stream, |
|
|
|
|
CFStreamEventType type, |
|
|
|
|
void* client_callback_info) { |
|
|
|
|
auto self = static_cast<CFStreamEndpointImpl*>(client_callback_info); |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE( |
|
|
|
|
"CFStreamEndpointImpl::WriteCallback, type: %lu, this: %p", type, self); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStreamEndpointImpl::WriteCallback, type: " << type |
|
|
|
|
<< ", this: " << self; |
|
|
|
|
|
|
|
|
|
switch (type) { |
|
|
|
|
case kCFStreamEventOpenCompleted: |
|
|
|
@ -208,8 +209,8 @@ void CFStreamEndpointImpl::WriteCallback(CFWriteStreamRef stream, |
|
|
|
|
break; |
|
|
|
|
case kCFStreamEventErrorOccurred: { |
|
|
|
|
auto status = CFErrorToStatus(CFWriteStreamCopyError(stream)); |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Write error: %s", |
|
|
|
|
status.ToString().c_str()); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStream Write error: " << status; |
|
|
|
|
|
|
|
|
|
self->open_event_.SetShutdown(status); |
|
|
|
|
self->read_event_.SetShutdown(status); |
|
|
|
@ -239,8 +240,8 @@ CFStreamEndpointImpl::~CFStreamEndpointImpl() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CFStreamEndpointImpl::Shutdown() { |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Shutdown: this: %p", |
|
|
|
|
this); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStreamEndpointImpl::Shutdown: this: " << this; |
|
|
|
|
|
|
|
|
|
auto shutdownStatus = |
|
|
|
|
absl::Status(absl::StatusCode::kUnknown, |
|
|
|
@ -259,8 +260,8 @@ void CFStreamEndpointImpl::Shutdown() { |
|
|
|
|
bool CFStreamEndpointImpl::Read( |
|
|
|
|
absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, |
|
|
|
|
const EventEngine::Endpoint::ReadArgs* /* args */) { |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Read, this: %p", |
|
|
|
|
this); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStreamEndpointImpl::Read, this: " << this; |
|
|
|
|
|
|
|
|
|
read_event_.NotifyOn(new PosixEngineClosure( |
|
|
|
|
[that = Ref(), on_read = std::move(on_read), |
|
|
|
@ -278,8 +279,8 @@ bool CFStreamEndpointImpl::Read( |
|
|
|
|
|
|
|
|
|
void CFStreamEndpointImpl::DoRead( |
|
|
|
|
absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer) { |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoRead, this: %p", |
|
|
|
|
this); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStreamEndpointImpl::DoRead, this: " << this; |
|
|
|
|
|
|
|
|
|
auto buffer_index = buffer->AppendIndexed( |
|
|
|
|
Slice(memory_allocator_.MakeSlice(kDefaultReadBufferSize))); |
|
|
|
@ -292,8 +293,8 @@ void CFStreamEndpointImpl::DoRead( |
|
|
|
|
|
|
|
|
|
if (read_size < 0) { |
|
|
|
|
auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_)); |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream read error: %s, read_size: %ld", |
|
|
|
|
status.ToString().c_str(), read_size); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStream read error: " << status << ", read_size: " << read_size; |
|
|
|
|
on_read(status); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -305,8 +306,8 @@ void CFStreamEndpointImpl::DoRead( |
|
|
|
|
bool CFStreamEndpointImpl::Write( |
|
|
|
|
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data, |
|
|
|
|
const EventEngine::Endpoint::WriteArgs* /* args */) { |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Write, this: %p", |
|
|
|
|
this); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStreamEndpointImpl::Write, this: " << this; |
|
|
|
|
|
|
|
|
|
write_event_.NotifyOn(new PosixEngineClosure( |
|
|
|
|
[that = Ref(), on_writable = std::move(on_writable), |
|
|
|
@ -324,8 +325,8 @@ bool CFStreamEndpointImpl::Write( |
|
|
|
|
|
|
|
|
|
void CFStreamEndpointImpl::DoWrite( |
|
|
|
|
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data) { |
|
|
|
|
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoWrite, this: %p", |
|
|
|
|
this); |
|
|
|
|
GRPC_TRACE_LOG(event_engine_endpoint, INFO) |
|
|
|
|
<< "CFStreamEndpointImpl::DoWrite, this: " << this; |
|
|
|
|
|
|
|
|
|
size_t total_written_size = 0; |
|
|
|
|
for (size_t i = 0; i < data->Count(); i++) { |
|
|
|
|