Merge pull request #10231 from ctiller/dynamic_tcp_sizing

Dynamically adjust TCP read buffer size
pull/10237/head
Craig Tiller 8 years ago committed by GitHub
commit 71f1b8ebf7
  1. 8
      include/grpc/impl/codegen/grpc_types.h
  2. 7
      src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
  3. 5
      src/core/lib/iomgr/endpoint_pair.h
  4. 17
      src/core/lib/iomgr/endpoint_pair_posix.c
  5. 5
      src/core/lib/iomgr/endpoint_pair_uv.c
  6. 15
      src/core/lib/iomgr/endpoint_pair_windows.c
  7. 9
      src/core/lib/iomgr/resource_quota.c
  8. 2
      src/core/lib/iomgr/resource_quota.h
  9. 4
      src/core/lib/iomgr/tcp_client.h
  10. 24
      src/core/lib/iomgr/tcp_client_posix.c
  11. 21
      src/core/lib/iomgr/tcp_client_windows.c
  12. 114
      src/core/lib/iomgr/tcp_posix.c
  13. 7
      src/core/lib/iomgr/tcp_posix.h
  14. 22
      src/core/lib/iomgr/tcp_server_posix.c
  15. 3
      src/core/lib/iomgr/tcp_server_utils_posix.h
  16. 25
      src/core/lib/iomgr/tcp_server_windows.c
  17. 14
      src/core/lib/iomgr/tcp_windows.c
  18. 4
      src/core/lib/iomgr/tcp_windows.h
  19. 5
      test/core/bad_client/bad_client.c
  20. 4
      test/core/end2end/fixtures/h2_sockpair+trace.c
  21. 4
      test/core/end2end/fixtures/h2_sockpair.c
  22. 14
      test/core/end2end/fixtures/h2_sockpair_1byte.c
  23. 10
      test/core/iomgr/endpoint_pair_test.c
  24. 2
      test/core/iomgr/fd_conservation_posix_test.c
  25. 55
      test/core/iomgr/tcp_posix_test.c
  26. 10
      test/core/security/secure_endpoint_test.c
  27. 4
      test/cpp/microbenchmarks/fullstack_fixtures.h

@ -271,6 +271,14 @@ typedef struct {
* possible. */
#define GRPC_ARG_USE_CRONET_PACKET_COALESCING \
"grpc.use_cronet_packet_coalescing"
/* Channel arg (integer) setting how large a slice to try and read from the wire
each time recvmsg (or equivalent) is called */
#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size"
#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
#define GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE \
"grpc.experimental.tcp_min_read_chunk_size"
#define GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE \
"grpc.experimental.tcp_max_read_chunk_size"
/** \} */
/** Result of a grpc call. If the caller satisfies the prerequisites of a

@ -57,12 +57,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
char *name;
gpr_asprintf(&name, "fd:%d", fd);
grpc_resource_quota *resource_quota = grpc_resource_quota_from_channel_args(
grpc_server_get_channel_args(server));
grpc_endpoint *server_endpoint =
grpc_tcp_create(grpc_fd_create(fd, name), resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_tcp_create(&exec_ctx, grpc_fd_create(fd, name),
grpc_server_get_channel_args(server), name);
gpr_free(name);

@ -41,8 +41,7 @@ typedef struct {
grpc_endpoint *server;
} grpc_endpoint_pair;
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
const char *name, grpc_resource_quota *resource_quota,
size_t read_slice_size);
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
grpc_channel_args *args);
#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */

@ -62,22 +62,25 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE);
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
const char *name, grpc_resource_quota *resource_quota,
size_t read_slice_size) {
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
grpc_channel_args *args) {
int sv[2];
grpc_endpoint_pair p;
char *final_name;
create_sockets(sv);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_asprintf(&final_name, "%s:client", name);
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), resource_quota,
read_slice_size, "socketpair-server");
p.client = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], final_name), args,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), resource_quota,
read_slice_size, "socketpair-client");
p.server = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[0], final_name), args,
"socketpair-client");
gpr_free(final_name);
grpc_exec_ctx_finish(&exec_ctx);
return p;
}

