Merge pull request #4157 from nicolasnoble/paddywhack

Adding grpc_tcp_listener as an actual object being returned.
pull/4159/merge
Craig Tiller 9 years ago
commit 4dd8f43970
  1. 20
      src/core/iomgr/tcp_server.h
  2. 150
      src/core/iomgr/tcp_server_posix.c
  3. 121
      src/core/iomgr/tcp_server_windows.c
  4. 4
      src/core/security/server_secure_chttp2.c
  5. 4
      src/core/surface/server_chttp2.c
  6. 4
      test/core/util/reconnect_server.c

@ -39,6 +39,9 @@
/* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server;
/* Forward decl of grpc_tcp_listener */
typedef struct grpc_tcp_listener grpc_tcp_listener;
/* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep);
@ -51,19 +54,18 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_pollset **pollsets, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb, void *cb_arg);
/* Add a port to the server, returning port number on success, or negative
on failure.
/* Add a port to the server, returning the newly created listener on success,
or a null pointer on failure.
The :: and 0.0.0.0 wildcard addresses are treated identically, accepting
both IPv4 and IPv6 connections, but :: is the preferred style. This usually
creates one socket, but possibly two on systems which support IPv6,
but not dualstack sockets.
For raw access to the underlying sockets, see grpc_tcp_server_get_fd(). */
but not dualstack sockets. */
/* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
size_t addr_len);
grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
const void *addr,
size_t addr_len);
/* Returns the file descriptor of the Nth listening socket on this server,
or -1 if the index is out of bounds.
@ -75,4 +77,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index);
void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_closure *closure);
int grpc_tcp_listener_get_port(grpc_tcp_listener *listener);
void grpc_tcp_listener_ref(grpc_tcp_listener *listener);
void grpc_tcp_listener_unref(grpc_tcp_listener *listener);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */

