Merge pull request #13450 from daniel-j-born/tcp_client

Refactor POSIX TCP client connect.
pull/13753/head
Mark D. Roth 7 years ago committed by GitHub
commit dcb9493e26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 101
      src/core/lib/iomgr/tcp_client_posix.cc
  2. 37
      src/core/lib/iomgr/tcp_client_posix.h
  3. 21
      test/core/end2end/dualstack_socket_test.cc

@ -236,65 +236,68 @@ finish:
GRPC_CLOSURE_SCHED(closure, error);
}
static void tcp_client_connect_impl(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_millis deadline) {
int fd;
grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_resolved_address* mapped_addr,
grpc_fd** fdobj) {
grpc_dualstack_mode dsmode;
int err;
async_connect* ac;
grpc_resolved_address addr6_v4mapped;
grpc_resolved_address addr4_copy;
grpc_fd* fdobj;
int fd;
grpc_error* error;
char* name;
char* addr_str;
grpc_error* error;
*ep = nullptr;
/* Use dualstack sockets where available. */
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = &addr6_v4mapped;
*fdobj = nullptr;
/* Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
v6. */
if (!grpc_sockaddr_to_v4mapped(addr, mapped_addr)) {
/* addr is v4 mapped to v6 or v6. */
memcpy(mapped_addr, addr, sizeof(*mapped_addr));
}
error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
error =
grpc_create_dualstack_socket(mapped_addr, SOCK_STREAM, 0, &dsmode, &fd);
if (error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(closure, error);
return;
return error;
}
if (dsmode == GRPC_DSMODE_IPV4) {
/* If we got an AF_INET socket, map the address back to IPv4. */
GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy));
addr = &addr4_copy;
/* Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4. */
if (!grpc_sockaddr_is_v4mapped(addr, mapped_addr)) {
memcpy(mapped_addr, addr, sizeof(*mapped_addr));
}
}
if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(closure, error);
return;
if ((error = prepare_socket(mapped_addr, fd, channel_args)) !=
GRPC_ERROR_NONE) {
return error;
}
addr_str = grpc_sockaddr_to_uri(mapped_addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
*fdobj = grpc_fd_create(fd, name);
gpr_free(name);
gpr_free(addr_str);
return GRPC_ERROR_NONE;
}
void grpc_tcp_client_create_from_prepared_fd(
grpc_pollset_set* interested_parties, grpc_closure* closure, grpc_fd* fdobj,
const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
grpc_millis deadline, grpc_endpoint** ep) {
const int fd = grpc_fd_wrapped_fd(fdobj);
int err;
async_connect* ac;
do {
GPR_ASSERT(addr->len < ~(socklen_t)0);
err = connect(fd, (const struct sockaddr*)addr->addr, (socklen_t)addr->len);
} while (err < 0 && errno == EINTR);
addr_str = grpc_sockaddr_to_uri(addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
fdobj = grpc_fd_create(fd, name);
if (err >= 0) {
char* addr_str = grpc_sockaddr_to_uri(addr);
*ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_str);
gpr_free(addr_str);
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
goto done;
return;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(fdobj, nullptr, nullptr, false /* already_closed */,
"tcp_client_connect_error");
GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
goto done;
return;
}
grpc_pollset_set_add_fd(interested_parties, fdobj);
@ -304,8 +307,7 @@ static void tcp_client_connect_impl(grpc_closure* closure, grpc_endpoint** ep,
ac->ep = ep;
ac->fd = fdobj;
ac->interested_parties = interested_parties;
ac->addr_str = addr_str;
addr_str = nullptr;
ac->addr_str = grpc_sockaddr_to_uri(addr);
gpr_mu_init(&ac->mu);
ac->refs = 2;
GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
@ -322,10 +324,25 @@ static void tcp_client_connect_impl(grpc_closure* closure, grpc_endpoint** ep,
grpc_timer_init(&ac->alarm, deadline, &ac->on_alarm);
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
gpr_mu_unlock(&ac->mu);
}
done:
gpr_free(name);
gpr_free(addr_str);
static void tcp_client_connect_impl(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_millis deadline) {
grpc_resolved_address mapped_addr;
grpc_fd* fdobj = nullptr;
grpc_error* error;
*ep = nullptr;
if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
&fdobj)) != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(closure, error);
return;
}
grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fdobj,
channel_args, &mapped_addr, deadline,
ep);
}
// overridden by api_fuzzer.c

@ -23,7 +23,44 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/tcp_client.h"
/* Create an endpoint from a connected grpc_fd.
fd: a connected FD. Ownership is taken.
channel_args: may contain custom settings for the endpoint
addr_str: destination address in printable format
Returns: a new endpoint
*/
grpc_endpoint* grpc_tcp_client_create_from_fd(
grpc_fd* fd, const grpc_channel_args* channel_args, const char* addr_str);
/* Return a configured, unbound, unconnected TCP client grpc_fd.
channel_args: may contain custom settings for the fd
addr: the destination address
mapped_addr: out parameter. addr mapped to an address appropriate to the
type of socket FD created. For example, if addr is IPv4 and dual stack
sockets are available, mapped_addr will be an IPv4-mapped IPv6 address
fdobj: out parameter. The new FD
Returns: error, if any. Out parameters are not set on error
*/
grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_resolved_address* mapped_addr,
grpc_fd** fdobj);
/* Connect a configured TCP client grpc_fd.
interested_parties: a set of pollsets that would be interested in this
connection being established (in order to continue their work
closure: called when complete. On success, *ep will be set.
fdobj: an FD returned from grpc_tcp_client_prepare_fd(). Ownership is taken
channel_args: may contain custom settings for the endpoint
deadline: connection deadline
ep: out parameter. Set before closure is called if successful
*/
void grpc_tcp_client_create_from_prepared_fd(
grpc_pollset_set* interested_parties, grpc_closure* closure, grpc_fd* fdobj,
const grpc_channel_args* channel_args, const grpc_resolved_address* addr,
grpc_millis deadline, grpc_endpoint** ep);
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */

@ -29,7 +29,9 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
@ -54,6 +56,21 @@ static void drain_cq(grpc_completion_queue* cq) {
static void do_nothing(void* ignored) {}
static void log_resolved_addrs(const char* label, const char* hostname) {
grpc_resolved_addresses* res = nullptr;
grpc_error* error = grpc_blocking_resolve_address(hostname, "80", &res);
if (error != GRPC_ERROR_NONE || res == nullptr) {
GRPC_LOG_IF_ERROR(hostname, error);
return;
}
for (size_t i = 0; i < res->naddrs; ++i) {
char* addr_str = grpc_sockaddr_to_uri(&res->addrs[i]);
gpr_log(GPR_INFO, "%s: %s", label, addr_str);
gpr_free(addr_str);
}
grpc_resolved_addresses_destroy(res);
}
void test_connect(const char* server_host, const char* client_host, int port,
int expect_ok) {
char* client_hostport;
@ -140,6 +157,8 @@ void test_connect(const char* server_host, const char* client_host, int port,
gpr_log(GPR_INFO, "Testing with server=%s client=%s (expecting %s)",
server_hostport, client_hostport, expect_ok ? "success" : "failure");
log_resolved_addrs("server resolved addr", server_host);
log_resolved_addrs("client resolved addr", client_host);
gpr_free(client_hostport);
gpr_free(server_hostport);
@ -236,6 +255,8 @@ void test_connect(const char* server_host, const char* client_host, int port,
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
gpr_log(GPR_INFO, "status: %d (expected: %d)", status,
GRPC_STATUS_UNAVAILABLE);
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
}

Loading…
Cancel
Save