mirror of https://github.com/grpc/grpc.git
Revert "[ObjC] CF Stream Event Engine Client" (#33027)
Reverts grpc/grpc#32924. This breaks the build again, unfortunately. From `test/core/event_engine/cf:cf_engine_test`: ``` error: module .../grpc/test/core/event_engine/cf:cf_engine_test does not depend on a module exporting 'grpc/support/port_platform.h' ``` @sampajano I recommend looking into CI tests to catch iOS problems before merging. We can enable EventEngine experiments in the CI generally once this PR lands, but this broken test is not one of those experiments. A normal build should have caught this. cc @HannahShiSFBpull/33031/head
parent
e2fcade407
commit
ee0aaacbde
32 changed files with 65 additions and 1092 deletions
@ -1,354 +0,0 @@ |
||||
// Copyright 2023 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_APPLE |
||||
|
||||
#include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h" |
||||
#include "src/core/lib/event_engine/trace.h" |
||||
#include "src/core/lib/gprpp/strerror.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
namespace { |
||||
|
||||
int kDefaultReadBufferSize = 8192; |
||||
|
||||
absl::Status CFErrorToStatus(CFTypeUniqueRef<CFErrorRef> cf_error) { |
||||
if (cf_error == nullptr) { |
||||
return absl::OkStatus(); |
||||
} |
||||
CFErrorDomain cf_domain = CFErrorGetDomain((cf_error)); |
||||
CFIndex code = CFErrorGetCode((cf_error)); |
||||
CFTypeUniqueRef<CFStringRef> cf_desc = CFErrorCopyDescription((cf_error)); |
||||
char domain_buf[256]; |
||||
char desc_buf[256]; |
||||
CFStringGetCString(cf_domain, domain_buf, 256, kCFStringEncodingUTF8); |
||||
CFStringGetCString(cf_desc, desc_buf, 256, kCFStringEncodingUTF8); |
||||
return absl::Status(absl::StatusCode::kUnknown, |
||||
absl::StrFormat("(domain:%s, code:%ld, description:%s)", |
||||
domain_buf, code, desc_buf)); |
||||
} |
||||
|
||||
absl::StatusOr<EventEngine::ResolvedAddress> CFReadStreamLocallAddress( |
||||
CFReadStreamRef stream) { |
||||
CFTypeUniqueRef<CFDataRef> cf_native_handle = static_cast<CFDataRef>( |
||||
CFReadStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle)); |
||||
CFSocketNativeHandle socket; |
||||
CFDataGetBytes(cf_native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)), |
||||
(UInt8*)&socket); |
||||
EventEngine::ResolvedAddress addr; |
||||
socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES; |
||||
if (getsockname(socket, const_cast<sockaddr*>(addr.address()), &len) < 0) { |
||||
return absl::InternalError( |
||||
absl::StrCat("getsockname:", grpc_core::StrError(errno))); |
||||
} |
||||
return EventEngine::ResolvedAddress(addr.address(), len); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
bool CFStreamEndpointImpl::CancelConnect(absl::Status status) { |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE( |
||||
"CFStreamEndpointImpl::CancelConnect: status: %s, this: %p", |
||||
status.ToString().c_str(), this); |
||||
|
||||
return open_event_.SetShutdown(std::move(status)); |
||||
} |
||||
|
||||
void CFStreamEndpointImpl::Connect( |
||||
absl::AnyInvocable<void(absl::Status)> on_connect, |
||||
EventEngine::ResolvedAddress addr) { |
||||
auto addr_uri = ResolvedAddressToURI(addr); |
||||
|
||||
if (!addr_uri.ok()) { |
||||
on_connect(std::move(addr_uri).status()); |
||||
return; |
||||
} |
||||
|
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Connect: %s", |
||||
addr_uri.value().c_str()); |
||||
|
||||
peer_address_ = std::move(addr); |
||||
auto host_port = ResolvedAddressToNormalizedString(peer_address_); |
||||
if (!host_port.ok()) { |
||||
on_connect(std::move(host_port).status()); |
||||
return; |
||||
} |
||||
|
||||
peer_address_string_ = host_port.value(); |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE( |
||||
"CFStreamEndpointImpl::Connect, host_port: %s", host_port->c_str()); |
||||
|
||||
std::string host_string; |
||||
std::string port_string; |
||||
grpc_core::SplitHostPort(host_port.value(), &host_string, &port_string); |
||||
CFStringRef host = CFStringCreateWithCString(NULL, host_string.c_str(), |
||||
kCFStringEncodingUTF8); |
||||
int port = ResolvedAddressGetPort(peer_address_); |
||||
CFStreamCreatePairWithSocketToHost(NULL, host, port, &cf_read_stream_, |
||||
&cf_write_stream_); |
||||
|
||||
CFStreamClientContext cf_context = {0, this, Retain, Release, nullptr}; |
||||
CFReadStreamSetClient( |
||||
cf_read_stream_, |
||||
kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | |
||||
kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, |
||||
ReadCallback, &cf_context); |
||||
CFWriteStreamSetClient( |
||||
cf_write_stream_, |
||||
kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | |
||||
kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, |
||||
WriteCallback, &cf_context); |
||||
CFReadStreamSetDispatchQueue(cf_read_stream_, |
||||
dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)); |
||||
CFWriteStreamSetDispatchQueue( |
||||
cf_write_stream_, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)); |
||||
|
||||
if (!CFReadStreamOpen(cf_read_stream_)) { |
||||
auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_)); |
||||
on_connect(std::move(status)); |
||||
return; |
||||
} |
||||
|
||||
if (!CFWriteStreamOpen(cf_write_stream_)) { |
||||
auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_)); |
||||
on_connect(std::move(status)); |
||||
return; |
||||
} |
||||
|
||||
open_event_.NotifyOn(new PosixEngineClosure( |
||||
[that = Ref(), |
||||
on_connect = std::move(on_connect)](absl::Status status) mutable { |
||||
if (!status.ok()) { |
||||
on_connect(std::move(status)); |
||||
return; |
||||
} |
||||
|
||||
auto local_addr = CFReadStreamLocallAddress(that->cf_read_stream_); |
||||
if (!local_addr.ok()) { |
||||
on_connect(std::move(local_addr).status()); |
||||
return; |
||||
} |
||||
|
||||
that->local_address_ = local_addr.value(); |
||||
that->local_address_string_ = |
||||
*ResolvedAddressToURI(that->local_address_); |
||||
on_connect(absl::OkStatus()); |
||||
}, |
||||
false /* is_permanent */)); |
||||
} |
||||
|
||||
/* static */ void CFStreamEndpointImpl::ReadCallback( |
||||
CFReadStreamRef stream, CFStreamEventType type, |
||||
void* client_callback_info) { |
||||
auto self = static_cast<CFStreamEndpointImpl*>(client_callback_info); |
||||
|
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE( |
||||
"CFStreamEndpointImpl::ReadCallback, type: %lu, this: %p", type, self); |
||||
|
||||
switch (type) { |
||||
case kCFStreamEventOpenCompleted: |
||||
// wait for write stream open completed to signal connection ready
|
||||
break; |
||||
case kCFStreamEventHasBytesAvailable: |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case kCFStreamEventEndEncountered: |
||||
self->read_event_.SetReady(); |
||||
break; |
||||
case kCFStreamEventErrorOccurred: { |
||||
auto status = CFErrorToStatus(CFReadStreamCopyError(stream)); |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Read error: %s", |
||||
status.ToString().c_str()); |
||||
|
||||
self->open_event_.SetShutdown(status); |
||||
self->read_event_.SetShutdown(status); |
||||
self->write_event_.SetShutdown(status); |
||||
} break; |
||||
default: |
||||
GPR_UNREACHABLE_CODE(return); |
||||
} |
||||
} |
||||
|
||||
/* static */ |
||||
void CFStreamEndpointImpl::WriteCallback(CFWriteStreamRef stream, |
||||
CFStreamEventType type, |
||||
void* client_callback_info) { |
||||
auto self = static_cast<CFStreamEndpointImpl*>(client_callback_info); |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE( |
||||
"CFStreamEndpointImpl::WriteCallback, type: %lu, this: %p", type, self); |
||||
|
||||
switch (type) { |
||||
case kCFStreamEventOpenCompleted: |
||||
self->open_event_.SetReady(); |
||||
break; |
||||
case kCFStreamEventCanAcceptBytes: |
||||
ABSL_FALLTHROUGH_INTENDED; |
||||
case kCFStreamEventEndEncountered: |
||||
self->write_event_.SetReady(); |
||||
break; |
||||
case kCFStreamEventErrorOccurred: { |
||||
auto status = CFErrorToStatus(CFWriteStreamCopyError(stream)); |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Write error: %s", |
||||
status.ToString().c_str()); |
||||
|
||||
self->open_event_.SetShutdown(status); |
||||
self->read_event_.SetShutdown(status); |
||||
self->write_event_.SetShutdown(status); |
||||
} break; |
||||
default: |
||||
GPR_UNREACHABLE_CODE(return); |
||||
} |
||||
} |
||||
|
||||
CFStreamEndpointImpl::CFStreamEndpointImpl( |
||||
std::shared_ptr<CFEventEngine> engine, MemoryAllocator memory_allocator) |
||||
: engine_(std::move(engine)), |
||||
memory_allocator_(std::move(memory_allocator)), |
||||
open_event_(engine_.get()), |
||||
read_event_(engine_.get()), |
||||
write_event_(engine_.get()) { |
||||
open_event_.InitEvent(); |
||||
read_event_.InitEvent(); |
||||
write_event_.InitEvent(); |
||||
} |
||||
|
||||
CFStreamEndpointImpl::~CFStreamEndpointImpl() { |
||||
open_event_.DestroyEvent(); |
||||
read_event_.DestroyEvent(); |
||||
write_event_.DestroyEvent(); |
||||
} |
||||
|
||||
void CFStreamEndpointImpl::Shutdown() { |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Shutdown: this: %p", |
||||
this); |
||||
|
||||
auto shutdownStatus = |
||||
absl::Status(absl::StatusCode::kUnknown, |
||||
absl::StrFormat("Shutting down CFStreamEndpointImpl")); |
||||
open_event_.SetShutdown(shutdownStatus); |
||||
read_event_.SetShutdown(shutdownStatus); |
||||
write_event_.SetShutdown(shutdownStatus); |
||||
|
||||
CFReadStreamSetClient(cf_read_stream_, kCFStreamEventNone, nullptr, nullptr); |
||||
CFWriteStreamSetClient(cf_write_stream_, kCFStreamEventNone, nullptr, |
||||
nullptr); |
||||
CFReadStreamClose(cf_read_stream_); |
||||
CFWriteStreamClose(cf_write_stream_); |
||||
} |
||||
|
||||
bool CFStreamEndpointImpl::Read( |
||||
absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, |
||||
const EventEngine::Endpoint::ReadArgs* /* args */) { |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Read, this: %p", |
||||
this); |
||||
|
||||
read_event_.NotifyOn(new PosixEngineClosure( |
||||
[that = Ref(), on_read = std::move(on_read), |
||||
buffer](absl::Status status) mutable { |
||||
if (status.ok()) { |
||||
that->DoRead(std::move(on_read), buffer); |
||||
} else { |
||||
on_read(status); |
||||
} |
||||
}, |
||||
false /* is_permanent*/)); |
||||
|
||||
return false; |
||||
} |
||||
|
||||
void CFStreamEndpointImpl::DoRead( |
||||
absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer) { |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoRead, this: %p", |
||||
this); |
||||
|
||||
auto buffer_index = buffer->AppendIndexed( |
||||
Slice(memory_allocator_.MakeSlice(kDefaultReadBufferSize))); |
||||
|
||||
CFIndex read_size = CFReadStreamRead( |
||||
cf_read_stream_, |
||||
internal::SliceCast<MutableSlice>(buffer->MutableSliceAt(buffer_index)) |
||||
.begin(), |
||||
kDefaultReadBufferSize); |
||||
|
||||
if (read_size < 0) { |
||||
auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_)); |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream read error: %s, read_size: %ld", |
||||
status.ToString().c_str(), read_size); |
||||
on_read(status); |
||||
return; |
||||
} |
||||
|
||||
buffer->RemoveLastNBytes(buffer->Length() - read_size); |
||||
on_read(absl::OkStatus()); |
||||
} |
||||
|
||||
bool CFStreamEndpointImpl::Write( |
||||
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data, |
||||
const EventEngine::Endpoint::WriteArgs* /* args */) { |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Write, this: %p", |
||||
this); |
||||
|
||||
write_event_.NotifyOn(new PosixEngineClosure( |
||||
[that = Ref(), on_writable = std::move(on_writable), |
||||
data](absl::Status status) mutable { |
||||
if (status.ok()) { |
||||
that->DoWrite(std::move(on_writable), data); |
||||
} else { |
||||
on_writable(status); |
||||
} |
||||
}, |
||||
false /* is_permanent*/)); |
||||
|
||||
return false; |
||||
} |
||||
|
||||
void CFStreamEndpointImpl::DoWrite( |
||||
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data) { |
||||
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoWrite, this: %p", |
||||
this); |
||||
|
||||
size_t total_written_size = 0; |
||||
for (size_t i = 0; i < data->Count(); i++) { |
||||
auto slice = data->RefSlice(i); |
||||
size_t written_size = |
||||
CFWriteStreamWrite(cf_write_stream_, slice.begin(), slice.size()); |
||||
|
||||
total_written_size += written_size; |
||||
if (written_size < slice.size()) { |
||||
SliceBuffer written; |
||||
data->MoveFirstNBytesIntoSliceBuffer(total_written_size, written); |
||||
|
||||
write_event_.NotifyOn(new PosixEngineClosure( |
||||
[that = Ref(), on_writable = std::move(on_writable), |
||||
data](absl::Status status) mutable { |
||||
if (status.ok()) { |
||||
that->DoWrite(std::move(on_writable), data); |
||||
} else { |
||||
on_writable(status); |
||||
} |
||||
}, |
||||
false /* is_permanent*/)); |
||||
return; |
||||
} |
||||
} |
||||
on_writable(absl::OkStatus()); |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GPR_APPLE
|
@ -1,146 +0,0 @@ |
||||
// Copyright 2023 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H |
||||
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H |
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_APPLE |
||||
|
||||
#include <CoreFoundation/CoreFoundation.h> |
||||
|
||||
#include "absl/strings/str_format.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/address_utils/sockaddr_utils.h" |
||||
#include "src/core/lib/event_engine/cf_engine/cf_engine.h" |
||||
#include "src/core/lib/event_engine/cf_engine/cftype_unique_ref.h" |
||||
#include "src/core/lib/event_engine/posix_engine/lockfree_event.h" |
||||
#include "src/core/lib/event_engine/tcp_socket_utils.h" |
||||
#include "src/core/lib/gprpp/host_port.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
class CFStreamEndpointImpl |
||||
: public grpc_core::RefCounted<CFStreamEndpointImpl> { |
||||
public: |
||||
CFStreamEndpointImpl(std::shared_ptr<CFEventEngine> engine, |
||||
MemoryAllocator memory_allocator); |
||||
~CFStreamEndpointImpl(); |
||||
|
||||
void Shutdown(); |
||||
|
||||
bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, |
||||
const EventEngine::Endpoint::ReadArgs* args); |
||||
bool Write(absl::AnyInvocable<void(absl::Status)> on_writable, |
||||
SliceBuffer* data, const EventEngine::Endpoint::WriteArgs* args); |
||||
|
||||
const EventEngine::ResolvedAddress& GetPeerAddress() const { |
||||
return peer_address_; |
||||
} |
||||
const EventEngine::ResolvedAddress& GetLocalAddress() const { |
||||
return local_address_; |
||||
} |
||||
|
||||
public: |
||||
void Connect(absl::AnyInvocable<void(absl::Status)> on_connect, |
||||
EventEngine::ResolvedAddress addr); |
||||
bool CancelConnect(absl::Status status); |
||||
|
||||
private: |
||||
void DoWrite(absl::AnyInvocable<void(absl::Status)> on_writable, |
||||
SliceBuffer* data); |
||||
void DoRead(absl::AnyInvocable<void(absl::Status)> on_read, |
||||
SliceBuffer* buffer); |
||||
|
||||
private: |
||||
static void* Retain(void* info) { |
||||
auto that = static_cast<CFStreamEndpointImpl*>(info); |
||||
return that->Ref().release(); |
||||
} |
||||
|
||||
static void Release(void* info) { |
||||
auto that = static_cast<CFStreamEndpointImpl*>(info); |
||||
that->Unref(); |
||||
} |
||||
|
||||
static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type, |
||||
void* client_callback_info); |
||||
static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, |
||||
void* client_callback_info); |
||||
|
||||
private: |
||||
CFTypeUniqueRef<CFReadStreamRef> cf_read_stream_; |
||||
CFTypeUniqueRef<CFWriteStreamRef> cf_write_stream_; |
||||
|
||||
std::shared_ptr<CFEventEngine> engine_; |
||||
|
||||
EventEngine::ResolvedAddress peer_address_; |
||||
EventEngine::ResolvedAddress local_address_; |
||||
std::string peer_address_string_; |
||||
std::string local_address_string_; |
||||
MemoryAllocator memory_allocator_; |
||||
|
||||
LockfreeEvent open_event_; |
||||
LockfreeEvent read_event_; |
||||
LockfreeEvent write_event_; |
||||
}; |
||||
|
||||
class CFStreamEndpoint : public EventEngine::Endpoint { |
||||
public: |
||||
CFStreamEndpoint(std::shared_ptr<CFEventEngine> engine, |
||||
MemoryAllocator memory_allocator) { |
||||
impl_ = grpc_core::MakeRefCounted<CFStreamEndpointImpl>( |
||||
std::move(engine), std::move(memory_allocator)); |
||||
} |
||||
~CFStreamEndpoint() override { impl_->Shutdown(); } |
||||
|
||||
bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, |
||||
const ReadArgs* args) override { |
||||
return impl_->Read(std::move(on_read), buffer, args); |
||||
} |
||||
bool Write(absl::AnyInvocable<void(absl::Status)> on_writable, |
||||
SliceBuffer* data, const WriteArgs* args) override { |
||||
return impl_->Write(std::move(on_writable), data, args); |
||||
} |
||||
|
||||
const EventEngine::ResolvedAddress& GetPeerAddress() const override { |
||||
return impl_->GetPeerAddress(); |
||||
} |
||||
const EventEngine::ResolvedAddress& GetLocalAddress() const override { |
||||
return impl_->GetLocalAddress(); |
||||
} |
||||
|
||||
public: |
||||
void Connect(absl::AnyInvocable<void(absl::Status)> on_connect, |
||||
EventEngine::ResolvedAddress addr) { |
||||
impl_->Connect(std::move(on_connect), std::move(addr)); |
||||
} |
||||
bool CancelConnect(absl::Status status) { |
||||
return impl_->CancelConnect(std::move(status)); |
||||
} |
||||
|
||||
private: |
||||
grpc_core::RefCountedPtr<CFStreamEndpointImpl> impl_; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GPR_APPLE
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H
|
@ -1,79 +0,0 @@ |
||||
// Copyright 2023 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFTYPE_UNIQUE_REF_H |
||||
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFTYPE_UNIQUE_REF_H |
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_APPLE |
||||
|
||||
#include <CoreFoundation/CoreFoundation.h> |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
template <typename T> |
||||
class CFTypeUniqueRef { |
||||
static_assert(std::is_convertible<T, CFTypeRef>::value, |
||||
"T should be `CFXxxRef` type"); |
||||
|
||||
public: |
||||
/* implicit */ |
||||
CFTypeUniqueRef(T cf_type_ref = nullptr) : cf_type_ref_(cf_type_ref) {} |
||||
~CFTypeUniqueRef() { reset(); } |
||||
|
||||
CFTypeUniqueRef(CFTypeUniqueRef const&) = delete; |
||||
CFTypeUniqueRef& operator=(CFTypeUniqueRef const&) = delete; |
||||
|
||||
CFTypeUniqueRef(CFTypeUniqueRef&& other) : cf_type_ref_(other.release()){}; |
||||
CFTypeUniqueRef& operator=(CFTypeUniqueRef&& other) { |
||||
reset(other.release()); |
||||
return *this; |
||||
} |
||||
|
||||
operator T() { return cf_type_ref_; } |
||||
|
||||
// Note: this is for passing a CFTypeRef as output parameter to a CF API, the
|
||||
// current ref is released (if any) regardless of whether new value is set
|
||||
T* operator&() { |
||||
reset(); |
||||
return &cf_type_ref_; |
||||
} |
||||
|
||||
T release() { |
||||
T old = cf_type_ref_; |
||||
cf_type_ref_ = nullptr; |
||||
return old; |
||||
} |
||||
|
||||
void reset(T other = nullptr) { |
||||
if (cf_type_ref_ == other) { |
||||
return; |
||||
} |
||||
T old = cf_type_ref_; |
||||
cf_type_ref_ = other; |
||||
if (old) { |
||||
CFRelease(old); |
||||
} |
||||
} |
||||
|
||||
private: |
||||
T cf_type_ref_; |
||||
}; |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
#endif // GPR_APPLE
|
||||
|
||||
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFTYPE_UNIQUE_REF_H
|
@ -1,33 +0,0 @@ |
||||
# Copyright 2023 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
load("//bazel:grpc_build_system.bzl", "grpc_cc_test") |
||||
|
||||
licenses(["notice"]) |
||||
|
||||
grpc_cc_test( |
||||
name = "cf_engine_test", |
||||
timeout = "short", |
||||
srcs = ["cf_engine_test.cc"], |
||||
external_deps = ["gtest"], |
||||
language = "C++", |
||||
tags = [ |
||||
"no_linux", |
||||
"no_windows", |
||||
], |
||||
deps = [ |
||||
"//src/core:cf_event_engine", |
||||
"//test/core/util:grpc_test_util", |
||||
], |
||||
) |
@ -1,96 +0,0 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#ifdef GPR_APPLE |
||||
|
||||
#include <thread> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/lib/event_engine/cf_engine/cf_engine.h" |
||||
#include "src/core/lib/event_engine/channel_args_endpoint_config.h" |
||||
#include "src/core/lib/event_engine/tcp_socket_utils.h" |
||||
#include "src/core/lib/resource_quota/memory_quota.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "test/core/util/port.h" |
||||
|
||||
using namespace std::chrono_literals; |
||||
|
||||
namespace grpc_event_engine { |
||||
namespace experimental { |
||||
|
||||
TEST(CFEventEngineTest, TestConnectionTimeout) { |
||||
// use a non-routable IP so connection will timeout
|
||||
auto resolved_addr = URIToResolvedAddress("ipv4:10.255.255.255:1234"); |
||||
GPR_ASSERT(resolved_addr.ok()); |
||||
|
||||
grpc_core::MemoryQuota memory_quota("cf_engine_test"); |
||||
grpc_core::Notification client_signal; |
||||
auto cf_engine = std::make_shared<CFEventEngine>(); |
||||
|
||||
ChannelArgsEndpointConfig config(grpc_core::ChannelArgs().Set( |
||||
GRPC_ARG_RESOURCE_QUOTA, grpc_core::ResourceQuota::Default())); |
||||
cf_engine->Connect( |
||||
[&client_signal](auto endpoint) { |
||||
EXPECT_EQ(endpoint.status().code(), |
||||
absl::StatusCode::kDeadlineExceeded); |
||||
client_signal.Notify(); |
||||
}, |
||||
*resolved_addr, config, memory_quota.CreateMemoryAllocator("conn1"), 1ms); |
||||
|
||||
client_signal.WaitForNotification(); |
||||
} |
||||
|
||||
TEST(CFEventEngineTest, TestConnectionCancelled) { |
||||
// use a non-routable IP so to cancel connection before timeout
|
||||
auto resolved_addr = URIToResolvedAddress("ipv4:10.255.255.255:1234"); |
||||
GPR_ASSERT(resolved_addr.ok()); |
||||
|
||||
grpc_core::MemoryQuota memory_quota("cf_engine_test"); |
||||
grpc_core::Notification client_signal; |
||||
auto cf_engine = std::make_shared<CFEventEngine>(); |
||||
|
||||
ChannelArgsEndpointConfig config(grpc_core::ChannelArgs().Set( |
||||
GRPC_ARG_RESOURCE_QUOTA, grpc_core::ResourceQuota::Default())); |
||||
auto conn_handle = cf_engine->Connect( |
||||
[&client_signal](auto endpoint) { |
||||
EXPECT_EQ(endpoint.status().code(), absl::StatusCode::kCancelled); |
||||
client_signal.Notify(); |
||||
}, |
||||
*resolved_addr, config, memory_quota.CreateMemoryAllocator("conn1"), 1h); |
||||
|
||||
cf_engine->CancelConnect(conn_handle); |
||||
client_signal.WaitForNotification(); |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace grpc_event_engine
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
grpc_init(); |
||||
int status = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return status; |
||||
} |
||||
|
||||
#else // not GPR_APPLE
|
||||
int main(int /* argc */, char** /* argv */) { return 0; } |
||||
#endif |
Loading…
Reference in new issue