From cdf709c76e1e0bf69426f69854e61eeba42e0ecd Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 25 Apr 2018 13:58:10 -0700 Subject: [PATCH] CFStream tcp client and endpoint --- src/core/lib/iomgr/tcp_cfstream.h | 52 ++++ src/core/lib/iomgr/tcp_cfstream.mm | 330 ++++++++++++++++++++++ src/core/lib/iomgr/tcp_client_cfstream.mm | 203 +++++++++++++ 3 files changed, 585 insertions(+) create mode 100644 src/core/lib/iomgr/tcp_cfstream.h create mode 100644 src/core/lib/iomgr/tcp_cfstream.mm create mode 100644 src/core/lib/iomgr/tcp_client_cfstream.mm diff --git a/src/core/lib/iomgr/tcp_cfstream.h b/src/core/lib/iomgr/tcp_cfstream.h new file mode 100644 index 00000000000..dedcad49419 --- /dev/null +++ b/src/core/lib/iomgr/tcp_cfstream.h @@ -0,0 +1,52 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H +#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H +/* + Low level TCP "bottom half" implementation, for use by transports built on + top of a TCP connection. + + Note that this file does not (yet) include APIs for creating the socket in + the first place. + + All calls passing slice transfer ownership of a slice refcount unless + otherwise specified. +*/ + +#include + +#ifdef GRPC_CFSTREAM + +#import + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/tcp_cfstream_sync.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream, + const char* peer_string, + grpc_resource_quota* resource_quota, + CFStreamSync* stream_sync); + +#endif /* GRPC_CFSTREAM */ + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H */ diff --git a/src/core/lib/iomgr/tcp_cfstream.mm b/src/core/lib/iomgr/tcp_cfstream.mm new file mode 100644 index 00000000000..04086aef6fc --- /dev/null +++ b/src/core/lib/iomgr/tcp_cfstream.mm @@ -0,0 +1,330 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM_TCP + +#import +#import "src/core/lib/iomgr/tcp_cfstream.h" + +#include +#include +#include + +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error_apple.h" +#include "src/core/lib/iomgr/tcp_cfstream_sync.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +typedef struct { + grpc_endpoint base; + gpr_refcount refcount; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + CFStreamSync* stream_sync; + + grpc_closure* read_cb; + grpc_closure* write_cb; + grpc_slice_buffer* read_slices; + grpc_slice_buffer* write_slices; + + grpc_closure read_action; + grpc_closure write_action; + CFStreamEventType read_type; + + char* peer_string; + grpc_resource_user* resource_user; + grpc_resource_user_slice_allocator slice_allocator; +} CFStreamTCP; + +static void TCPFree(CFStreamTCP* tcp) { + grpc_resource_user_unref(tcp->resource_user); + CFRelease(tcp->read_stream); + CFRelease(tcp->write_stream); + CFSTREAM_SYNC_UNREF(tcp->stream_sync, "free"); + gpr_free(tcp->peer_string); + gpr_free(tcp); +} + +#ifndef NDEBUG +#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) +#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +static void tcp_unref(CFStreamTCP* tcp, const char* reason, const char* file, int line) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, + reason, val, val - 1); + } + if (gpr_unref(&tcp->refcount)) { + TCPFree(tcp); + } +} +static void tcp_ref(CFStreamTCP* tcp, const char* reason, const char* file, int line) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, + reason, val, val + 1); + } + gpr_ref(&tcp->refcount); +} +#else +#define TCP_REF(tcp, reason) tcp_ref((tcp)) +#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +static void tcp_unref(CFStreamTCP* tcp) { + if (gpr_unref(&tcp->refcount)) { + tcp_free(tcp); + } +} +static void tcp_ref(CFStreamTCP* tcp) { gpr_ref(&tcp->refcount); } +#endif + +static grpc_error* TCPAnnotateError(grpc_error* src_error, CFStreamTCP* tcp) { + return grpc_error_set_str( + grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(tcp->peer_string)); +} + +static void CallReadCB(CFStreamTCP* tcp, grpc_error* error) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP:%p call_read_cb %p %p:%p", tcp, tcp->read_cb, tcp->read_cb->cb, + tcp->read_cb->cb_arg); + size_t i; + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + + for (i = 0; i < tcp->read_slices->count; i++) { + char* dump = grpc_dump_slice(tcp->read_slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); + gpr_free(dump); + } + } + grpc_closure* cb = tcp->read_cb; + tcp->read_cb = nullptr; + tcp->read_slices = nullptr; + GRPC_CLOSURE_RUN(cb, error); +} + +static void CallWriteCB(CFStreamTCP* tcp, grpc_error* error) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP:%p call_write_cb %p %p:%p", tcp, tcp->write_cb, tcp->write_cb->cb, + tcp->write_cb->cb_arg); + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "write: error=%s", str); + } + grpc_closure* cb = tcp->write_cb; + tcp->write_cb = nullptr; + tcp->write_slices = nullptr; + GRPC_CLOSURE_RUN(cb, error); +} + +static void ReadAction(void* arg, grpc_error* error) { + CFStreamTCP* tcp = static_cast(arg); + GPR_ASSERT(tcp->read_cb != nullptr); + if (error) { + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); + CallReadCB(tcp, GRPC_ERROR_REF(error)); + TCP_UNREF(tcp, "read"); + return; + } + + GPR_ASSERT(tcp->read_slices->count == 1); + grpc_slice slice = tcp->read_slices->slices[0]; + size_t len = GRPC_SLICE_LENGTH(slice); + CFIndex read_size = CFReadStreamRead(tcp->read_stream, GRPC_SLICE_START_PTR(slice), len); + if (read_size == -1) { + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); + CFErrorRef stream_error = CFReadStreamCopyError(tcp->read_stream); + CallReadCB(tcp, + TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), tcp)); + CFRelease(stream_error); + TCP_UNREF(tcp, "read"); + } else if (read_size == 0) { + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); + CallReadCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); + TCP_UNREF(tcp, "read"); + } else { + if (read_size < len) { + grpc_slice_buffer_trim_end(tcp->read_slices, len - read_size, nullptr); + } + CallReadCB(tcp, GRPC_ERROR_NONE); + TCP_UNREF(tcp, "read"); + } +} + +static void WriteAction(void* arg, grpc_error* error) { + CFStreamTCP* tcp = static_cast(arg); + GPR_ASSERT(tcp->write_cb != nullptr); + if (error) { + grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices); + CallWriteCB(tcp, GRPC_ERROR_REF(error)); + TCP_UNREF(tcp, "write"); + return; + } + + grpc_slice slice = grpc_slice_buffer_take_first(tcp->write_slices); + size_t slice_len = GRPC_SLICE_LENGTH(slice); + CFIndex write_size = + CFWriteStreamWrite(tcp->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); + if (write_size == -1) { + grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices); + CFErrorRef stream_error = CFWriteStreamCopyError(tcp->write_stream); + CallWriteCB( + tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), tcp)); + CFRelease(stream_error); + TCP_UNREF(tcp, "write"); + } else { + if (write_size < GRPC_SLICE_LENGTH(slice)) { + grpc_slice_buffer_undo_take_first(tcp->write_slices, + grpc_slice_sub(slice, write_size, slice_len)); + } + if (tcp->write_slices->length > 0) { + tcp->stream_sync->NotifyOnWrite(&tcp->write_action); + } else { + CallWriteCB(tcp, GRPC_ERROR_NONE); + TCP_UNREF(tcp, "write"); + } + + if (grpc_tcp_trace.enabled()) { + grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size); + char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, dump); + gpr_free(dump); + grpc_slice_unref(trace_slice); + } + } + grpc_slice_unref(slice); +} + +static void TCPReadAllocationDone(void* arg, grpc_error* error) { + CFStreamTCP* tcp = static_cast(arg); + if (error == GRPC_ERROR_NONE) { + tcp->stream_sync->NotifyOnRead(&tcp->read_action); + } else { + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); + CallReadCB(tcp, error); + TCP_UNREF(tcp, "read"); + } +} + +static void TCPRead(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb) { + CFStreamTCP* tcp = reinterpret_cast(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "tcp:%p read (%p, %p) length:%zu", tcp, slices, cb, slices->length); + } + GPR_ASSERT(tcp->read_cb == nullptr); + tcp->read_cb = cb; + tcp->read_slices = slices; + grpc_slice_buffer_reset_and_unref_internal(slices); + grpc_resource_user_alloc_slices(&tcp->slice_allocator, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, + tcp->read_slices); + TCP_REF(tcp, "read"); +} + +static void TCPWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb) { + CFStreamTCP* tcp = reinterpret_cast(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "tcp:%p write (%p, %p) length:%zu", tcp, slices, cb, slices->length); + } + GPR_ASSERT(tcp->write_cb == nullptr); + tcp->write_cb = cb; + tcp->write_slices = slices; + TCP_REF(tcp, "write"); + tcp->stream_sync->NotifyOnWrite(&tcp->write_action); +} + +void TCPShutdown(grpc_endpoint* ep, grpc_error* why) { + CFStreamTCP* tcp = reinterpret_cast(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "tcp:%p shutdown (%p)", tcp, why); + } + CFReadStreamClose(tcp->read_stream); + CFWriteStreamClose(tcp->write_stream); + tcp->stream_sync->Shutdown(why); + grpc_resource_user_shutdown(tcp->resource_user); +} + +void TCPDestroy(grpc_endpoint* ep) { + CFStreamTCP* tcp = reinterpret_cast(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "tcp:%p destroy", tcp); + } + TCP_UNREF(tcp, "destroy"); +} + +grpc_resource_user* TCPGetResourceUser(grpc_endpoint* ep) { + CFStreamTCP* tcp = reinterpret_cast(ep); + return tcp->resource_user; +} + +char* TCPGetPeer(grpc_endpoint* ep) { + CFStreamTCP* tcp = reinterpret_cast(ep); + return gpr_strdup(tcp->peer_string); +} + +int TCPGetFD(grpc_endpoint* ep) { return 0; } + +void TCPAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} +void TCPAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} +void TCPDeleteFromPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} + +static const grpc_endpoint_vtable vtable = { + TCPRead, TCPWrite, TCPAddToPollset, TCPAddToPollsetSet, TCPDeleteFromPollsetSet, + TCPShutdown, TCPDestroy, TCPGetResourceUser, TCPGetPeer, TCPGetFD}; + +grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, CFWriteStreamRef write_stream, + const char* peer_string, grpc_resource_quota* resource_quota, + CFStreamSync* stream_sync) { + CFStreamTCP* tcp = static_cast(gpr_malloc(sizeof(CFStreamTCP))); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "tcp:%p create readStream:%p writeStream: %p", tcp, read_stream, + write_stream); + } + tcp->base.vtable = &vtable; + gpr_ref_init(&tcp->refcount, 1); + tcp->read_stream = read_stream; + tcp->write_stream = write_stream; + CFRetain(read_stream); + CFRetain(write_stream); + tcp->stream_sync = stream_sync; + CFSTREAM_SYNC_REF(tcp->stream_sync, "endpoint create"); + + tcp->peer_string = gpr_strdup(peer_string); + tcp->read_cb = nil; + tcp->write_cb = nil; + tcp->read_slices = nil; + tcp->write_slices = nil; + GRPC_CLOSURE_INIT(&tcp->read_action, ReadAction, static_cast(tcp), + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&tcp->write_action, WriteAction, static_cast(tcp), + grpc_schedule_on_exec_ctx); + tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); + grpc_resource_user_slice_allocator_init(&tcp->slice_allocator, tcp->resource_user, + TCPReadAllocationDone, tcp); + + return &tcp->base; +} + +#endif /* GRPC_CFSTREAM_TCP */ diff --git a/src/core/lib/iomgr/tcp_client_cfstream.mm b/src/core/lib/iomgr/tcp_client_cfstream.mm new file mode 100644 index 00000000000..c2c77bd9caf --- /dev/null +++ b/src/core/lib/iomgr/tcp_client_cfstream.mm @@ -0,0 +1,203 @@ + +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM_TCP_CLIENT + +#include + +#include + +#include +#include +#include + +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/host_port.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/error_apple.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_cfstream.h" +#include "src/core/lib/iomgr/tcp_cfstream_sync.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/timer.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +typedef struct CFStreamTCPConnect { + gpr_mu mu; + gpr_refcount refcount; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + CFStreamSync* stream_sync; + + grpc_timer alarm; + grpc_closure on_alarm; + grpc_closure on_open; + + bool read_stream_open; + bool write_stream_open; + bool failed; + + grpc_closure* closure; + grpc_endpoint** endpoint; + int refs; + char* addr_name; + grpc_resource_quota* resource_quota; +} CFStreamTCPConnect; + +static void TCPConnectCleanup(CFStreamTCPConnect* connect) { + grpc_resource_quota_unref_internal(connect->resource_quota); + CFSTREAM_SYNC_UNREF(connect->stream_sync, "async connect clean up"); + CFRelease(connect->read_stream); + CFRelease(connect->write_stream); + gpr_mu_destroy(&connect->mu); + gpr_free(connect->addr_name); + gpr_free(connect); +} + +static void OnAlarm(void* arg, grpc_error* error) { + CFStreamTCPConnect* connect = static_cast(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error); + } + gpr_mu_lock(&connect->mu); + grpc_closure* closure = connect->closure; + connect->closure = nil; + const bool done = (--connect->refs == 0); + gpr_mu_unlock(&connect->mu); + // Only schedule a callback once, by either on_timer or on_connected. The first one issues + // callback while the second one does cleanup. + if (done) { + TCPConnectCleanup(connect); + } else { + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"); + GRPC_CLOSURE_SCHED(closure, error); + } +} + +static void OnOpen(void* arg, grpc_error* error) { + CFStreamTCPConnect* connect = static_cast(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error); + } + gpr_mu_lock(&connect->mu); + grpc_timer_cancel(&connect->alarm); + grpc_closure* closure = connect->closure; + connect->closure = nil; + + bool done = (--connect->refs == 0); + grpc_endpoint** endpoint = connect->endpoint; + + if (done) { + gpr_mu_unlock(&connect->mu); + TCPConnectCleanup(connect); + } else { + if (error == GRPC_ERROR_NONE) { + CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream); + if (stream_error == NULL) { + stream_error = CFWriteStreamCopyError(connect->write_stream); + } + if (stream_error) { + error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error"); + CFRelease(stream_error); + } + if (error == GRPC_ERROR_NONE) { + *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream, connect->addr_name, + connect->resource_quota, connect->stream_sync); + } + } + gpr_mu_unlock(&connect->mu); + GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(error)); + } +} + +static void ParseResolvedAddress(const grpc_resolved_address* addr, CFStringRef* host, int* port) { + char *host_port, *host_string, *port_string; + grpc_sockaddr_to_string(&host_port, addr, 1); + gpr_split_host_port(host_port, &host_string, &port_string); + *host = CFStringCreateWithCString(NULL, host_string, kCFStringEncodingUTF8); + gpr_free(host_string); + gpr_free(port_string); + gpr_free(host_port); + *port = grpc_sockaddr_get_port(addr); +} + +static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* resolved_addr, grpc_millis deadline) { + CFStreamTCPConnect* connect; + + connect = (CFStreamTCPConnect*)gpr_zalloc(sizeof(CFStreamTCPConnect)); + connect->closure = closure; + connect->endpoint = ep; + connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); + // connect->resource_quota = resource_quota; + connect->refs = 2; // One for the connect operation, one for the timer. + gpr_ref_init(&connect->refcount, 1); + gpr_mu_init(&connect->mu); + + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", connect->addr_name); + } + + grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + (grpc_resource_quota*)channel_args->args[i].value.pointer.p); + } + } + } + connect->resource_quota = resource_quota; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + + CFStringRef host; + int port; + ParseResolvedAddress(resolved_addr, &host, &port); + CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream, &write_stream); + CFRelease(host); + connect->read_stream = read_stream; + connect->write_stream = write_stream; + connect->stream_sync = CFStreamSync::CreateStreamSync(read_stream, write_stream); + GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast(connect), + grpc_schedule_on_exec_ctx); + connect->stream_sync->NotifyOnOpen(&connect->on_open); + GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, grpc_schedule_on_exec_ctx); + gpr_mu_lock(&connect->mu); + CFReadStreamOpen(read_stream); + CFWriteStreamOpen(write_stream); + grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm); + gpr_mu_unlock(&connect->mu); +} + +grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {TCPClientConnect}; + +#endif /* GRPC_CFSTREAM_TCP_CLIENT */