mirror of https://github.com/grpc/grpc.git
parent
6abc0aef5d
commit
cdf709c76e
3 changed files with 585 additions and 0 deletions
@ -0,0 +1,52 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2018 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H |
||||||
|
#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H |
||||||
|
/*
|
||||||
|
Low level TCP "bottom half" implementation, for use by transports built on |
||||||
|
top of a TCP connection. |
||||||
|
|
||||||
|
Note that this file does not (yet) include APIs for creating the socket in |
||||||
|
the first place. |
||||||
|
|
||||||
|
All calls passing slice transfer ownership of a slice refcount unless |
||||||
|
otherwise specified. |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#ifdef GRPC_CFSTREAM |
||||||
|
|
||||||
|
#import <Foundation/Foundation.h> |
||||||
|
|
||||||
|
#include "src/core/lib/debug/trace.h" |
||||||
|
#include "src/core/lib/iomgr/endpoint.h" |
||||||
|
#include "src/core/lib/iomgr/tcp_cfstream_sync.h" |
||||||
|
|
||||||
|
extern grpc_core::TraceFlag grpc_tcp_trace; |
||||||
|
|
||||||
|
grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, |
||||||
|
CFWriteStreamRef write_stream, |
||||||
|
const char* peer_string, |
||||||
|
grpc_resource_quota* resource_quota, |
||||||
|
CFStreamSync* stream_sync); |
||||||
|
|
||||||
|
#endif /* GRPC_CFSTREAM */ |
||||||
|
|
||||||
|
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H */ |
@ -0,0 +1,330 @@ |
|||||||
|
/* |
||||||
|
* |
||||||
|
* Copyright 2018 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
#include "src/core/lib/iomgr/port.h" |
||||||
|
|
||||||
|
#ifdef GRPC_CFSTREAM_TCP |
||||||
|
|
||||||
|
#import <Foundation/Foundation.h> |
||||||
|
#import "src/core/lib/iomgr/tcp_cfstream.h" |
||||||
|
|
||||||
|
#include <grpc/slice_buffer.h> |
||||||
|
#include <grpc/support/alloc.h> |
||||||
|
#include <grpc/support/string_util.h> |
||||||
|
|
||||||
|
#include "src/core/lib/gpr/string.h" |
||||||
|
#include "src/core/lib/iomgr/closure.h" |
||||||
|
#include "src/core/lib/iomgr/endpoint.h" |
||||||
|
#include "src/core/lib/iomgr/error_apple.h" |
||||||
|
#include "src/core/lib/iomgr/tcp_cfstream_sync.h" |
||||||
|
#include "src/core/lib/slice/slice_internal.h" |
||||||
|
#include "src/core/lib/slice/slice_string_helpers.h" |
||||||
|
|
||||||
|
extern grpc_core::TraceFlag grpc_tcp_trace; |
||||||
|
|
||||||
|
typedef struct { |
||||||
|
grpc_endpoint base; |
||||||
|
gpr_refcount refcount; |
||||||
|
|
||||||
|
CFReadStreamRef read_stream; |
||||||
|
CFWriteStreamRef write_stream; |
||||||
|
CFStreamSync* stream_sync; |
||||||
|
|
||||||
|
grpc_closure* read_cb; |
||||||
|
grpc_closure* write_cb; |
||||||
|
grpc_slice_buffer* read_slices; |
||||||
|
grpc_slice_buffer* write_slices; |
||||||
|
|
||||||
|
grpc_closure read_action; |
||||||
|
grpc_closure write_action; |
||||||
|
CFStreamEventType read_type; |
||||||
|
|
||||||
|
char* peer_string; |
||||||
|
grpc_resource_user* resource_user; |
||||||
|
grpc_resource_user_slice_allocator slice_allocator; |
||||||
|
} CFStreamTCP; |
||||||
|
|
||||||
|
static void TCPFree(CFStreamTCP* tcp) { |
||||||
|
grpc_resource_user_unref(tcp->resource_user); |
||||||
|
CFRelease(tcp->read_stream); |
||||||
|
CFRelease(tcp->write_stream); |
||||||
|
CFSTREAM_SYNC_UNREF(tcp->stream_sync, "free"); |
||||||
|
gpr_free(tcp->peer_string); |
||||||
|
gpr_free(tcp); |
||||||
|
} |
||||||
|
|
||||||
|
#ifndef NDEBUG |
||||||
|
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) |
||||||
|
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) |
||||||
|
static void tcp_unref(CFStreamTCP* tcp, const char* reason, const char* file, int line) { |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); |
||||||
|
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, |
||||||
|
reason, val, val - 1); |
||||||
|
} |
||||||
|
if (gpr_unref(&tcp->refcount)) { |
||||||
|
TCPFree(tcp); |
||||||
|
} |
||||||
|
} |
||||||
|
static void tcp_ref(CFStreamTCP* tcp, const char* reason, const char* file, int line) { |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); |
||||||
|
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, |
||||||
|
reason, val, val + 1); |
||||||
|
} |
||||||
|
gpr_ref(&tcp->refcount); |
||||||
|
} |
||||||
|
#else |
||||||
|
#define TCP_REF(tcp, reason) tcp_ref((tcp)) |
||||||
|
#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) |
||||||
|
static void tcp_unref(CFStreamTCP* tcp) { |
||||||
|
if (gpr_unref(&tcp->refcount)) { |
||||||
|
tcp_free(tcp); |
||||||
|
} |
||||||
|
} |
||||||
|
static void tcp_ref(CFStreamTCP* tcp) { gpr_ref(&tcp->refcount); } |
||||||
|
#endif |
||||||
|
|
||||||
|
static grpc_error* TCPAnnotateError(grpc_error* src_error, CFStreamTCP* tcp) { |
||||||
|
return grpc_error_set_str( |
||||||
|
grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), |
||||||
|
GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(tcp->peer_string)); |
||||||
|
} |
||||||
|
|
||||||
|
static void CallReadCB(CFStreamTCP* tcp, grpc_error* error) { |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_log(GPR_DEBUG, "TCP:%p call_read_cb %p %p:%p", tcp, tcp->read_cb, tcp->read_cb->cb, |
||||||
|
tcp->read_cb->cb_arg); |
||||||
|
size_t i; |
||||||
|
const char* str = grpc_error_string(error); |
||||||
|
gpr_log(GPR_DEBUG, "read: error=%s", str); |
||||||
|
|
||||||
|
for (i = 0; i < tcp->read_slices->count; i++) { |
||||||
|
char* dump = grpc_dump_slice(tcp->read_slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); |
||||||
|
gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); |
||||||
|
gpr_free(dump); |
||||||
|
} |
||||||
|
} |
||||||
|
grpc_closure* cb = tcp->read_cb; |
||||||
|
tcp->read_cb = nullptr; |
||||||
|
tcp->read_slices = nullptr; |
||||||
|
GRPC_CLOSURE_RUN(cb, error); |
||||||
|
} |
||||||
|
|
||||||
|
static void CallWriteCB(CFStreamTCP* tcp, grpc_error* error) { |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_log(GPR_DEBUG, "TCP:%p call_write_cb %p %p:%p", tcp, tcp->write_cb, tcp->write_cb->cb, |
||||||
|
tcp->write_cb->cb_arg); |
||||||
|
const char* str = grpc_error_string(error); |
||||||
|
gpr_log(GPR_DEBUG, "write: error=%s", str); |
||||||
|
} |
||||||
|
grpc_closure* cb = tcp->write_cb; |
||||||
|
tcp->write_cb = nullptr; |
||||||
|
tcp->write_slices = nullptr; |
||||||
|
GRPC_CLOSURE_RUN(cb, error); |
||||||
|
} |
||||||
|
|
||||||
|
static void ReadAction(void* arg, grpc_error* error) { |
||||||
|
CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg); |
||||||
|
GPR_ASSERT(tcp->read_cb != nullptr); |
||||||
|
if (error) { |
||||||
|
grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); |
||||||
|
CallReadCB(tcp, GRPC_ERROR_REF(error)); |
||||||
|
TCP_UNREF(tcp, "read"); |
||||||
|
return; |
||||||
|
} |
||||||
|
|
||||||
|
GPR_ASSERT(tcp->read_slices->count == 1); |
||||||
|
grpc_slice slice = tcp->read_slices->slices[0]; |
||||||
|
size_t len = GRPC_SLICE_LENGTH(slice); |
||||||
|
CFIndex read_size = CFReadStreamRead(tcp->read_stream, GRPC_SLICE_START_PTR(slice), len); |
||||||
|
if (read_size == -1) { |
||||||
|
grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); |
||||||
|
CFErrorRef stream_error = CFReadStreamCopyError(tcp->read_stream); |
||||||
|
CallReadCB(tcp, |
||||||
|
TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), tcp)); |
||||||
|
CFRelease(stream_error); |
||||||
|
TCP_UNREF(tcp, "read"); |
||||||
|
} else if (read_size == 0) { |
||||||
|
grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); |
||||||
|
CallReadCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); |
||||||
|
TCP_UNREF(tcp, "read"); |
||||||
|
} else { |
||||||
|
if (read_size < len) { |
||||||
|
grpc_slice_buffer_trim_end(tcp->read_slices, len - read_size, nullptr); |
||||||
|
} |
||||||
|
CallReadCB(tcp, GRPC_ERROR_NONE); |
||||||
|
TCP_UNREF(tcp, "read"); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
static void WriteAction(void* arg, grpc_error* error) { |
||||||
|
CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg); |
||||||
|
GPR_ASSERT(tcp->write_cb != nullptr); |
||||||
|
if (error) { |
||||||
|
grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices); |
||||||
|
CallWriteCB(tcp, GRPC_ERROR_REF(error)); |
||||||
|
TCP_UNREF(tcp, "write"); |
||||||
|
return; |
||||||
|
} |
||||||
|
|
||||||
|
grpc_slice slice = grpc_slice_buffer_take_first(tcp->write_slices); |
||||||
|
size_t slice_len = GRPC_SLICE_LENGTH(slice); |
||||||
|
CFIndex write_size = |
||||||
|
CFWriteStreamWrite(tcp->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); |
||||||
|
if (write_size == -1) { |
||||||
|
grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices); |
||||||
|
CFErrorRef stream_error = CFWriteStreamCopyError(tcp->write_stream); |
||||||
|
CallWriteCB( |
||||||
|
tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), tcp)); |
||||||
|
CFRelease(stream_error); |
||||||
|
TCP_UNREF(tcp, "write"); |
||||||
|
} else { |
||||||
|
if (write_size < GRPC_SLICE_LENGTH(slice)) { |
||||||
|
grpc_slice_buffer_undo_take_first(tcp->write_slices, |
||||||
|
grpc_slice_sub(slice, write_size, slice_len)); |
||||||
|
} |
||||||
|
if (tcp->write_slices->length > 0) { |
||||||
|
tcp->stream_sync->NotifyOnWrite(&tcp->write_action); |
||||||
|
} else { |
||||||
|
CallWriteCB(tcp, GRPC_ERROR_NONE); |
||||||
|
TCP_UNREF(tcp, "write"); |
||||||
|
} |
||||||
|
|
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size); |
||||||
|
char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); |
||||||
|
gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, dump); |
||||||
|
gpr_free(dump); |
||||||
|
grpc_slice_unref(trace_slice); |
||||||
|
} |
||||||
|
} |
||||||
|
grpc_slice_unref(slice); |
||||||
|
} |
||||||
|
|
||||||
|
static void TCPReadAllocationDone(void* arg, grpc_error* error) { |
||||||
|
CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg); |
||||||
|
if (error == GRPC_ERROR_NONE) { |
||||||
|
tcp->stream_sync->NotifyOnRead(&tcp->read_action); |
||||||
|
} else { |
||||||
|
grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); |
||||||
|
CallReadCB(tcp, error); |
||||||
|
TCP_UNREF(tcp, "read"); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
static void TCPRead(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb) { |
||||||
|
CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_log(GPR_DEBUG, "tcp:%p read (%p, %p) length:%zu", tcp, slices, cb, slices->length); |
||||||
|
} |
||||||
|
GPR_ASSERT(tcp->read_cb == nullptr); |
||||||
|
tcp->read_cb = cb; |
||||||
|
tcp->read_slices = slices; |
||||||
|
grpc_slice_buffer_reset_and_unref_internal(slices); |
||||||
|
grpc_resource_user_alloc_slices(&tcp->slice_allocator, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, |
||||||
|
tcp->read_slices); |
||||||
|
TCP_REF(tcp, "read"); |
||||||
|
} |
||||||
|
|
||||||
|
static void TCPWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb) { |
||||||
|
CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_log(GPR_DEBUG, "tcp:%p write (%p, %p) length:%zu", tcp, slices, cb, slices->length); |
||||||
|
} |
||||||
|
GPR_ASSERT(tcp->write_cb == nullptr); |
||||||
|
tcp->write_cb = cb; |
||||||
|
tcp->write_slices = slices; |
||||||
|
TCP_REF(tcp, "write"); |
||||||
|
tcp->stream_sync->NotifyOnWrite(&tcp->write_action); |
||||||
|
} |
||||||
|
|
||||||
|
void TCPShutdown(grpc_endpoint* ep, grpc_error* why) { |
||||||
|
CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_log(GPR_DEBUG, "tcp:%p shutdown (%p)", tcp, why); |
||||||
|
} |
||||||
|
CFReadStreamClose(tcp->read_stream); |
||||||
|
CFWriteStreamClose(tcp->write_stream); |
||||||
|
tcp->stream_sync->Shutdown(why); |
||||||
|
grpc_resource_user_shutdown(tcp->resource_user); |
||||||
|
} |
||||||
|
|
||||||
|
void TCPDestroy(grpc_endpoint* ep) { |
||||||
|
CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_log(GPR_DEBUG, "tcp:%p destroy", tcp); |
||||||
|
} |
||||||
|
TCP_UNREF(tcp, "destroy"); |
||||||
|
} |
||||||
|
|
||||||
|
grpc_resource_user* TCPGetResourceUser(grpc_endpoint* ep) { |
||||||
|
CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); |
||||||
|
return tcp->resource_user; |
||||||
|
} |
||||||
|
|
||||||
|
char* TCPGetPeer(grpc_endpoint* ep) { |
||||||
|
CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); |
||||||
|
return gpr_strdup(tcp->peer_string); |
||||||
|
} |
||||||
|
|
||||||
|
int TCPGetFD(grpc_endpoint* ep) { return 0; } |
||||||
|
|
||||||
|
void TCPAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} |
||||||
|
void TCPAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} |
||||||
|
void TCPDeleteFromPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} |
||||||
|
|
||||||
|
static const grpc_endpoint_vtable vtable = { |
||||||
|
TCPRead, TCPWrite, TCPAddToPollset, TCPAddToPollsetSet, TCPDeleteFromPollsetSet, |
||||||
|
TCPShutdown, TCPDestroy, TCPGetResourceUser, TCPGetPeer, TCPGetFD}; |
||||||
|
|
||||||
|
grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, CFWriteStreamRef write_stream, |
||||||
|
const char* peer_string, grpc_resource_quota* resource_quota, |
||||||
|
CFStreamSync* stream_sync) { |
||||||
|
CFStreamTCP* tcp = static_cast<CFStreamTCP*>(gpr_malloc(sizeof(CFStreamTCP))); |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_log(GPR_DEBUG, "tcp:%p create readStream:%p writeStream: %p", tcp, read_stream, |
||||||
|
write_stream); |
||||||
|
} |
||||||
|
tcp->base.vtable = &vtable; |
||||||
|
gpr_ref_init(&tcp->refcount, 1); |
||||||
|
tcp->read_stream = read_stream; |
||||||
|
tcp->write_stream = write_stream; |
||||||
|
CFRetain(read_stream); |
||||||
|
CFRetain(write_stream); |
||||||
|
tcp->stream_sync = stream_sync; |
||||||
|
CFSTREAM_SYNC_REF(tcp->stream_sync, "endpoint create"); |
||||||
|
|
||||||
|
tcp->peer_string = gpr_strdup(peer_string); |
||||||
|
tcp->read_cb = nil; |
||||||
|
tcp->write_cb = nil; |
||||||
|
tcp->read_slices = nil; |
||||||
|
tcp->write_slices = nil; |
||||||
|
GRPC_CLOSURE_INIT(&tcp->read_action, ReadAction, static_cast<void*>(tcp), |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
GRPC_CLOSURE_INIT(&tcp->write_action, WriteAction, static_cast<void*>(tcp), |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); |
||||||
|
grpc_resource_user_slice_allocator_init(&tcp->slice_allocator, tcp->resource_user, |
||||||
|
TCPReadAllocationDone, tcp); |
||||||
|
|
||||||
|
return &tcp->base; |
||||||
|
} |
||||||
|
|
||||||
|
#endif /* GRPC_CFSTREAM_TCP */ |
@ -0,0 +1,203 @@ |
|||||||
|
|
||||||
|
/* |
||||||
|
* |
||||||
|
* Copyright 2018 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
#include "src/core/lib/iomgr/port.h" |
||||||
|
|
||||||
|
#ifdef GRPC_CFSTREAM_TCP_CLIENT |
||||||
|
|
||||||
|
#include <Foundation/Foundation.h> |
||||||
|
|
||||||
|
#include <string.h> |
||||||
|
|
||||||
|
#include <grpc/support/alloc.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
#include <grpc/support/sync.h> |
||||||
|
|
||||||
|
#include <netinet/in.h> |
||||||
|
|
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/gpr/host_port.h" |
||||||
|
#include "src/core/lib/iomgr/closure.h" |
||||||
|
#include "src/core/lib/iomgr/error.h" |
||||||
|
#include "src/core/lib/iomgr/error_apple.h" |
||||||
|
#include "src/core/lib/iomgr/sockaddr_utils.h" |
||||||
|
#include "src/core/lib/iomgr/tcp_cfstream.h" |
||||||
|
#include "src/core/lib/iomgr/tcp_cfstream_sync.h" |
||||||
|
#include "src/core/lib/iomgr/tcp_client.h" |
||||||
|
#include "src/core/lib/iomgr/timer.h" |
||||||
|
|
||||||
|
extern grpc_core::TraceFlag grpc_tcp_trace; |
||||||
|
|
||||||
|
typedef struct CFStreamTCPConnect { |
||||||
|
gpr_mu mu; |
||||||
|
gpr_refcount refcount; |
||||||
|
|
||||||
|
CFReadStreamRef read_stream; |
||||||
|
CFWriteStreamRef write_stream; |
||||||
|
CFStreamSync* stream_sync; |
||||||
|
|
||||||
|
grpc_timer alarm; |
||||||
|
grpc_closure on_alarm; |
||||||
|
grpc_closure on_open; |
||||||
|
|
||||||
|
bool read_stream_open; |
||||||
|
bool write_stream_open; |
||||||
|
bool failed; |
||||||
|
|
||||||
|
grpc_closure* closure; |
||||||
|
grpc_endpoint** endpoint; |
||||||
|
int refs; |
||||||
|
char* addr_name; |
||||||
|
grpc_resource_quota* resource_quota; |
||||||
|
} CFStreamTCPConnect; |
||||||
|
|
||||||
|
static void TCPConnectCleanup(CFStreamTCPConnect* connect) { |
||||||
|
grpc_resource_quota_unref_internal(connect->resource_quota); |
||||||
|
CFSTREAM_SYNC_UNREF(connect->stream_sync, "async connect clean up"); |
||||||
|
CFRelease(connect->read_stream); |
||||||
|
CFRelease(connect->write_stream); |
||||||
|
gpr_mu_destroy(&connect->mu); |
||||||
|
gpr_free(connect->addr_name); |
||||||
|
gpr_free(connect); |
||||||
|
} |
||||||
|
|
||||||
|
static void OnAlarm(void* arg, grpc_error* error) { |
||||||
|
CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg); |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error); |
||||||
|
} |
||||||
|
gpr_mu_lock(&connect->mu); |
||||||
|
grpc_closure* closure = connect->closure; |
||||||
|
connect->closure = nil; |
||||||
|
const bool done = (--connect->refs == 0); |
||||||
|
gpr_mu_unlock(&connect->mu); |
||||||
|
// Only schedule a callback once, by either on_timer or on_connected. The first one issues |
||||||
|
// callback while the second one does cleanup. |
||||||
|
if (done) { |
||||||
|
TCPConnectCleanup(connect); |
||||||
|
} else { |
||||||
|
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"); |
||||||
|
GRPC_CLOSURE_SCHED(closure, error); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
static void OnOpen(void* arg, grpc_error* error) { |
||||||
|
CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg); |
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error); |
||||||
|
} |
||||||
|
gpr_mu_lock(&connect->mu); |
||||||
|
grpc_timer_cancel(&connect->alarm); |
||||||
|
grpc_closure* closure = connect->closure; |
||||||
|
connect->closure = nil; |
||||||
|
|
||||||
|
bool done = (--connect->refs == 0); |
||||||
|
grpc_endpoint** endpoint = connect->endpoint; |
||||||
|
|
||||||
|
if (done) { |
||||||
|
gpr_mu_unlock(&connect->mu); |
||||||
|
TCPConnectCleanup(connect); |
||||||
|
} else { |
||||||
|
if (error == GRPC_ERROR_NONE) { |
||||||
|
CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream); |
||||||
|
if (stream_error == NULL) { |
||||||
|
stream_error = CFWriteStreamCopyError(connect->write_stream); |
||||||
|
} |
||||||
|
if (stream_error) { |
||||||
|
error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error"); |
||||||
|
CFRelease(stream_error); |
||||||
|
} |
||||||
|
if (error == GRPC_ERROR_NONE) { |
||||||
|
*endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream, connect->addr_name, |
||||||
|
connect->resource_quota, connect->stream_sync); |
||||||
|
} |
||||||
|
} |
||||||
|
gpr_mu_unlock(&connect->mu); |
||||||
|
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(error)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
static void ParseResolvedAddress(const grpc_resolved_address* addr, CFStringRef* host, int* port) { |
||||||
|
char *host_port, *host_string, *port_string; |
||||||
|
grpc_sockaddr_to_string(&host_port, addr, 1); |
||||||
|
gpr_split_host_port(host_port, &host_string, &port_string); |
||||||
|
*host = CFStringCreateWithCString(NULL, host_string, kCFStringEncodingUTF8); |
||||||
|
gpr_free(host_string); |
||||||
|
gpr_free(port_string); |
||||||
|
gpr_free(host_port); |
||||||
|
*port = grpc_sockaddr_get_port(addr); |
||||||
|
} |
||||||
|
|
||||||
|
static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep, |
||||||
|
grpc_pollset_set* interested_parties, |
||||||
|
const grpc_channel_args* channel_args, |
||||||
|
const grpc_resolved_address* resolved_addr, grpc_millis deadline) { |
||||||
|
CFStreamTCPConnect* connect; |
||||||
|
|
||||||
|
connect = (CFStreamTCPConnect*)gpr_zalloc(sizeof(CFStreamTCPConnect)); |
||||||
|
connect->closure = closure; |
||||||
|
connect->endpoint = ep; |
||||||
|
connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); |
||||||
|
// connect->resource_quota = resource_quota; |
||||||
|
connect->refs = 2; // One for the connect operation, one for the timer. |
||||||
|
gpr_ref_init(&connect->refcount, 1); |
||||||
|
gpr_mu_init(&connect->mu); |
||||||
|
|
||||||
|
if (grpc_tcp_trace.enabled()) { |
||||||
|
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", connect->addr_name); |
||||||
|
} |
||||||
|
|
||||||
|
grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL); |
||||||
|
if (channel_args != NULL) { |
||||||
|
for (size_t i = 0; i < channel_args->num_args; i++) { |
||||||
|
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { |
||||||
|
grpc_resource_quota_unref_internal(resource_quota); |
||||||
|
resource_quota = grpc_resource_quota_ref_internal( |
||||||
|
(grpc_resource_quota*)channel_args->args[i].value.pointer.p); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
connect->resource_quota = resource_quota; |
||||||
|
|
||||||
|
CFReadStreamRef read_stream; |
||||||
|
CFWriteStreamRef write_stream; |
||||||
|
|
||||||
|
CFStringRef host; |
||||||
|
int port; |
||||||
|
ParseResolvedAddress(resolved_addr, &host, &port); |
||||||
|
CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream, &write_stream); |
||||||
|
CFRelease(host); |
||||||
|
connect->read_stream = read_stream; |
||||||
|
connect->write_stream = write_stream; |
||||||
|
connect->stream_sync = CFStreamSync::CreateStreamSync(read_stream, write_stream); |
||||||
|
GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect), |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
connect->stream_sync->NotifyOnOpen(&connect->on_open); |
||||||
|
GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, grpc_schedule_on_exec_ctx); |
||||||
|
gpr_mu_lock(&connect->mu); |
||||||
|
CFReadStreamOpen(read_stream); |
||||||
|
CFWriteStreamOpen(write_stream); |
||||||
|
grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm); |
||||||
|
gpr_mu_unlock(&connect->mu); |
||||||
|
} |
||||||
|
|
||||||
|
grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {TCPClientConnect}; |
||||||
|
|
||||||
|
#endif /* GRPC_CFSTREAM_TCP_CLIENT */ |
Loading…
Reference in new issue