diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc index 827fd24831e..bb402f96cf5 100644 --- a/src/core/lib/iomgr/cfstream_handle.cc +++ b/src/core/lib/iomgr/cfstream_handle.cc @@ -52,62 +52,53 @@ CFStreamHandle* CFStreamHandle::CreateStreamHandle( void CFStreamHandle::ReadCallback(CFReadStreamRef stream, CFStreamEventType type, void* client_callback_info) { + grpc_core::ExecCtx exec_ctx; CFStreamHandle* handle = static_cast(client_callback_info); - CFSTREAM_HANDLE_REF(handle, "read callback"); - dispatch_async( - dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ - grpc_core::ExecCtx exec_ctx; - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle, - stream, type, client_callback_info); - } - switch (type) { - case kCFStreamEventOpenCompleted: - handle->open_event_.SetReady(); - break; - case kCFStreamEventHasBytesAvailable: - case kCFStreamEventEndEncountered: - handle->read_event_.SetReady(); - break; - case kCFStreamEventErrorOccurred: - handle->open_event_.SetReady(); - handle->read_event_.SetReady(); - break; - default: - GPR_UNREACHABLE_CODE(return ); - } - CFSTREAM_HANDLE_UNREF(handle, "read callback"); - }); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle, + stream, type, client_callback_info); + } + switch (type) { + case kCFStreamEventOpenCompleted: + handle->open_event_.SetReady(); + break; + case kCFStreamEventHasBytesAvailable: + case kCFStreamEventEndEncountered: + handle->read_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + handle->open_event_.SetReady(); + handle->read_event_.SetReady(); + break; + default: + GPR_UNREACHABLE_CODE(return ); + } } void CFStreamHandle::WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, void* clientCallBackInfo) { + grpc_core::ExecCtx exec_ctx; CFStreamHandle* handle = static_cast(clientCallBackInfo); - CFSTREAM_HANDLE_REF(handle, "write callback"); - dispatch_async( - dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ - grpc_core::ExecCtx exec_ctx; - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle, - stream, type, clientCallBackInfo); - } - switch (type) { - case kCFStreamEventOpenCompleted: - handle->open_event_.SetReady(); - break; - case kCFStreamEventCanAcceptBytes: - case kCFStreamEventEndEncountered: - handle->write_event_.SetReady(); - break; - case kCFStreamEventErrorOccurred: - handle->open_event_.SetReady(); - handle->write_event_.SetReady(); - break; - default: - GPR_UNREACHABLE_CODE(return ); - } - CFSTREAM_HANDLE_UNREF(handle, "write callback"); - }); + printf("** CFStreamHandle::WriteCallback\n"); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle, + stream, type, clientCallBackInfo); + } + switch (type) { + case kCFStreamEventOpenCompleted: + handle->open_event_.SetReady(); + break; + case kCFStreamEventCanAcceptBytes: + case kCFStreamEventEndEncountered: + handle->write_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + handle->open_event_.SetReady(); + handle->write_event_.SetReady(); + break; + default: + GPR_UNREACHABLE_CODE(return ); + } } CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, @@ -116,6 +107,7 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, open_event_.InitEvent(); read_event_.InitEvent(); write_event_.InitEvent(); + dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL); CFStreamClientContext ctx = {0, static_cast(this), CFStreamHandle::Retain, CFStreamHandle::Release, nil}; @@ -129,10 +121,8 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, CFStreamHandle::WriteCallback, &ctx); - CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), - kCFRunLoopCommonModes); - CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), - kCFRunLoopCommonModes); + CFReadStreamSetDispatchQueue(read_stream, dispatch_queue_); + CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue_); } CFStreamHandle::~CFStreamHandle() { diff --git a/src/core/lib/iomgr/cfstream_handle.h b/src/core/lib/iomgr/cfstream_handle.h index 4258e72431c..93ec5f044bb 100644 --- a/src/core/lib/iomgr/cfstream_handle.h +++ b/src/core/lib/iomgr/cfstream_handle.h @@ -62,6 +62,8 @@ class CFStreamHandle final { grpc_core::LockfreeEvent read_event_; grpc_core::LockfreeEvent write_event_; + dispatch_queue_t dispatch_queue_; + gpr_refcount refcount_; };