@ -67,14 +67,13 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#define INIT_PORT_CAP 2
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
static gpr_once s_init_max_accept_queue_size;
static int s_max_accept_queue_size;
/* one listening port */
typedef struct {
struct grpc_tcp_listener {
int fd;
grpc_fd *emfd;
grpc_tcp_server *server;
@ -84,9 +83,18 @@ typedef struct {
struct sockaddr_un un;
} addr;
size_t addr_len;
int port;
grpc_closure read_closure;
grpc_closure destroyed_closure;
} server_port;
gpr_refcount refs;
struct grpc_tcp_listener *next;
/* When we add a listener, more than one can be created, mainly because of
IPv6. A sibling will still be in the normal list, but will be flagged
as such. Any action, such as ref or unref, will affect all of the
siblings in the list. */
struct grpc_tcp_listener *sibling;
int is_sibling;
};
static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
struct stat st;
@ -112,10 +120,9 @@ struct grpc_tcp_server {
/* is this server shutting down? (boolean) */
int shutdown;
/* all listening ports */
server_port *ports;
size_t nports;
size_t port_capacity;
/* linked list of server ports */
grpc_tcp_listener *head;
unsigned nports;
/* shutdown callback */
grpc_closure *shutdown_complete;
@ -134,9 +141,8 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
s->shutdown = 0;
s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL;
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->head = NULL;
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
return s;
}
@ -145,7 +151,12 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_mu_destroy(&s->mu);
gpr_free(s->ports);
while (s->head) {
grpc_tcp_listener *sp = s->head;
s->head = sp->next;
grpc_tcp_listener_unref(sp);
}
gpr_free(s);
}
@ -166,8 +177,6 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, int success) {
events will be received on them - at this point it's safe to destroy
things */
static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
size_t i;
/* delete ALL the things */
gpr_mu_lock(&s->mu);
@ -176,9 +185,9 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
return;
}
if (s->nports) {
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
if (s->head) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
if (sp->addr.sockaddr.sa_family == AF_UNIX) {
unlink_if_unix_domain_socket(&sp->addr.un);
}
@ -196,7 +205,6 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_closure *closure) {
size_t i;
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
@ -206,8 +214,9 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
/* shutdown all fd's */
if (s->active_ports) {
for (i = 0; i < s->nports; i++) {
grpc_fd_shutdown(exec_ctx, s->ports[i].emfd);
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_fd_shutdown(exec_ctx, sp->emfd);
}
gpr_mu_unlock(&s->mu);
} else {
@ -298,7 +307,7 @@ error:
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
server_port *sp = arg;
grpc_tcp_listener *sp = arg;
grpc_fd *fdobj;
size_t i;
@ -364,9 +373,10 @@ error:
}
}
static int add_socket_to_server(grpc_tcp_server *s, int fd,
const struct sockaddr *addr, size_t addr_len) {
server_port *sp;
static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
const struct sockaddr *addr,
size_t addr_len) {
grpc_tcp_listener *sp = NULL;
int port;
char *addr_str;
char *name;
@ -376,32 +386,35 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
gpr_mu_lock(&s->mu);
s->nports++;
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
/* append it to the list under a lock */
if (s->nports == s->port_capacity) {
s->port_capacity *= 2;
s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity);
}
sp = &s->ports[s->nports++];
sp = gpr_malloc(sizeof(grpc_tcp_listener));
sp->next = s->head;
s->head = sp;
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->port = port;
sp->is_sibling = 0;
sp->sibling = NULL;
gpr_ref_init(&sp->refs, 1);
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(addr_str);
gpr_free(name);
}
return port;
return sp;
}
int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
size_t addr_len) {
int allocated_port1 = -1;
int allocated_port2 = -1;
unsigned i;
grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
const void *addr,
size_t addr_len) {
int allocated_port = -1;
grpc_tcp_listener *sp;
grpc_tcp_listener *sp2 = NULL;
int fd;
grpc_dualstack_mode dsmode;
struct sockaddr_in6 addr6_v4mapped;
@ -420,9 +433,9 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
/* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) {
for (i = 0; i < s->nports; i++) {
for (sp = s->head; sp; sp = sp->next) {
sockname_len = sizeof(sockname_temp);
if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp,
if (0 == getsockname(sp->fd, (struct sockaddr *)&sockname_temp,
&sockname_len)) {
port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
if (port > 0) {
@ -436,6 +449,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
}
}
sp = NULL;
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = (const struct sockaddr *)&addr6_v4mapped;
addr_len = sizeof(addr6_v4mapped);
@ -449,14 +464,16 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
allocated_port1 = add_socket_to_server(s, fd, addr, addr_len);
sp = add_socket_to_server(s, fd, addr, addr_len);
allocated_port = sp->port;
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
/* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
if (port == 0 && allocated_port1 > 0) {
grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1);
if (port == 0 && allocated_port > 0) {
grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port);
sp2 = sp;
}
addr = (struct sockaddr *)&wild4;
addr_len = sizeof(wild4);
@ -471,22 +488,31 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
allocated_port2 = add_socket_to_server(s, fd, addr, addr_len);
sp = add_socket_to_server(s, fd, addr, addr_len);
sp->sibling = sp2;
if (sp2) sp2->is_sibling = 1;
done:
gpr_free(allocated_addr);
return allocated_port1 >= 0 ? allocated_port1 : allocated_port2;
return sp;
}
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
return (port_index < s->nports) ? s->ports[port_index].fd : -1;
grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--);
if (port_index == 0 && sp) {
return sp->fd;
} else {
return -1;
}
}
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_pollset **pollsets, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb,
void *on_accept_cb_arg) {
size_t i, j;
size_t i;
grpc_tcp_listener *sp;
GPR_ASSERT(on_accept_cb);
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->on_accept_cb);
@ -495,17 +521,41 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s->on_accept_cb_arg = on_accept_cb_arg;
s->pollsets = pollsets;
s->pollset_count = pollset_count;
for (i = 0; i < s->nports; i++) {
for (j = 0; j < pollset_count; j++) {
grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd);
for (sp = s->head; sp; sp = sp->next) {
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
s->ports[i].read_closure.cb = on_read;
s->ports[i].read_closure.cb_arg = &s->ports[i];
grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd,
&s->ports[i].read_closure);
sp->read_closure.cb = on_read;
sp->read_closure.cb_arg = sp;
grpc_fd_notify_on_read(exec_ctx, sp->emfd,
&sp->read_closure);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
}
int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
grpc_tcp_listener *sp = listener;
return sp->port;
}
void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
grpc_tcp_listener *sp = listener;
gpr_ref(&sp->refs);
}
void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
grpc_tcp_listener *sp = listener;
if (sp->is_sibling) return;
if (gpr_unref(&sp->refs)) {
grpc_tcp_listener *sibling = sp->sibling;
while (sibling) {
sp = sibling;
sibling = sp->sibling;
gpr_free(sp);
}
gpr_free(listener);
}
}
#endif

@ -35,7 +35,8 @@
#ifdef GPR_WINSOCK_SOCKET
#define _GNU_SOURCE
#include <io.h>
#include "src/core/iomgr/sockaddr_utils.h"
#include <grpc/support/alloc.h>
@ -51,25 +52,29 @@
#include "src/core/iomgr/tcp_server.h"
#include "src/core/iomgr/tcp_windows.h"
#define INIT_PORT_CAP 2
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
/* one listening port */
typedef struct server_port {
struct grpc_tcp_listener {
/* This seemingly magic number comes from AcceptEx's documentation. each
address buffer needs to have at least 16 more bytes at their end. */
gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2];
/* This will hold the socket for the next accept. */
SOCKET new_socket;
/* The listener winsocked. */
/* The listener winsocket. */
grpc_winsocket *socket;
/* The actual TCP port number. */
int port;
grpc_tcp_server *server;
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
/* closure for socket notification of accept being ready */
grpc_closure on_accept;
} server_port;
gpr_refcount refs;
/* linked list */
struct grpc_tcp_listener *next;
};
/* the overall server */
struct grpc_tcp_server {
@ -82,10 +87,8 @@ struct grpc_tcp_server {
/* active port count: how many ports are actually still listening */
int active_ports;
/* all listening ports */
server_port *ports;
size_t nports;
size_t port_capacity;
/* linked list of server ports */
grpc_tcp_listener *head;
/* shutdown callback */
grpc_closure *shutdown_complete;
@ -99,9 +102,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
s->active_ports = 0;
s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL;
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
s->head = NULL;
s->shutdown_complete = NULL;
return s;
}
@ -109,26 +110,26 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
static void dont_care_about_shutdown_completion(void *arg) {}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
size_t i;
grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
/* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
closed by the system. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
while (s->head) {
grpc_tcp_listener *sp = s->head;
s->head = sp->next;
sp->next = NULL;
grpc_winsocket_destroy(sp->socket);
grpc_tcp_listener_unref(sp);
}
gpr_free(s->ports);
gpr_free(s);
}
/* Public function. Stops and destroys a grpc_tcp_server. */
void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_closure *shutdown_complete) {
size_t i;
int immediately_done = 0;
grpc_tcp_listener *sp;
gpr_mu_lock(&s->mu);
s->shutdown_complete = shutdown_complete;
@ -138,8 +139,7 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
if (s->active_ports == 0) {
immediately_done = 1;
}
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
for (sp = s->head; sp; sp = sp->next) {
sp->shutting_down = 1;
grpc_winsocket_shutdown(sp->socket);
}
@ -199,7 +199,7 @@ error:
}
static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
server_port *sp) {
grpc_tcp_listener *sp) {
int notify = 0;
sp->shutting_down = 0;
gpr_mu_lock(&sp->server->mu);
@ -216,7 +216,7 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
static void start_accept(grpc_exec_ctx *exec_ctx, server_port *port) {
static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) {
SOCKET sock = INVALID_SOCKET;
char *message;
char *utf8_message;
@ -276,7 +276,7 @@ failure:
/* Event manager callback when reads are ready. */
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
server_port *sp = arg;
grpc_tcp_listener *sp = arg;
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
@ -351,16 +351,17 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
start_accept(exec_ctx, sp);
}
static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
const struct sockaddr *addr, size_t addr_len) {
server_port *sp;
static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
const struct sockaddr *addr,
size_t addr_len) {
grpc_tcp_listener *sp = NULL;
int port;
int status;
GUID guid = WSAID_ACCEPTEX;
DWORD ioctl_num_bytes;
LPFN_ACCEPTEX AcceptEx;
if (sock == INVALID_SOCKET) return -1;
if (sock == INVALID_SOCKET) return NULL;
/* We need to grab the AcceptEx pointer for that port, as it may be
interface-dependent. We'll cache it to avoid doing that again. */
@ -373,37 +374,35 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
gpr_free(utf8_message);
closesocket(sock);
return -1;
return NULL;
}
port = prepare_socket(sock, addr, addr_len);
if (port >= 0) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
/* append it to the list under a lock */
if (s->nports == s->port_capacity) {
/* too many ports, and we need to store their address in a closure */
/* TODO(ctiller): make server_port a linked list */
abort();
}
sp = &s->ports[s->nports++];
sp = gpr_malloc(sizeof(grpc_tcp_listener));
sp->next = s->head;
s->head = sp;
sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0;
sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
sp->port = port;
gpr_ref_init(&sp->refs, 1);
grpc_closure_init(&sp->on_accept, on_accept, sp);
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
}
return port;
return sp;
}
int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
size_t addr_len) {
int allocated_port = -1;
unsigned i;
grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
const void *addr,
size_t addr_len) {
grpc_tcp_listener *sp;
SOCKET sock;
struct sockaddr_in6 addr6_v4mapped;
struct sockaddr_in6 wildcard;
@ -415,9 +414,9 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
/* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) {
for (i = 0; i < s->nports; i++) {
for (sp = s->head; sp; sp = sp->next) {
sockname_len = sizeof(sockname_temp);
if (0 == getsockname(s->ports[i].socket->socket,
if (0 == getsockname(sp->socket->socket,
(struct sockaddr *)&sockname_temp, &sockname_len)) {
port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
if (port > 0) {
@ -452,33 +451,55 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
gpr_free(utf8_message);
}
allocated_port = add_socket_to_server(s, sock, addr, addr_len);
sp = add_socket_to_server(s, sock, addr, addr_len);
gpr_free(allocated_addr);
return allocated_port;
return sp;
}
SOCKET
grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) {
return (index < s->nports) ? s->ports[index].socket->socket : INVALID_SOCKET;
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
grpc_tcp_listener *sp;
for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--);
if (port_index == 0 && sp) {
return _open_osfhandle(sp->socket->socket, 0);
} else {
return -1;
}
}
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_pollset **pollset, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb,
void *on_accept_cb_arg) {
size_t i;
grpc_tcp_listener *sp;
GPR_ASSERT(on_accept_cb);
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->on_accept_cb);
GPR_ASSERT(s->active_ports == 0);
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
for (i = 0; i < s->nports; i++) {
start_accept(exec_ctx, s->ports + i);
for (sp = s->head; sp; sp = sp->next) {
start_accept(exec_ctx, sp);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
}
int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
grpc_tcp_listener *sp = listener;
return sp->port;
}
void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
grpc_tcp_listener *sp = listener;
gpr_ref(&sp->refs);
}
void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
grpc_tcp_listener *sp = listener;
if (gpr_unref(&sp->refs)) {
gpr_free(listener);
}
}
#endif /* GPR_WINSOCK_SOCKET */

@ -249,9 +249,11 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
}
for (i = 0; i < resolved->naddrs; i++) {
port_temp = grpc_tcp_server_add_port(
grpc_tcp_listener *listener;
listener = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len);
port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp >= 0) {
if (port_num == -1) {
port_num = port_temp;

@ -107,9 +107,11 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
}
for (i = 0; i < resolved->naddrs; i++) {
port_temp = grpc_tcp_server_add_port(
grpc_tcp_listener *listener;
listener = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len);
port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp >= 0) {
if (port_num == -1) {
port_num = port_temp;

@ -113,6 +113,7 @@ void reconnect_server_init(reconnect_server *server) {
void reconnect_server_start(reconnect_server *server, int port) {
struct sockaddr_in addr;
grpc_tcp_listener *listener;
int port_added;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -121,8 +122,9 @@ void reconnect_server_start(reconnect_server *server, int port) {
memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
server->tcp_server = grpc_tcp_server_create();
port_added =
listener =
grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr));
port_added = grpc_tcp_listener_get_port(listener);
GPR_ASSERT(port_added == port);
grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1,

Loading…
Cancel
Save