@ -41,9 +41,8 @@
#include "src/core/lib/iomgr/endpoint_pair.h"
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
const char *name, grpc_resource_quota *resource_quota,
size_t read_slice_size) {
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
grpc_channel_args *args) {
grpc_endpoint_pair endpoint_pair;
// TODO(mlumish): implement this properly under libuv
GPR_ASSERT(false &&

@ -83,15 +83,18 @@ static void create_sockets(SOCKET sv[2]) {
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
const char *name, grpc_resource_quota *resource_quota,
size_t read_slice_size) {
const char *name, grpc_channel_args *channel_args) {
SOCKET sv[2];
grpc_endpoint_pair p;
create_sockets(sv);
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"),
resource_quota, "endpoint:server");
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"),
resource_quota, "endpoint:client");
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
p.client = grpc_tcp_create(&exec_ctx,
grpc_winsocket_create(sv[1], "endpoint:client"),
channel_args, "endpoint:server");
p.server = grpc_tcp_create(&exec_ctx,
grpc_winsocket_create(sv[0], "endpoint:server"),
channel_args, "endpoint:client");
grpc_exec_ctx_finish(&exec_ctx);
return p;
}

@ -142,6 +142,8 @@ struct grpc_resource_quota {
/* Amount of free memory in the resource quota */
int64_t free_pool;
gpr_atm last_size;
/* Has rq_step been scheduled to occur? */
bool step_scheduled;
/* Are we currently reclaiming memory */
@ -581,6 +583,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
resource_quota->combiner = grpc_combiner_create(NULL);
resource_quota->free_pool = INT64_MAX;
resource_quota->size = INT64_MAX;
gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
resource_quota->step_scheduled = false;
resource_quota->reclaiming = false;
gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
@ -643,11 +646,17 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
rq_resize_args *a = gpr_malloc(sizeof(*a));
a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
a->size = (int64_t)size;
gpr_atm_no_barrier_store(&resource_quota->last_size,
(gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
size_t grpc_resource_quota_peek_size(grpc_resource_quota *resource_quota) {
return (size_t)gpr_atm_no_barrier_load(&resource_quota->last_size);
}
/*******************************************************************************
* grpc_resource_user channel args api
*/

@ -90,6 +90,8 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args(
double grpc_resource_quota_get_memory_pressure(
grpc_resource_quota *resource_quota);
size_t grpc_resource_quota_peek_size(grpc_resource_quota *resource_quota);
typedef struct grpc_resource_user grpc_resource_user;
grpc_resource_user *grpc_resource_user_create(

@ -40,10 +40,6 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
/* Channel arg (integer) setting how large a slice to try and read from the wire
each time recvmsg (or equivalent) is called */
#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size"
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
NULL on failure).

@ -137,29 +137,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
grpc_endpoint *grpc_tcp_client_create_from_fd(
grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
const char *addr_str) {
size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
8 * 1024 * 1024};
tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer(
&channel_args->args[i], options);
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
}
grpc_endpoint *ep =
grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
return ep;
return grpc_tcp_create(exec_ctx, fd, channel_args, addr_str);
}
static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {

@ -63,7 +63,7 @@ typedef struct {
int refs;
grpc_closure on_connect;
grpc_endpoint **endpoint;
grpc_resource_quota *resource_quota;
grpc_channel_args *channel_args;
} async_connect;
static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
@ -72,7 +72,7 @@ static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
grpc_resource_quota_unref_internal(exec_ctx, ac->resource_quota);
grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
@ -119,7 +119,8 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (!wsa_success) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
} else {
*ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
*ep =
grpc_tcp_create(exec_ctx, socket, ac->channel_args, ac->addr_name);
socket = NULL;
}
} else {
@ -152,17 +153,6 @@ static void tcp_client_connect_impl(
grpc_winsocket_callback_info *info;
grpc_error *error = GRPC_ERROR_NONE;
grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
}
*endpoint = NULL;
/* Use dualstack sockets where available. */
@ -225,7 +215,7 @@ static void tcp_client_connect_impl(
ac->refs = 2;
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint;
ac->resource_quota = resource_quota;
ac->channel_args = grpc_channel_args_copy(channel_args);
grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
@ -247,7 +237,6 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
grpc_closure_sched(exec_ctx, on_done, final_error);
}

@ -52,7 +52,9 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/profiling/timers.h"
@ -80,10 +82,14 @@ typedef struct {
int fd;
bool finished_edge;
msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
size_t slice_size;
double target_length;
double bytes_read_this_round;
gpr_refcount refcount;
gpr_atm shutdown_count;
int min_read_chunk_size;
int max_read_chunk_size;
/* garbage after the last read */
grpc_slice_buffer last_read_buffer;
@ -108,6 +114,42 @@ typedef struct {
grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
static void add_to_estimate(grpc_tcp *tcp, size_t bytes) {
tcp->bytes_read_this_round += (double)bytes;
}
static void finish_estimate(grpc_tcp *tcp) {
/* If we read >80% of the target buffer in one read loop, increase the size
of the target buffer to either the amount read, or twice its previous
value */
if (tcp->bytes_read_this_round > tcp->target_length * 0.8) {
tcp->target_length =
GPR_MAX(2 * tcp->target_length, tcp->bytes_read_this_round);
} else {
tcp->target_length =
0.99 * tcp->target_length + 0.01 * tcp->bytes_read_this_round;
}
tcp->bytes_read_this_round = 0;
}
static size_t get_target_read_size(grpc_tcp *tcp) {
grpc_resource_quota *rq = grpc_resource_user_quota(tcp->resource_user);
double pressure = grpc_resource_quota_get_memory_pressure(rq);
double target =
tcp->target_length * (pressure > 0.8 ? (1.0 - pressure) / 0.2 : 1.0);
size_t sz = (((size_t)GPR_CLAMP(target, tcp->min_read_chunk_size,
tcp->max_read_chunk_size)) +
255) &
~(size_t)255;
/* don't use more than 1/16th of the overall resource quota for a single read
* alloc */
size_t rqmax = grpc_resource_quota_peek_size(rq);
if (sz > rqmax / 16 && rqmax > 1024) {
sz = rqmax / 16;
}
return sz;
}
static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) {
return grpc_error_set_str(
grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd),
@ -232,9 +274,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/* NB: After calling call_read_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
if (tcp->iov_size > 1) {
tcp->iov_size /= 2;
}
finish_estimate(tcp);
/* We've consumed the edge, request a new one */
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
@ -253,14 +293,13 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
add_to_estimate(tcp, (size_t)read_bytes);
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
grpc_slice_buffer_trim_end(
tcp->incoming_buffer,
tcp->incoming_buffer->length - (size_t)read_bytes,
&tcp->last_read_buffer);
} else if (tcp->iov_size < MAX_READ_IOVEC) {
++tcp->iov_size;
}
GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE);
@ -285,11 +324,11 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
}
static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
grpc_resource_user_alloc_slices(
exec_ctx, &tcp->slice_allocator, tcp->slice_size,
(size_t)tcp->iov_size - tcp->incoming_buffer->count,
tcp->incoming_buffer);
size_t target_read_size = get_target_read_size(tcp);
if (tcp->incoming_buffer->length < target_read_size &&
tcp->incoming_buffer->count < MAX_READ_IOVEC) {
grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator,
target_read_size, 1, tcp->incoming_buffer);
} else {
tcp_do_read(exec_ctx, tcp);
}
@ -540,9 +579,50 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_get_peer,
tcp_get_fd};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
grpc_resource_quota *resource_quota,
size_t slice_size, const char *peer_string) {
#define MAX_CHUNK_SIZE 32 * 1024 * 1024
grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
const grpc_channel_args *channel_args,
const char *peer_string) {
int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
int tcp_max_read_chunk_size = 4 * 1024 * 1024;
int tcp_min_read_chunk_size = 256;
grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
MAX_CHUNK_SIZE};
tcp_read_chunk_size =
grpc_channel_arg_get_integer(&channel_args->args[i], options);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) {
grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
MAX_CHUNK_SIZE};
tcp_min_read_chunk_size =
grpc_channel_arg_get_integer(&channel_args->args[i], options);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) {
grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
MAX_CHUNK_SIZE};
tcp_max_read_chunk_size =
grpc_channel_arg_get_integer(&channel_args->args[i], options);
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
}
if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) {
tcp_min_read_chunk_size = tcp_max_read_chunk_size;
}
tcp_read_chunk_size = GPR_CLAMP(tcp_read_chunk_size, tcp_min_read_chunk_size,
tcp_max_read_chunk_size);
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->peer_string = gpr_strdup(peer_string);
@ -552,7 +632,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
tcp->release_fd_cb = NULL;
tcp->release_fd = NULL;
tcp->incoming_buffer = NULL;
tcp->slice_size = slice_size;
tcp->target_length = (double)tcp_read_chunk_size;
tcp->min_read_chunk_size = tcp_min_read_chunk_size;
tcp->max_read_chunk_size = tcp_max_read_chunk_size;
tcp->bytes_read_this_round = 0;
tcp->iov_size = 1;
tcp->finished_edge = true;
/* paired with unref in grpc_tcp_destroy */
@ -569,6 +652,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
&tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
return &tcp->base;
}

