Merge github.com:grpc/grpc into no-transport-metadata

Conflicts:
	test/cpp/interop/stress_test.cc
Branch: pull/4188/head
Author: Craig Tiller
Commit: 6d3b229209
12 changed files (lines changed in parentheses):

   1. include/grpc++/impl/thd_no_cxx11.h (23)
   2. src/core/iomgr/tcp_server.h (20)
   3. src/core/iomgr/tcp_server_posix.c (150)
   4. src/core/iomgr/tcp_server_windows.c (121)
   5. src/core/security/server_secure_chttp2.c (4)
   6. src/core/surface/server_chttp2.c (4)
   7. test/core/util/reconnect_server.c (4)
   8. test/cpp/interop/stress_test.cc (64)
   9. test/proto/benchmarks/control.proto (22)
  10. test/proto/benchmarks/services.proto (14)
  11. test/proto/benchmarks/stats.proto (8)
  12. tools/run_tests/run_tests.py (14)

include/grpc++/impl/thd_no_cxx11.h

@@ -46,10 +46,21 @@ class thread {
     joined_ = false;
     start();
   }
+  template <class T, class U>
+  thread(void (T::*fptr)(U arg), T *obj, U arg) {
+    func_ = new thread_function_arg<T, U>(fptr, obj, arg);
+    joined_ = false;
+    start();
+  }
   ~thread() {
     if (!joined_) std::terminate();
     delete func_;
   }
+  thread(thread &&other)
+      : func_(other.func_), thd_(other.thd_), joined_(other.joined_) {
+    other.joined_ = true;
+    other.func_ = NULL;
+  }
   void join() {
     gpr_thd_join(thd_);
     joined_ = true;
@@ -80,6 +91,18 @@ class thread {
     void (T::*fptr_)();
     T *obj_;
   };
+  template <class T, class U>
+  class thread_function_arg : public thread_function_base {
+   public:
+    thread_function_arg(void (T::*fptr)(U arg), T *obj, U arg)
+        : fptr_(fptr), obj_(obj), arg_(arg) {}
+    virtual void call() { (obj_->*fptr_)(arg_); }
+
+   private:
+    void (T::*fptr_)(U arg);
+    T *obj_;
+    U arg_;
+  };
   thread_function_base *func_;
   gpr_thd_id thd_;
   bool joined_;
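
The new constructor lets a grpc::thread run a member function that takes one
argument, mirroring what std::thread already allows. A minimal usage sketch
(the Worker class and its Run method are hypothetical, for illustration only):

    // Runs w->Run(7) on a new gpr thread. As with the no-arg form, the
    // thread must be join()ed before destruction or std::terminate() fires.
    class Worker {
     public:
      void Run(int id) { /* per-thread work keyed by id */ }
    };

    void spawn_worker(Worker *w) {
      grpc::thread t(&Worker::Run, w, 7);
      t.join();
    }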

src/core/iomgr/tcp_server.h

@@ -39,6 +39,9 @@
 /* Forward decl of grpc_tcp_server */
 typedef struct grpc_tcp_server grpc_tcp_server;
 
+/* Forward decl of grpc_tcp_listener */
+typedef struct grpc_tcp_listener grpc_tcp_listener;
+
 /* Called for newly connected TCP connections. */
 typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
                                    grpc_endpoint *ep);
@@ -51,19 +54,18 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
                            grpc_pollset **pollsets, size_t pollset_count,
                            grpc_tcp_server_cb on_accept_cb, void *cb_arg);
 
-/* Add a port to the server, returning port number on success, or negative
-   on failure.
+/* Add a port to the server, returning the newly created listener on success,
+   or a null pointer on failure.
 
    The :: and 0.0.0.0 wildcard addresses are treated identically, accepting
    both IPv4 and IPv6 connections, but :: is the preferred style. This usually
    creates one socket, but possibly two on systems which support IPv6,
-   but not dualstack sockets.
-
-   For raw access to the underlying sockets, see grpc_tcp_server_get_fd(). */
+   but not dualstack sockets. */
 /* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle
    all of the multiple socket port matching logic in one place */
-int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
-                             size_t addr_len);
+grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
+                                            const void *addr,
+                                            size_t addr_len);
 
 /* Returns the file descriptor of the Nth listening socket on this server,
    or -1 if the index is out of bounds.
@@ -75,4 +77,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index);
 void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
                              grpc_closure *closure);
 
+int grpc_tcp_listener_get_port(grpc_tcp_listener *listener);
+void grpc_tcp_listener_ref(grpc_tcp_listener *listener);
+void grpc_tcp_listener_unref(grpc_tcp_listener *listener);
+
 #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */
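
The signature change means callers can no longer read the bound port from the
return value of grpc_tcp_server_add_port(). A minimal migration sketch under
that assumption (error handling elided):

    /* Bind a port and recover its number through the listener handle.
       Previously the port was the return value of add_port itself. */
    static int bind_and_get_port(grpc_tcp_server *s, const void *addr,
                                 size_t addr_len) {
      grpc_tcp_listener *listener = grpc_tcp_server_add_port(s, addr, addr_len);
      if (listener == NULL) return -1; /* failure is now signalled by NULL */
      return grpc_tcp_listener_get_port(listener);
    }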

src/core/iomgr/tcp_server_posix.c

@@ -67,14 +67,13 @@
 #include <grpc/support/sync.h>
 #include <grpc/support/time.h>
 
-#define INIT_PORT_CAP 2
 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
 
 static gpr_once s_init_max_accept_queue_size;
 static int s_max_accept_queue_size;
 
 /* one listening port */
-typedef struct {
+struct grpc_tcp_listener {
   int fd;
   grpc_fd *emfd;
   grpc_tcp_server *server;
@@ -84,9 +83,18 @@
     struct sockaddr_un un;
   } addr;
   size_t addr_len;
+  int port;
   grpc_closure read_closure;
   grpc_closure destroyed_closure;
-} server_port;
+  gpr_refcount refs;
+  struct grpc_tcp_listener *next;
+  /* When we add a listener, more than one can be created, mainly because of
+     IPv6. A sibling will still be in the normal list, but will be flagged
+     as such. Any action, such as ref or unref, will affect all of the
+     siblings in the list. */
+  struct grpc_tcp_listener *sibling;
+  int is_sibling;
+};
 
 static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
   struct stat st;
@@ -112,10 +120,9 @@ struct grpc_tcp_server {
   /* is this server shutting down? (boolean) */
   int shutdown;
 
-  /* all listening ports */
-  server_port *ports;
-  size_t nports;
-  size_t port_capacity;
+  /* linked list of server ports */
+  grpc_tcp_listener *head;
+  unsigned nports;
 
   /* shutdown callback */
   grpc_closure *shutdown_complete;
@@ -134,9 +141,8 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
   s->shutdown = 0;
   s->on_accept_cb = NULL;
   s->on_accept_cb_arg = NULL;
-  s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
+  s->head = NULL;
   s->nports = 0;
-  s->port_capacity = INIT_PORT_CAP;
   return s;
 }
 
@@ -145,7 +151,12 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
   gpr_mu_destroy(&s->mu);
 
-  gpr_free(s->ports);
+  while (s->head) {
+    grpc_tcp_listener *sp = s->head;
+    s->head = sp->next;
+    grpc_tcp_listener_unref(sp);
+  }
+
   gpr_free(s);
 }
 
@@ -166,8 +177,6 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, int success) {
    events will be received on them - at this point it's safe to destroy
    things */
 static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
-  size_t i;
-
   /* delete ALL the things */
   gpr_mu_lock(&s->mu);
@@ -176,9 +185,9 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
     return;
   }
 
-  if (s->nports) {
-    for (i = 0; i < s->nports; i++) {
-      server_port *sp = &s->ports[i];
+  if (s->head) {
+    grpc_tcp_listener *sp;
+    for (sp = s->head; sp; sp = sp->next) {
       if (sp->addr.sockaddr.sa_family == AF_UNIX) {
         unlink_if_unix_domain_socket(&sp->addr.un);
       }
@@ -196,7 +205,6 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
 void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
                              grpc_closure *closure) {
-  size_t i;
   gpr_mu_lock(&s->mu);
 
   GPR_ASSERT(!s->shutdown);
@@ -206,8 +214,9 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
   /* shutdown all fd's */
   if (s->active_ports) {
-    for (i = 0; i < s->nports; i++) {
-      grpc_fd_shutdown(exec_ctx, s->ports[i].emfd);
+    grpc_tcp_listener *sp;
+    for (sp = s->head; sp; sp = sp->next) {
+      grpc_fd_shutdown(exec_ctx, sp->emfd);
     }
     gpr_mu_unlock(&s->mu);
   } else {
@@ -298,7 +307,7 @@ error:
 
 /* event manager callback when reads are ready */
 static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
-  server_port *sp = arg;
+  grpc_tcp_listener *sp = arg;
   grpc_fd *fdobj;
   size_t i;
@@ -364,9 +373,10 @@ error:
   }
 }
 
-static int add_socket_to_server(grpc_tcp_server *s, int fd,
-                                const struct sockaddr *addr, size_t addr_len) {
-  server_port *sp;
+static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
+                                               const struct sockaddr *addr,
+                                               size_t addr_len) {
+  grpc_tcp_listener *sp = NULL;
   int port;
   char *addr_str;
   char *name;
@@ -376,32 +386,35 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
     grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
     gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
     gpr_mu_lock(&s->mu);
+    s->nports++;
     GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
-    /* append it to the list under a lock */
-    if (s->nports == s->port_capacity) {
-      s->port_capacity *= 2;
-      s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity);
-    }
-    sp = &s->ports[s->nports++];
+    sp = gpr_malloc(sizeof(grpc_tcp_listener));
+    sp->next = s->head;
+    s->head = sp;
     sp->server = s;
     sp->fd = fd;
     sp->emfd = grpc_fd_create(fd, name);
     memcpy(sp->addr.untyped, addr, addr_len);
     sp->addr_len = addr_len;
+    sp->port = port;
+    sp->is_sibling = 0;
+    sp->sibling = NULL;
+    gpr_ref_init(&sp->refs, 1);
     GPR_ASSERT(sp->emfd);
     gpr_mu_unlock(&s->mu);
     gpr_free(addr_str);
     gpr_free(name);
   }
 
-  return port;
+  return sp;
 }
 
-int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
-                             size_t addr_len) {
-  int allocated_port1 = -1;
-  int allocated_port2 = -1;
-  unsigned i;
+grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
+                                            const void *addr,
+                                            size_t addr_len) {
+  int allocated_port = -1;
+  grpc_tcp_listener *sp;
+  grpc_tcp_listener *sp2 = NULL;
   int fd;
   grpc_dualstack_mode dsmode;
   struct sockaddr_in6 addr6_v4mapped;
@@ -420,9 +433,9 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
   /* Check if this is a wildcard port, and if so, try to keep the port the same
      as some previously created listener. */
   if (grpc_sockaddr_get_port(addr) == 0) {
-    for (i = 0; i < s->nports; i++) {
+    for (sp = s->head; sp; sp = sp->next) {
       sockname_len = sizeof(sockname_temp);
-      if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp,
+      if (0 == getsockname(sp->fd, (struct sockaddr *)&sockname_temp,
                            &sockname_len)) {
         port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
         if (port > 0) {
@@ -436,6 +449,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
     }
   }
 
+  sp = NULL;
+
   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
     addr = (const struct sockaddr *)&addr6_v4mapped;
     addr_len = sizeof(addr6_v4mapped);
@@ -449,14 +464,16 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
     addr = (struct sockaddr *)&wild6;
     addr_len = sizeof(wild6);
     fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
-    allocated_port1 = add_socket_to_server(s, fd, addr, addr_len);
+    sp = add_socket_to_server(s, fd, addr, addr_len);
+    allocated_port = sp->port;
     if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
       goto done;
     }
 
     /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
-    if (port == 0 && allocated_port1 > 0) {
-      grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1);
+    if (port == 0 && allocated_port > 0) {
+      grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port);
+      sp2 = sp;
     }
     addr = (struct sockaddr *)&wild4;
     addr_len = sizeof(wild4);
@@ -471,22 +488,31 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
     addr = (struct sockaddr *)&addr4_copy;
     addr_len = sizeof(addr4_copy);
   }
-  allocated_port2 = add_socket_to_server(s, fd, addr, addr_len);
+  sp = add_socket_to_server(s, fd, addr, addr_len);
+  sp->sibling = sp2;
+  if (sp2) sp2->is_sibling = 1;
 
 done:
   gpr_free(allocated_addr);
-  return allocated_port1 >= 0 ? allocated_port1 : allocated_port2;
+  return sp;
 }
 
 int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
-  return (port_index < s->nports) ? s->ports[port_index].fd : -1;
+  grpc_tcp_listener *sp;
+  for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--)
+    ;
+  if (port_index == 0 && sp) {
+    return sp->fd;
+  } else {
+    return -1;
+  }
 }
 
 void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
                            grpc_pollset **pollsets, size_t pollset_count,
                            grpc_tcp_server_cb on_accept_cb,
                            void *on_accept_cb_arg) {
-  size_t i, j;
+  size_t i;
+  grpc_tcp_listener *sp;
   GPR_ASSERT(on_accept_cb);
   gpr_mu_lock(&s->mu);
   GPR_ASSERT(!s->on_accept_cb);
@@ -495,17 +521,41 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
   s->on_accept_cb_arg = on_accept_cb_arg;
   s->pollsets = pollsets;
   s->pollset_count = pollset_count;
-  for (i = 0; i < s->nports; i++) {
-    for (j = 0; j < pollset_count; j++) {
-      grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd);
+  for (sp = s->head; sp; sp = sp->next) {
+    for (i = 0; i < pollset_count; i++) {
+      grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
     }
-    s->ports[i].read_closure.cb = on_read;
-    s->ports[i].read_closure.cb_arg = &s->ports[i];
-    grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd,
-                           &s->ports[i].read_closure);
+    sp->read_closure.cb = on_read;
+    sp->read_closure.cb_arg = sp;
+    grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
     s->active_ports++;
   }
   gpr_mu_unlock(&s->mu);
 }
 
+int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
+  grpc_tcp_listener *sp = listener;
+  return sp->port;
+}
+
+void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
+  grpc_tcp_listener *sp = listener;
+  gpr_ref(&sp->refs);
+}
+
+void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
+  grpc_tcp_listener *sp = listener;
+  if (sp->is_sibling) return;
+  if (gpr_unref(&sp->refs)) {
+    grpc_tcp_listener *sibling = sp->sibling;
+    while (sibling) {
+      sp = sibling;
+      sibling = sp->sibling;
+      gpr_free(sp);
+    }
+    gpr_free(listener);
+  }
+}
+
 #endif
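
The sibling machinery exists because one grpc_tcp_server_add_port() call can
create two sockets on systems without dualstack support (one for :: and one
for 0.0.0.0). The listener handed back to the caller owns the chain; unref on
a flagged sibling is deliberately a no-op. A sketch of the resulting lifetime
rule (error handling elided):

    /* `listener` is the value returned by grpc_tcp_server_add_port(); any
       dualstack twin hangs off listener->sibling and is flagged is_sibling. */
    grpc_tcp_listener *listener = grpc_tcp_server_add_port(s, addr, addr_len);
    grpc_tcp_listener_ref(listener);   /* caller takes an extra reference */
    /* ... use the listener ... */
    grpc_tcp_listener_unref(listener); /* drops the caller's reference */
    grpc_tcp_listener_unref(listener); /* drops the initial reference: frees
                                          the listener plus every node on
                                          its sibling chain */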

src/core/iomgr/tcp_server_windows.c

@@ -35,7 +35,8 @@
 
 #ifdef GPR_WINSOCK_SOCKET
 
-#define _GNU_SOURCE
+#include <io.h>
+
 #include "src/core/iomgr/sockaddr_utils.h"
 
 #include <grpc/support/alloc.h>
@@ -51,25 +52,29 @@
 #include "src/core/iomgr/tcp_server.h"
 #include "src/core/iomgr/tcp_windows.h"
 
-#define INIT_PORT_CAP 2
 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
 
 /* one listening port */
-typedef struct server_port {
+struct grpc_tcp_listener {
   /* This seemingly magic number comes from AcceptEx's documentation. each
      address buffer needs to have at least 16 more bytes at their end. */
   gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2];
   /* This will hold the socket for the next accept. */
   SOCKET new_socket;
-  /* The listener winsocked. */
+  /* The listener winsocket. */
   grpc_winsocket *socket;
+  /* The actual TCP port number. */
+  int port;
   grpc_tcp_server *server;
   /* The cached AcceptEx for that port. */
   LPFN_ACCEPTEX AcceptEx;
   int shutting_down;
   /* closure for socket notification of accept being ready */
   grpc_closure on_accept;
-} server_port;
+  gpr_refcount refs;
+  /* linked list */
+  struct grpc_tcp_listener *next;
+};
 
 /* the overall server */
 struct grpc_tcp_server {
@@ -82,10 +87,8 @@ struct grpc_tcp_server {
   /* active port count: how many ports are actually still listening */
   int active_ports;
 
-  /* all listening ports */
-  server_port *ports;
-  size_t nports;
-  size_t port_capacity;
+  /* linked list of server ports */
+  grpc_tcp_listener *head;
 
   /* shutdown callback */
   grpc_closure *shutdown_complete;
@@ -99,9 +102,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
   s->active_ports = 0;
   s->on_accept_cb = NULL;
   s->on_accept_cb_arg = NULL;
-  s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
-  s->nports = 0;
-  s->port_capacity = INIT_PORT_CAP;
+  s->head = NULL;
   s->shutdown_complete = NULL;
   return s;
 }
@@ -109,26 +110,26 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
 static void dont_care_about_shutdown_completion(void *arg) {}
 
 static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
-  size_t i;
-
   grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
 
   /* Now that the accepts have been aborted, we can destroy the sockets.
      The IOCP won't get notified on these, so we can flag them as already
      closed by the system. */
-  for (i = 0; i < s->nports; i++) {
-    server_port *sp = &s->ports[i];
+  while (s->head) {
+    grpc_tcp_listener *sp = s->head;
+    s->head = sp->next;
+    sp->next = NULL;
     grpc_winsocket_destroy(sp->socket);
+    grpc_tcp_listener_unref(sp);
   }
-  gpr_free(s->ports);
   gpr_free(s);
 }
 
 /* Public function. Stops and destroys a grpc_tcp_server. */
 void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
                              grpc_closure *shutdown_complete) {
-  size_t i;
   int immediately_done = 0;
+  grpc_tcp_listener *sp;
   gpr_mu_lock(&s->mu);
 
   s->shutdown_complete = shutdown_complete;
@@ -138,8 +139,7 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
   if (s->active_ports == 0) {
     immediately_done = 1;
   }
-  for (i = 0; i < s->nports; i++) {
-    server_port *sp = &s->ports[i];
+  for (sp = s->head; sp; sp = sp->next) {
     sp->shutting_down = 1;
     grpc_winsocket_shutdown(sp->socket);
   }
@@ -199,7 +199,7 @@ error:
 }
 
 static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
-                                              server_port *sp) {
+                                              grpc_tcp_listener *sp) {
   int notify = 0;
   sp->shutting_down = 0;
   gpr_mu_lock(&sp->server->mu);
@@ -216,7 +216,7 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
 
 /* In order to do an async accept, we need to create a socket first which
    will be the one assigned to the new incoming connection. */
-static void start_accept(grpc_exec_ctx *exec_ctx, server_port *port) {
+static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) {
   SOCKET sock = INVALID_SOCKET;
   char *message;
   char *utf8_message;
@@ -276,7 +276,7 @@ failure:
 
 /* Event manager callback when reads are ready. */
 static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
-  server_port *sp = arg;
+  grpc_tcp_listener *sp = arg;
   SOCKET sock = sp->new_socket;
   grpc_winsocket_callback_info *info = &sp->socket->read_info;
   grpc_endpoint *ep = NULL;
@@ -351,16 +351,17 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
   start_accept(exec_ctx, sp);
 }
 
-static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
-                                const struct sockaddr *addr, size_t addr_len) {
-  server_port *sp;
+static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
+                                               const struct sockaddr *addr,
+                                               size_t addr_len) {
+  grpc_tcp_listener *sp = NULL;
   int port;
   int status;
   GUID guid = WSAID_ACCEPTEX;
   DWORD ioctl_num_bytes;
   LPFN_ACCEPTEX AcceptEx;
 
-  if (sock == INVALID_SOCKET) return -1;
+  if (sock == INVALID_SOCKET) return NULL;
 
   /* We need to grab the AcceptEx pointer for that port, as it may be
      interface-dependent. We'll cache it to avoid doing that again. */
@@ -373,37 +374,35 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
     gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
     gpr_free(utf8_message);
     closesocket(sock);
-    return -1;
+    return NULL;
   }
 
   port = prepare_socket(sock, addr, addr_len);
   if (port >= 0) {
     gpr_mu_lock(&s->mu);
     GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
-    /* append it to the list under a lock */
-    if (s->nports == s->port_capacity) {
-      /* too many ports, and we need to store their address in a closure */
-      /* TODO(ctiller): make server_port a linked list */
-      abort();
-    }
-    sp = &s->ports[s->nports++];
+    sp = gpr_malloc(sizeof(grpc_tcp_listener));
+    sp->next = s->head;
+    s->head = sp;
     sp->server = s;
     sp->socket = grpc_winsocket_create(sock, "listener");
     sp->shutting_down = 0;
     sp->AcceptEx = AcceptEx;
     sp->new_socket = INVALID_SOCKET;
+    sp->port = port;
+    gpr_ref_init(&sp->refs, 1);
    grpc_closure_init(&sp->on_accept, on_accept, sp);
     GPR_ASSERT(sp->socket);
     gpr_mu_unlock(&s->mu);
   }
 
-  return port;
+  return sp;
 }
 
-int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
-                             size_t addr_len) {
-  int allocated_port = -1;
-  unsigned i;
+grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
                                            const void *addr,
+                                            size_t addr_len) {
+  grpc_tcp_listener *sp;
   SOCKET sock;
   struct sockaddr_in6 addr6_v4mapped;
   struct sockaddr_in6 wildcard;
@@ -415,9 +414,9 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
   /* Check if this is a wildcard port, and if so, try to keep the port the same
      as some previously created listener. */
   if (grpc_sockaddr_get_port(addr) == 0) {
-    for (i = 0; i < s->nports; i++) {
+    for (sp = s->head; sp; sp = sp->next) {
       sockname_len = sizeof(sockname_temp);
-      if (0 == getsockname(s->ports[i].socket->socket,
+      if (0 == getsockname(sp->socket->socket,
                            (struct sockaddr *)&sockname_temp, &sockname_len)) {
         port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
         if (port > 0) {
@@ -452,33 +451,55 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
     gpr_free(utf8_message);
   }
 
-  allocated_port = add_socket_to_server(s, sock, addr, addr_len);
+  sp = add_socket_to_server(s, sock, addr, addr_len);
   gpr_free(allocated_addr);
 
-  return allocated_port;
+  return sp;
 }
 
-SOCKET
-grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) {
-  return (index < s->nports) ? s->ports[index].socket->socket : INVALID_SOCKET;
+int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
+  grpc_tcp_listener *sp;
+  for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--)
+    ;
+  if (port_index == 0 && sp) {
+    return _open_osfhandle(sp->socket->socket, 0);
+  } else {
+    return -1;
+  }
 }
 
 void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
                            grpc_pollset **pollset, size_t pollset_count,
                            grpc_tcp_server_cb on_accept_cb,
                            void *on_accept_cb_arg) {
-  size_t i;
+  grpc_tcp_listener *sp;
   GPR_ASSERT(on_accept_cb);
   gpr_mu_lock(&s->mu);
   GPR_ASSERT(!s->on_accept_cb);
   GPR_ASSERT(s->active_ports == 0);
   s->on_accept_cb = on_accept_cb;
   s->on_accept_cb_arg = on_accept_cb_arg;
-  for (i = 0; i < s->nports; i++) {
-    start_accept(exec_ctx, s->ports + i);
+  for (sp = s->head; sp; sp = sp->next) {
+    start_accept(exec_ctx, sp);
     s->active_ports++;
   }
   gpr_mu_unlock(&s->mu);
 }
 
+int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
+  grpc_tcp_listener *sp = listener;
+  return sp->port;
+}
+
+void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
+  grpc_tcp_listener *sp = listener;
+  gpr_ref(&sp->refs);
+}
+
+void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
+  grpc_tcp_listener *sp = listener;
+  if (gpr_unref(&sp->refs)) {
+    gpr_free(listener);
+  }
+}
+
 #endif /* GPR_WINSOCK_SOCKET */
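
Note the new <io.h> include: on Windows the listener holds a winsock SOCKET,
so grpc_tcp_server_get_fd() converts it with _open_osfhandle() to preserve the
int-returning signature shared with the POSIX implementation. One caveat:
_open_osfhandle() hands ownership of the OS handle to the C runtime, so
_close() on the returned descriptor also closes the underlying socket. A
usage sketch:

    /* Fetch a CRT file descriptor for the first listening socket; -1 means
       the index walked off the end of the listener list. */
    int fd = grpc_tcp_server_get_fd(s, 0);
    if (fd < 0) {
      /* no listener at that index */
    }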

src/core/security/server_secure_chttp2.c

@@ -247,9 +247,11 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
   }
 
   for (i = 0; i < resolved->naddrs; i++) {
-    port_temp = grpc_tcp_server_add_port(
+    grpc_tcp_listener *listener;
+    listener = grpc_tcp_server_add_port(
         tcp, (struct sockaddr *)&resolved->addrs[i].addr,
         resolved->addrs[i].len);
+    port_temp = grpc_tcp_listener_get_port(listener);
     if (port_temp >= 0) {
       if (port_num == -1) {
         port_num = port_temp;

src/core/surface/server_chttp2.c

@@ -106,9 +106,11 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
   }
 
   for (i = 0; i < resolved->naddrs; i++) {
-    port_temp = grpc_tcp_server_add_port(
+    grpc_tcp_listener *listener;
+    listener = grpc_tcp_server_add_port(
         tcp, (struct sockaddr *)&resolved->addrs[i].addr,
        resolved->addrs[i].len);
+    port_temp = grpc_tcp_listener_get_port(listener);
     if (port_temp >= 0) {
       if (port_num == -1) {
         port_num = port_temp;

test/core/util/reconnect_server.c

@@ -113,6 +113,7 @@ void reconnect_server_init(reconnect_server *server) {
 
 void reconnect_server_start(reconnect_server *server, int port) {
   struct sockaddr_in addr;
+  grpc_tcp_listener *listener;
   int port_added;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
 
@@ -121,8 +122,9 @@ void reconnect_server_start(reconnect_server *server, int port) {
   memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
 
   server->tcp_server = grpc_tcp_server_create();
-  port_added =
+  listener =
       grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr));
+  port_added = grpc_tcp_listener_get_port(listener);
   GPR_ASSERT(port_added == port);
 
   grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1,

test/cpp/interop/stress_test.cc

@@ -38,10 +38,10 @@
 #include <vector>
 
 #include <gflags/gflags.h>
-#include <grpc/support/time.h>
 #include <grpc++/create_channel.h>
 #include <grpc++/grpc++.h>
 #include <grpc++/impl/thd.h>
+#include <grpc/support/time.h>
 
 #include "test/cpp/interop/interop_client.h"
 #include "test/cpp/interop/stress_interop_client.h"
@@ -70,6 +70,8 @@ DEFINE_string(server_addresses, "localhost:8080",
               " \"<name_1>:<port_1>,<name_2>:<port_1>...<name_N>:<port_N>\"\n"
               " Note: <name> can be servername or IP address.");
 
+DEFINE_int32(num_channels_per_server, 1, "Number of channels for each server");
+
 DEFINE_int32(num_stubs_per_channel, 1,
              "Number of stubs per each channels to server. This number also "
             "indicates the max number of parallel RPC calls on each channel "
@@ -216,30 +218,46 @@ int main(int argc, char** argv) {
   std::vector<grpc::thread> test_threads;
 
+  // Create and start the test threads.
+  // Note that:
+  // - Each server can have multiple channels (as configured by
+  // FLAGS_num_channels_per_server).
+  //
+  // - Each channel can have multiple stubs (as configured by
+  // FLAGS_num_stubs_per_channel). This is to test calling multiple RPCs in
+  // parallel on the same channel.
   int thread_idx = 0;
+  int server_idx = -1;
+  char buffer[256];
   for (auto it = server_addresses.begin(); it != server_addresses.end(); it++) {
-    // TODO(sreek): This will change once we add support for other tests
-    // that won't work with InsecureChannelCredentials()
-    std::shared_ptr<grpc::Channel> channel(
-        grpc::CreateChannel(*it, grpc::InsecureChannelCredentials()));
-
-    // Make multiple stubs (as defined by num_stubs_per_channel flag) to use the
-    // same channel. This is to test calling multiple RPC calls in parallel on
-    // each channel.
-    for (int i = 0; i < FLAGS_num_stubs_per_channel; i++) {
-      StressTestInteropClient* client = new StressTestInteropClient(
-          ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
-          FLAGS_sleep_duration_ms, FLAGS_metrics_collection_interval_secs);
-
-      bool is_already_created;
-      grpc::string metricName =
-          "/stress_test/qps/thread/" + std::to_string(thread_idx);
-      test_threads.emplace_back(grpc::thread(
-          &StressTestInteropClient::MainLoop, client,
-          metrics_service.CreateGauge(metricName, &is_already_created)));
-
-      // The Gauge should not have been already created
-      GPR_ASSERT(!is_already_created);
+    ++server_idx;
+    // Create channel(s) for each server
+    for (int channel_idx = 0; channel_idx < FLAGS_num_channels_per_server;
+         channel_idx++) {
+      // TODO (sreek). This won't work for tests that require Authentication
+      std::shared_ptr<grpc::Channel> channel(
+          grpc::CreateChannel(*it, grpc::InsecureChannelCredentials()));
+
+      // Create stub(s) for each channel
+      for (int stub_idx = 0; stub_idx < FLAGS_num_stubs_per_channel;
+           stub_idx++) {
+        StressTestInteropClient* client = new StressTestInteropClient(
+            ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
+            FLAGS_sleep_duration_ms, FLAGS_metrics_collection_interval_secs);
+
+        bool is_already_created;
+        // Gauge name
+        std::snprintf(buffer, sizeof(buffer),
+                      "/stress_test/server_%d/channel_%d/stub_%d/qps",
+                      server_idx, channel_idx, stub_idx);
+
+        test_threads.emplace_back(grpc::thread(
+            &StressTestInteropClient::MainLoop, client,
+            metrics_service.CreateGauge(buffer, &is_already_created)));
+
+        // The Gauge should not have been already created
+        GPR_ASSERT(!is_already_created);
+      }
     }
   }
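
After this change the number of test threads is the product
servers x channels x stubs, and each thread reports under a distinct gauge
path. A hypothetical helper mirroring the naming scheme used in main() (not
part of the change itself):

    #include <cstdio>
    #include <string>

    // Builds names like "/stress_test/server_0/channel_1/stub_2/qps";
    // the 256-byte buffer matches the one used in main().
    std::string QpsGaugeName(int server_idx, int channel_idx, int stub_idx) {
      char buffer[256];
      std::snprintf(buffer, sizeof(buffer),
                    "/stress_test/server_%d/channel_%d/stub_%d/qps",
                    server_idx, channel_idx, stub_idx);
      return buffer;
    }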

test/proto/benchmarks/control.proto

@@ -49,7 +49,10 @@ enum RpcType {
   STREAMING = 1;
 }
 
+// Parameters of poisson process distribution, which is a good representation
+// of activity coming in from independent identical stationary sources.
 message PoissonParams {
+  // The rate of arrivals (a.k.a. lambda parameter of the exp distribution).
   double offered_load = 1;
 }
 
@@ -67,6 +70,8 @@ message ParetoParams {
   double alpha = 2;
 }
 
+// Once an RPC finishes, immediately start a new one.
+// No configuration parameters needed.
 message ClosedLoopParams {
 }
 
@@ -87,14 +92,20 @@ message SecurityParams {
 }
 
 message ClientConfig {
+  // List of targets to connect to. At least one target needs to be specified.
   repeated string server_targets = 1;
   ClientType client_type = 2;
   SecurityParams security_params = 3;
+  // How many concurrent RPCs to start for each channel.
+  // For synchronous client, use a separate thread for each outstanding RPC.
   int32 outstanding_rpcs_per_channel = 4;
+  // Number of independent client channels to create.
+  // i-th channel will connect to server_target[i % server_targets.size()]
   int32 client_channels = 5;
-  // only for async client:
+  // Only for async client. Number of threads to use to start/manage RPCs.
   int32 async_client_threads = 7;
   RpcType rpc_type = 8;
+  // The requested load for the entire client (aggregated over all the threads).
   LoadParams load_params = 10;
   PayloadConfig payload_config = 11;
   HistogramParams histogram_params = 12;
@@ -106,6 +117,7 @@ message ClientStatus {
 
 // Request current stats
 message Mark {
+  // if true, the stats will be reset after taking their snapshot.
   bool reset = 1;
 }
 
@@ -119,11 +131,13 @@ message ClientArgs {
 message ServerConfig {
   ServerType server_type = 1;
   SecurityParams security_params = 2;
+  // Host on which to listen.
   string host = 3;
+  // Port on which to listen. Zero means pick unused port.
   int32 port = 4;
-  // only for async server
+  // Only for async server. Number of threads used to serve the requests.
   int32 async_server_threads = 7;
-  // restrict core usage
+  // restrict core usage, currently unused
   int32 core_limit = 8;
   PayloadConfig payload_config = 9;
 }
@@ -137,6 +151,8 @@ message ServerArgs {
 message ServerStatus {
   ServerStats stats = 1;
+  // the port bound by the server
   int32 port = 2;
+  // Number of cores on the server. See gpr_cpu_num_cores.
   int32 cores = 3;
 }
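
The client_channels comment encodes a round-robin mapping from channels to
targets. A small sketch of that mapping (illustrative values, hypothetical
helper name):

    #include <cstddef>
    #include <string>
    #include <vector>

    // With client_channels = 4 and targets {"a:10000", "b:10000"}, channels
    // 0 and 2 connect to "a:10000", channels 1 and 3 to "b:10000".
    std::string TargetForChannel(const std::vector<std::string>& targets,
                                 std::size_t channel_index) {
      return targets[channel_index % targets.size()];
    }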

test/proto/benchmarks/services.proto

@@ -47,9 +47,19 @@ service BenchmarkService {
 }
 
 service WorkerService {
-  // Start server with specified workload
+  // Start server with specified workload.
+  // First request sent specifies the ServerConfig followed by ServerStatus
+  // response. After that, a "Mark" can be sent anytime to request the latest
+  // stats. Closing the stream will initiate shutdown of the test server
+  // and once the shutdown has finished, the OK status is sent to terminate
+  // this RPC.
   rpc RunServer(stream ServerArgs) returns (stream ServerStatus);
 
-  // Start client with specified workload
+  // Start client with specified workload.
+  // First request sent specifies the ClientConfig followed by ClientStatus
+  // response. After that, a "Mark" can be sent anytime to request the latest
+  // stats. Closing the stream will initiate shutdown of the test client
+  // and once the shutdown has finished, the OK status is sent to terminate
+  // this RPC.
   rpc RunClient(stream ClientArgs) returns (stream ClientStatus);
 }
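
The comments describe the same driver protocol for both streams: config first,
then any number of Marks, then a half-close to trigger shutdown. A rough
driver-side sketch, assuming generated C++ stubs for WorkerService and that
ServerArgs carries the setup/mark fields referenced in control.proto:

    // `stub` is a WorkerService::Stub; `server_config` a filled ServerConfig.
    grpc::ClientContext context;
    auto stream = stub->RunServer(&context);
    grpc::testing::ServerArgs args;
    *args.mutable_setup() = server_config;  // first message: the config
    stream->Write(args);
    grpc::testing::ServerStatus status;
    stream->Read(&status);                  // first response: stats/port/cores
    args.Clear();
    args.mutable_mark()->set_reset(true);   // later: snapshot and reset stats
    stream->Write(args);
    stream->Read(&status);
    stream->WritesDone();                   // half-close starts shutdown
    grpc::Status rc = stream->Finish();     // OK once shutdown completes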

test/proto/benchmarks/stats.proto

@@ -32,14 +32,14 @@ syntax = "proto3";
 package grpc.testing;
 
 message ServerStats {
-  // wall clock time change since last reset
+  // wall clock time change in seconds since last reset
   double time_elapsed = 1;
 
-  // change in user time used by the server since last reset
+  // change in user time (in seconds) used by the server since last reset
   double time_user = 2;
 
-  // change in server time used by the server process and all threads since
-  // last reset
+  // change in server time (in seconds) used by the server process and all
+  // threads since last reset
   double time_system = 3;
 }

tools/run_tests/run_tests.py

@@ -624,10 +624,15 @@ build_configs = set(cfg.build_config for cfg in run_configs)
 if args.travis:
   _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'api'}
 
-languages = set(_LANGUAGES[l]
-                for l in itertools.chain.from_iterable(
-                      _LANGUAGES.iterkeys() if x == 'all' else [x]
-                      for x in args.language))
+if 'all' in args.language:
+  lang_list = _LANGUAGES.keys()
+else:
+  lang_list = args.language
+# We don't support code coverage on ObjC
+if 'gcov' in args.config and 'objc' in lang_list:
+  lang_list.remove('objc')
+
+languages = set(_LANGUAGES[l] for l in lang_list)
 
 if len(build_configs) > 1:
   for language in languages:
@@ -840,6 +845,7 @@ def _calculate_num_runs_failures(list_of_results):
       num_failures += jobresult.num_failures
   return num_runs, num_failures
 
+
 def _build_and_run(
     check_cancelled, newline_on_success, cache, xml_report=None):
   """Do one pass of building & running tests."""
