Merge pull request #4627 from daniel-j-born/tcp_fd

Expose the fd in grpc_fd and grpc_tcp.
pull/4647/head
Craig Tiller 9 years ago
commit fdd02259b5
  1. 9
      src/core/iomgr/fd_posix.c
  2. 3
      src/core/iomgr/fd_posix.h
  3. 6
      src/core/iomgr/tcp_posix.c
  4. 6
      src/core/iomgr/tcp_posix.h
  5. 4
      test/core/iomgr/tcp_posix_test.c

@ -101,6 +101,7 @@ static grpc_fd *alloc_fd(int fd) {
r->read_watcher = r->write_watcher = NULL; r->read_watcher = r->write_watcher = NULL;
r->on_done_closure = NULL; r->on_done_closure = NULL;
r->closed = 0; r->closed = 0;
r->released = 0;
return r; return r;
} }
@ -210,6 +211,14 @@ static int has_watchers(grpc_fd *fd) {
fd->inactive_watcher_root.next != &fd->inactive_watcher_root; fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
} }
int grpc_fd_wrapped_fd(grpc_fd *fd) {
if (fd->released || fd->closed) {
return -1;
} else {
return fd->fd;
}
}
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason) { int *release_fd, const char *reason) {
fd->on_done_closure = on_done; fd->on_done_closure = on_done;

@ -105,6 +105,9 @@ struct grpc_fd {
This takes ownership of closing fd. */ This takes ownership of closing fd. */
grpc_fd *grpc_fd_create(int fd, const char *name); grpc_fd *grpc_fd_create(int fd, const char *name);
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd *fd);
/* Releases fd to be asynchronously destroyed. /* Releases fd to be asynchronously destroyed.
on_done is called when the underlying file descriptor is definitely close()d. on_done is called when the underlying file descriptor is definitely close()d.
If on_done is NULL, no callback will be made. If on_done is NULL, no callback will be made.

@ -473,6 +473,12 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
return &tcp->base; return &tcp->base;
} }
int grpc_tcp_fd(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(ep->vtable == &vtable);
return grpc_fd_wrapped_fd(tcp->em_fd);
}
void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int *fd, grpc_closure *done) { int *fd, grpc_closure *done) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;

@ -56,6 +56,12 @@ extern int grpc_tcp_trace;
grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size, grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size,
const char *peer_string); const char *peer_string);
/* Return the tcp endpoint's fd, or -1 if this is not available. Does not
release the fd.
Requires: ep must be a tcp endpoint.
*/
int grpc_tcp_fd(grpc_endpoint *ep);
/* Destroy the tcp endpoint without closing its fd. *fd will be set and done /* Destroy the tcp endpoint without closing its fd. *fd will be set and done
* will be called when the endpoint is destroyed. * will be called when the endpoint is destroyed.
* Requires: ep must be a tcp endpoint and fd must not be NULL. */ * Requires: ep must be a tcp endpoint and fd must not be NULL. */

@ -389,7 +389,8 @@ void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, int success) {
grpc_pollset_kick(&g_pollset, NULL); grpc_pollset_kick(&g_pollset, NULL);
} }
/* Do a read_test, then release fd and try to read/write again. */ /* Do a read_test, then release fd and try to read/write again. Verify that
grpc_tcp_fd() is available before the fd is released. */
static void release_fd_test(size_t num_bytes, size_t slice_size) { static void release_fd_test(size_t num_bytes, size_t slice_size) {
int sv[2]; int sv[2];
grpc_endpoint *ep; grpc_endpoint *ep;
@ -408,6 +409,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv); create_sockets(sv);
ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test");
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes); written_bytes = fill_socket_partial(sv[0], num_bytes);

Loading…
Cancel
Save