@ -47,14 +47,13 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/ev_posix.h"
#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
extern int grpc_tcp_trace;
/* Create a tcp endpoint given a file desciptor and a read slice size.
Takes ownership of fd. */
grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_resource_quota *resource_quota,
size_t read_slice_size, const char *peer_string);
grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
const grpc_channel_args *args,
const char *peer_string);
/* Return the tcp endpoint's fd, or -1 if this is not available. Does not
release the fd.

@ -59,6 +59,7 @@
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@ -90,7 +91,6 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
s->resource_quota = grpc_resource_quota_create(NULL);
s->expand_wildcard_addrs = false;
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
@ -98,27 +98,14 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
s->so_reuseport =
has_so_reuseport && (args->args[i].value.integer != 0);
} else {
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
} else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
s->resource_quota =
grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
} else {
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool");
}
} else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
s->expand_wildcard_addrs = (args->args[i].value.integer != 0);
} else {
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer");
@ -138,6 +125,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
s->head = NULL;
s->tail = NULL;
s->nports = 0;
s->channel_args = grpc_channel_args_copy(args);
gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
*server = s;
return GRPC_ERROR_NONE;
@ -158,8 +146,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
s->head = sp->next;
gpr_free(sp);
}
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
grpc_channel_args_destroy(exec_ctx, s->channel_args);
gpr_free(s);
}
@ -286,8 +273,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, sp->server->resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
grpc_tcp_create(exec_ctx, fdobj, sp->server->channel_args, addr_str),
read_notifier_pollset, acceptor);
gpr_free(name);

@ -103,7 +103,8 @@ struct grpc_tcp_server {
/* next pollset to assign a channel to */
gpr_atm next_pollset_to_assign;
grpc_resource_quota *resource_quota;
/* channel args for this server */
grpc_channel_args *channel_args;
};
/* If successful, add a listener to \a s for \a addr, set \a dsmode for the

@ -46,6 +46,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/pollset_windows.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -102,7 +103,7 @@ struct grpc_tcp_server {
/* shutdown callback */
grpc_closure *shutdown_complete;
grpc_resource_quota *resource_quota;
grpc_channel_args *channel_args;
};
/* Public function. Allocates the proper data structures to hold a
@ -112,21 +113,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
const grpc_channel_args *args,
grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
s->resource_quota = grpc_resource_quota_create(NULL);
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
s->resource_quota =
grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
} else {
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool");
}
}
}
s->channel_args = grpc_channel_args_copy(args);
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
@ -155,7 +142,7 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
grpc_winsocket_destroy(sp->socket);
gpr_free(sp);
}
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
grpc_channel_args_destroy(exec_ctx, s->channel_args);
gpr_free(s);
}
@ -383,8 +370,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_free(utf8_message);
}
gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
sp->server->resource_quota, peer_name_string);
ep = grpc_tcp_create(exec_ctx, grpc_winsocket_create(sock, fd_name),
sp->server->channel_args, peer_name_string);
gpr_free(fd_name);
gpr_free(peer_name_string);
} else {

@ -430,9 +430,19 @@ static grpc_endpoint_vtable vtable = {win_read,
win_get_peer,
win_get_fd};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_resource_quota *resource_quota,
grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
grpc_channel_args *channel_args,
char *peer_string) {
grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
}
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;

@ -50,8 +50,8 @@
/* Create a tcp endpoint given a winsock handle.
* Takes ownership of the handle.
*/
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_resource_quota *resource_quota,
grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
grpc_channel_args *channel_args,
char *peer_string);
grpc_error *grpc_tcp_prepare_socket(SOCKET sock);

