|
|
|
@ -112,8 +112,10 @@ struct grpc_tcp_server { |
|
|
|
|
/* destroyed port count: how many ports are completely destroyed */ |
|
|
|
|
size_t destroyed_ports; |
|
|
|
|
|
|
|
|
|
/* is this server shutting down? (boolean) */ |
|
|
|
|
int shutdown; |
|
|
|
|
/* is this server shutting down? */ |
|
|
|
|
bool shutdown; |
|
|
|
|
/* use SO_REUSEPORT */ |
|
|
|
|
bool so_reuseport; |
|
|
|
|
|
|
|
|
|
/* linked list of server ports */ |
|
|
|
|
grpc_tcp_listener *head; |
|
|
|
@ -132,14 +134,42 @@ struct grpc_tcp_server { |
|
|
|
|
size_t pollset_count; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
static gpr_once check_init = GPR_ONCE_INIT; |
|
|
|
|
static bool has_so_reuseport; |
|
|
|
|
|
|
|
|
|
static void init(void) { |
|
|
|
|
int s = socket(AF_INET, SOCK_STREAM, 0); |
|
|
|
|
if (s >= 0) { |
|
|
|
|
has_so_reuseport = GRPC_LOG_IF_ERROR("check for SO_REUSEPORT", |
|
|
|
|
grpc_set_socket_reuse_port(s, 1)); |
|
|
|
|
close(s); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, |
|
|
|
|
const grpc_channel_args *args, |
|
|
|
|
grpc_tcp_server **server) { |
|
|
|
|
gpr_once_init(&check_init, init); |
|
|
|
|
|
|
|
|
|
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); |
|
|
|
|
s->so_reuseport = has_so_reuseport; |
|
|
|
|
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { |
|
|
|
|
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { |
|
|
|
|
if (args->args[i].type == GRPC_ARG_INTEGER) { |
|
|
|
|
s->so_reuseport = |
|
|
|
|
has_so_reuseport && (args->args[i].value.integer != 0); |
|
|
|
|
} else { |
|
|
|
|
gpr_free(s); |
|
|
|
|
return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT |
|
|
|
|
" must be an integer"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
gpr_ref_init(&s->refs, 1); |
|
|
|
|
gpr_mu_init(&s->mu); |
|
|
|
|
s->active_ports = 0; |
|
|
|
|
s->destroyed_ports = 0; |
|
|
|
|
s->shutdown = 0; |
|
|
|
|
s->shutdown = false; |
|
|
|
|
s->shutdown_starting.head = NULL; |
|
|
|
|
s->shutdown_starting.tail = NULL; |
|
|
|
|
s->shutdown_complete = shutdown_complete; |
|
|
|
@ -214,7 +244,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { |
|
|
|
|
gpr_mu_lock(&s->mu); |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(!s->shutdown); |
|
|
|
|
s->shutdown = 1; |
|
|
|
|
s->shutdown = true; |
|
|
|
|
|
|
|
|
|
/* shutdown all fd's */ |
|
|
|
|
if (s->active_ports) { |
|
|
|
@ -264,13 +294,19 @@ static int get_max_accept_queue_size(void) { |
|
|
|
|
|
|
|
|
|
/* Prepare a recently-created socket for listening. */ |
|
|
|
|
static grpc_error *prepare_socket(int fd, const struct sockaddr *addr, |
|
|
|
|
size_t addr_len, int *port) { |
|
|
|
|
size_t addr_len, bool so_reuseport, |
|
|
|
|
int *port) { |
|
|
|
|
struct sockaddr_storage sockname_temp; |
|
|
|
|
socklen_t sockname_len; |
|
|
|
|
grpc_error *err = GRPC_ERROR_NONE; |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(fd >= 0); |
|
|
|
|
|
|
|
|
|
if (so_reuseport) { |
|
|
|
|
err = grpc_set_socket_reuse_port(fd, 1); |
|
|
|
|
if (err != GRPC_ERROR_NONE) goto error; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
err = grpc_set_socket_nonblocking(fd, 1); |
|
|
|
|
if (err != GRPC_ERROR_NONE) goto error; |
|
|
|
|
err = grpc_set_socket_cloexec(fd, 1); |
|
|
|
@ -397,7 +433,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, |
|
|
|
|
char *addr_str; |
|
|
|
|
char *name; |
|
|
|
|
|
|
|
|
|
grpc_error *err = prepare_socket(fd, addr, addr_len, &port); |
|
|
|
|
grpc_error *err = prepare_socket(fd, addr, addr_len, s->so_reuseport, &port); |
|
|
|
|
if (err == GRPC_ERROR_NONE) { |
|
|
|
|
GPR_ASSERT(port > 0); |
|
|
|
|
grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); |
|
|
|
@ -433,6 +469,51 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, |
|
|
|
|
return err; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { |
|
|
|
|
grpc_tcp_listener *sp = NULL; |
|
|
|
|
char *addr_str; |
|
|
|
|
char *name; |
|
|
|
|
grpc_error *err; |
|
|
|
|
|
|
|
|
|
for (grpc_tcp_listener *l = listener->next; l && l->is_sibling; l = l->next) { |
|
|
|
|
l->fd_index += count; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (unsigned i = 0; i < count; i++) { |
|
|
|
|
int fd, port; |
|
|
|
|
grpc_dualstack_mode dsmode; |
|
|
|
|
err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0, |
|
|
|
|
&dsmode, &fd); |
|
|
|
|
if (err != GRPC_ERROR_NONE) return err; |
|
|
|
|
err = prepare_socket(fd, &listener->addr.sockaddr, listener->addr_len, true, |
|
|
|
|
&port); |
|
|
|
|
if (err != GRPC_ERROR_NONE) return err; |
|
|
|
|
grpc_sockaddr_to_string(&addr_str, &listener->addr.sockaddr, 1); |
|
|
|
|
gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i); |
|
|
|
|
sp = gpr_malloc(sizeof(grpc_tcp_listener)); |
|
|
|
|
sp->next = listener->next; |
|
|
|
|
listener->next = sp; |
|
|
|
|
sp->server = listener->server; |
|
|
|
|
sp->fd = fd; |
|
|
|
|
sp->emfd = grpc_fd_create(fd, name); |
|
|
|
|
memcpy(sp->addr.untyped, listener->addr.untyped, listener->addr_len); |
|
|
|
|
sp->addr_len = listener->addr_len; |
|
|
|
|
sp->port = port; |
|
|
|
|
sp->port_index = listener->port_index; |
|
|
|
|
sp->fd_index = listener->fd_index + count - i; |
|
|
|
|
sp->is_sibling = 1; |
|
|
|
|
sp->sibling = listener->is_sibling ? listener->sibling : listener; |
|
|
|
|
GPR_ASSERT(sp->emfd); |
|
|
|
|
while (listener->server->tail->next != NULL) { |
|
|
|
|
listener->server->tail = listener->server->tail->next; |
|
|
|
|
} |
|
|
|
|
gpr_free(addr_str); |
|
|
|
|
gpr_free(name); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, |
|
|
|
|
size_t addr_len, int *out_port) { |
|
|
|
|
grpc_tcp_listener *sp; |
|
|
|
@ -589,14 +670,29 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, |
|
|
|
|
s->on_accept_cb_arg = on_accept_cb_arg; |
|
|
|
|
s->pollsets = pollsets; |
|
|
|
|
s->pollset_count = pollset_count; |
|
|
|
|
for (sp = s->head; sp; sp = sp->next) { |
|
|
|
|
for (i = 0; i < pollset_count; i++) { |
|
|
|
|
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); |
|
|
|
|
sp = s->head; |
|
|
|
|
while (sp != NULL) { |
|
|
|
|
if (s->so_reuseport && pollset_count > 1) { |
|
|
|
|
GPR_ASSERT(GRPC_LOG_IF_ERROR( |
|
|
|
|
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1)))); |
|
|
|
|
for (i = 0; i < pollset_count; i++) { |
|
|
|
|
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); |
|
|
|
|
sp->read_closure.cb = on_read; |
|
|
|
|
sp->read_closure.cb_arg = sp; |
|
|
|
|
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); |
|
|
|
|
s->active_ports++; |
|
|
|
|
sp = sp->next; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
for (i = 0; i < pollset_count; i++) { |
|
|
|
|
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); |
|
|
|
|
} |
|
|
|
|
sp->read_closure.cb = on_read; |
|
|
|
|
sp->read_closure.cb_arg = sp; |
|
|
|
|
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); |
|
|
|
|
s->active_ports++; |
|
|
|
|
sp = sp->next; |
|
|
|
|
} |
|
|
|
|
sp->read_closure.cb = on_read; |
|
|
|
|
sp->read_closure.cb_arg = sp; |
|
|
|
|
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); |
|
|
|
|
s->active_ports++; |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&s->mu); |
|
|
|
|
} |
|
|
|
|