@ -117,10 +117,7 @@ void grpc_run_bad_client_test(
grpc_init();
/* Create endpoints */
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("bad_client_test");
sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 65536);
grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
sfd = grpc_iomgr_create_endpoint_pair("fixture", NULL);
/* Create server, completion events */
a.server = grpc_server_create(NULL, NULL);

@ -96,9 +96,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.fixture_data = sfd;
f.cq = grpc_completion_queue_create(NULL);
grpc_resource_quota *resource_quota = grpc_resource_quota_create("fixture");
*sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 65536);
grpc_resource_quota_unref(resource_quota);
*sfd = grpc_iomgr_create_endpoint_pair("fixture", NULL);
return f;
}

@ -90,9 +90,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.fixture_data = sfd;
f.cq = grpc_completion_queue_create(NULL);
grpc_resource_quota *resource_quota = grpc_resource_quota_create("fixture");
*sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 65536);
grpc_resource_quota_unref(resource_quota);
*sfd = grpc_iomgr_create_endpoint_pair("fixture", NULL);
return f;
}

@ -90,9 +90,17 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.fixture_data = sfd;
f.cq = grpc_completion_queue_create(NULL);
grpc_resource_quota *resource_quota = grpc_resource_quota_create("fixture");
*sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 1);
grpc_resource_quota_unref(resource_quota);
grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
.type = GRPC_ARG_INTEGER,
.value.integer = 1},
{.key = GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE,
.type = GRPC_ARG_INTEGER,
.value.integer = 1},
{.key = GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE,
.type = GRPC_ARG_INTEGER,
.value.integer = 1}};
grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
*sfd = grpc_iomgr_create_endpoint_pair("fixture", &args);
return f;
}

@ -49,11 +49,11 @@ static grpc_endpoint_test_fixture create_fixture_endpoint_pair(
size_t slice_size) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_endpoint_test_fixture f;
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("endpoint_pair_test");
grpc_endpoint_pair p =
grpc_iomgr_create_endpoint_pair("test", resource_quota, slice_size);
grpc_resource_quota_unref(resource_quota);
grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
.type = GRPC_ARG_INTEGER,
.value.integer = (int)slice_size}};
grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", &args);
f.client_ep = p.client;
f.server_ep = p.server;

@ -57,7 +57,7 @@ int main(int argc, char **argv) {
for (i = 0; i < 100; i++) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
p = grpc_iomgr_create_endpoint_pair("test", resource_quota, 1);
p = grpc_iomgr_create_endpoint_pair("test", NULL);
grpc_endpoint_destroy(&exec_ctx, p.client);
grpc_endpoint_destroy(&exec_ctx, p.server);
grpc_exec_ctx_finish(&exec_ctx);

@ -183,10 +183,12 @@ static void read_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
grpc_resource_quota *resource_quota = grpc_resource_quota_create("read_test");
ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), resource_quota,
slice_size, "test");
grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
.type = GRPC_ARG_INTEGER,
.value.integer = (int)slice_size}};
grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args,
"test");
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@ -233,11 +235,12 @@ static void large_read_test(size_t slice_size) {
create_sockets(sv);
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("large_read_test");
ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), resource_quota,
slice_size, "test");
grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
.type = GRPC_ARG_INTEGER,
.value.integer = (int)slice_size}};
grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "large_read_test"),
&args, "test");
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket(sv[0]);
@ -372,11 +375,12 @@ static void write_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("write_test");
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
.type = GRPC_ARG_INTEGER,
.value.integer = (int)slice_size}};
grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "write_test"), &args,
"test");
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
state.ep = ep;
@ -441,12 +445,13 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("release_fd_test");
ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), resource_quota,
slice_size, "test");
grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
.type = GRPC_ARG_INTEGER,
.value.integer = (int)slice_size}};
grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args,
"test");
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@ -534,10 +539,14 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
create_sockets(sv);
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("tcp_posix_test_socketpair");
f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"),
resource_quota, slice_size, "test");
f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"),
resource_quota, slice_size, "test");
grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
.type = GRPC_ARG_INTEGER,
.value.integer = (int)slice_size}};
grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
f.client_ep = grpc_tcp_create(
&exec_ctx, grpc_fd_create(sv[0], "fixture:client"), &args, "test");
f.server_ep = grpc_tcp_create(
&exec_ctx, grpc_fd_create(sv[1], "fixture:server"), &args, "test");
grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset);
grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset);

@ -39,6 +39,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/endpoint_pair.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/security/transport/secure_endpoint.h"
@ -57,10 +58,11 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
grpc_endpoint_test_fixture f;
grpc_endpoint_pair tcp;
grpc_resource_quota *resource_quota =
grpc_resource_quota_create("secure_endpoint_test");
tcp = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, slice_size);
grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE,
.type = GRPC_ARG_INTEGER,
.value.integer = (int)slice_size}};
grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a};
tcp = grpc_iomgr_create_endpoint_pair("fixture", &args);
grpc_endpoint_add_to_pollset(&exec_ctx, tcp.client, g_pollset);
grpc_endpoint_add_to_pollset(&exec_ctx, tcp.server, g_pollset);

@ -212,8 +212,8 @@ class EndpointPairFixture : public BaseFixture {
class SockPair : public EndpointPairFixture {
public:
SockPair(Service* service)
: EndpointPairFixture(service, grpc_iomgr_create_endpoint_pair(
"test", Library::get().rq(), 8192)) {}
: EndpointPairFixture(service,
grpc_iomgr_create_endpoint_pair("test", NULL)) {}
};
class InProcessCHTTP2 : public EndpointPairFixture {

Loading…
Cancel
Save