Merge pull request #8588 from murgatroid99/uv_resource_quota

Add resource quota support to uv TCP code
pull/8611/head
Michael Lumish 8 years ago committed by GitHub
commit 11948f7441
  1. 7
      src/core/lib/iomgr/resource_quota.c
  2. 5
      src/core/lib/iomgr/resource_quota.h
  3. 36
      src/core/lib/iomgr/tcp_client_uv.c
  4. 24
      src/core/lib/iomgr/tcp_server_uv.c
  5. 71
      src/core/lib/iomgr/tcp_uv.c
  6. 4
      src/core/lib/iomgr/tcp_uv.h

@ -715,3 +715,10 @@ void grpc_resource_user_alloc_slices(
grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user, grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user,
count * length, &slice_allocator->on_allocated); count * length, &slice_allocator->on_allocated);
} }
/* Synchronously allocate one slice of \a size bytes charged against
   \a resource_user's quota.
   NOTE(review): the NULL closure means the quota charge is fire-and-forget --
   the slice is created immediately via ru_slice_create even if the quota is
   exhausted; presumably acceptable for the uv read path -- confirm. */
gpr_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user,
size_t size) {
/* Record the allocation against the quota (no completion callback). */
grpc_resource_user_alloc(exec_ctx, resource_user, size, NULL);
/* Create and return the slice; it holds a reference on resource_user. */
return ru_slice_create(resource_user, size);
}

@ -221,4 +221,9 @@ void grpc_resource_user_alloc_slices(
grpc_resource_user_slice_allocator *slice_allocator, size_t length, grpc_resource_user_slice_allocator *slice_allocator, size_t length,
size_t count, gpr_slice_buffer *dest); size_t count, gpr_slice_buffer *dest);
/* Allocate one slice of length \a size synchronously. */
gpr_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user,
size_t size);
#endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */ #endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */

@ -54,9 +54,12 @@ typedef struct grpc_uv_tcp_connect {
grpc_endpoint **endpoint; grpc_endpoint **endpoint;
int refs; int refs;
char *addr_name; char *addr_name;
grpc_resource_quota *resource_quota;
} grpc_uv_tcp_connect; } grpc_uv_tcp_connect;
static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect *connect) { static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx,
grpc_uv_tcp_connect *connect) {
grpc_resource_quota_internal_unref(exec_ctx, connect->resource_quota);
gpr_free(connect); gpr_free(connect);
} }
@ -74,7 +77,7 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp,
} }
done = (--connect->refs == 0); done = (--connect->refs == 0);
if (done) { if (done) {
uv_tcp_connect_cleanup(connect); uv_tcp_connect_cleanup(exec_ctx, connect);
} }
} }
@ -86,8 +89,8 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
grpc_closure *closure = connect->closure; grpc_closure *closure = connect->closure;
grpc_timer_cancel(&exec_ctx, &connect->alarm); grpc_timer_cancel(&exec_ctx, &connect->alarm);
if (status == 0) { if (status == 0) {
*connect->endpoint = *connect->endpoint = grpc_tcp_create(
grpc_tcp_create(connect->tcp_handle, connect->addr_name); connect->tcp_handle, connect->resource_quota, connect->addr_name);
} else { } else {
error = GRPC_ERROR_CREATE("Failed to connect to remote host"); error = GRPC_ERROR_CREATE("Failed to connect to remote host");
error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status); error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status);
@ -105,7 +108,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
} }
done = (--connect->refs == 0); done = (--connect->refs == 0);
if (done) { if (done) {
uv_tcp_connect_cleanup(connect); uv_tcp_connect_cleanup(&exec_ctx, connect);
} }
grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL); grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
@ -114,16 +117,31 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
grpc_closure *closure, grpc_endpoint **ep, grpc_closure *closure, grpc_endpoint **ep,
grpc_pollset_set *interested_parties, grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
const grpc_resolved_address *resolved_addr, const grpc_resolved_address *resolved_addr,
gpr_timespec deadline) { gpr_timespec deadline) {
grpc_uv_tcp_connect *connect; grpc_uv_tcp_connect *connect;
grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
(void)channel_args;
(void)interested_parties; (void)interested_parties;
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_internal_ref(
channel_args->args[i].value.pointer.p);
}
}
}
connect = gpr_malloc(sizeof(grpc_uv_tcp_connect)); connect = gpr_malloc(sizeof(grpc_uv_tcp_connect));
memset(connect, 0, sizeof(grpc_uv_tcp_connect)); memset(connect, 0, sizeof(grpc_uv_tcp_connect));
connect->closure = closure; connect->closure = closure;
connect->endpoint = ep; connect->endpoint = ep;
connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t)); connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t));
connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
connect->resource_quota = resource_quota;
uv_tcp_init(uv_default_loop(), connect->tcp_handle); uv_tcp_init(uv_default_loop(), connect->tcp_handle);
connect->connect_req.data = connect; connect->connect_req.data = connect;
// TODO(murgatroid99): figure out what the return value here means // TODO(murgatroid99): figure out what the return value here means
@ -138,16 +156,18 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
// overridden by api_fuzzer.c // overridden by api_fuzzer.c
void (*grpc_tcp_client_connect_impl)( void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
grpc_pollset_set *interested_parties, const grpc_resolved_address *addr, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
gpr_timespec deadline) = tcp_client_connect_impl; gpr_timespec deadline) = tcp_client_connect_impl;
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_endpoint **ep, grpc_endpoint **ep,
grpc_pollset_set *interested_parties, grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
const grpc_resolved_address *addr, const grpc_resolved_address *addr,
gpr_timespec deadline) { gpr_timespec deadline) {
grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr, grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
deadline); channel_args, addr, deadline);
} }
#endif /* GRPC_UV */ #endif /* GRPC_UV */

@ -76,13 +76,30 @@ struct grpc_tcp_server {
/* shutdown callback */ /* shutdown callback */
grpc_closure *shutdown_complete; grpc_closure *shutdown_complete;
grpc_resource_quota *resource_quota;
}; };
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_closure *shutdown_complete,
const grpc_channel_args *args, const grpc_channel_args *args,
grpc_tcp_server **server) { grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
(void)args; s->resource_quota = grpc_resource_quota_create(NULL);
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
s->resource_quota =
grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
} else {
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
}
}
}
gpr_ref_init(&s->refs, 1); gpr_ref_init(&s->refs, 1);
s->on_accept_cb = NULL; s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL; s->on_accept_cb_arg = NULL;
@ -119,6 +136,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(sp->handle); gpr_free(sp->handle);
gpr_free(sp); gpr_free(sp);
} }
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s); gpr_free(s);
} }
@ -201,7 +219,7 @@ static void on_connect(uv_stream_t *server, int status) {
} else { } else {
gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
} }
ep = grpc_tcp_create(client, peer_name_string); ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
&acceptor); &acceptor);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);

@ -54,6 +54,9 @@ typedef struct {
grpc_endpoint base; grpc_endpoint base;
gpr_refcount refcount; gpr_refcount refcount;
uv_write_t write_req;
uv_shutdown_t shutdown_req;
uv_tcp_t *handle; uv_tcp_t *handle;
grpc_closure *read_cb; grpc_closure *read_cb;
@ -64,14 +67,23 @@ typedef struct {
gpr_slice_buffer *write_slices; gpr_slice_buffer *write_slices;
uv_buf_t *write_buffers; uv_buf_t *write_buffers;
grpc_resource_user resource_user;
bool shutting_down; bool shutting_down;
bool resource_user_shutting_down;
char *peer_string; char *peer_string;
grpc_pollset *pollset; grpc_pollset *pollset;
} grpc_tcp; } grpc_tcp;
static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); } static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); }
/* Final destruction of a grpc_tcp endpoint (refcount reached zero).
   Tears down the embedded resource user -- which requires an exec_ctx of its
   own because tcp_free can be reached from a libuv callback -- then frees the
   struct.  (Reconstructed from fused side-by-side diff columns: the old
   one-line body `{ gpr_free(tcp); }` was replaced by this version.) */
static void tcp_free(grpc_tcp *tcp) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user);
  /* NOTE(review): tcp is freed before the exec_ctx is flushed; safe only if
     resource-user teardown closures do not touch tcp -- confirm. */
  gpr_free(tcp);
  grpc_exec_ctx_finish(&exec_ctx);
}
/*#define GRPC_TCP_REFCOUNT_DEBUG*/ /*#define GRPC_TCP_REFCOUNT_DEBUG*/
#ifdef GRPC_TCP_REFCOUNT_DEBUG #ifdef GRPC_TCP_REFCOUNT_DEBUG
@ -106,11 +118,14 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
uv_buf_t *buf) { uv_buf_t *buf) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp *tcp = handle->data; grpc_tcp *tcp = handle->data;
(void)suggested_size; (void)suggested_size;
tcp->read_slice = gpr_slice_malloc(GRPC_TCP_DEFAULT_READ_SLICE_SIZE); tcp->read_slice = grpc_resource_user_slice_malloc(
&exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice); buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
buf->len = GPR_SLICE_LENGTH(tcp->read_slice); buf->len = GPR_SLICE_LENGTH(tcp->read_slice);
grpc_exec_ctx_finish(&exec_ctx);
} }
static void read_callback(uv_stream_t *stream, ssize_t nread, static void read_callback(uv_stream_t *stream, ssize_t nread,
@ -198,7 +213,8 @@ static void write_callback(uv_write_t *req, int status) {
gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
} }
gpr_free(tcp->write_buffers); gpr_free(tcp->write_buffers);
gpr_free(req); grpc_resource_user_free(&exec_ctx, &tcp->resource_user,
sizeof(uv_buf_t) * tcp->write_slices->count);
grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL); grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
@ -243,12 +259,15 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->write_cb = cb; tcp->write_cb = cb;
buffer_count = (unsigned int)tcp->write_slices->count; buffer_count = (unsigned int)tcp->write_slices->count;
buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count); buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count);
grpc_resource_user_alloc(exec_ctx, &tcp->resource_user,
sizeof(uv_buf_t) * buffer_count, NULL);
for (i = 0; i < buffer_count; i++) { for (i = 0; i < buffer_count; i++) {
slice = &tcp->write_slices->slices[i]; slice = &tcp->write_slices->slices[i];
buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice); buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice);
buffers[i].len = GPR_SLICE_LENGTH(*slice); buffers[i].len = GPR_SLICE_LENGTH(*slice);
} }
write_req = gpr_malloc(sizeof(uv_write_t)); tcp->write_buffers = buffers;
write_req = &tcp->write_req;
write_req->data = tcp; write_req->data = tcp;
TCP_REF(tcp, "write"); TCP_REF(tcp, "write");
// TODO(murgatroid99): figure out what the return value here means // TODO(murgatroid99): figure out what the return value here means
@ -274,13 +293,29 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
(void)pollset; (void)pollset;
} }
static void shutdown_callback(uv_shutdown_t *req, int status) { gpr_free(req); } static void shutdown_callback(uv_shutdown_t *req, int status) {}
/* Closure run when the endpoint's resource user has finished shutting down.
   \a arg is the grpc_tcp*; releases the "resource_user" ref taken in
   uv_resource_user_maybe_shutdown, which may trigger tcp_free. */
static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
TCP_UNREF(arg, "resource_user");
}
/* Begin resource-user shutdown exactly once per endpoint.
   The resource_user_shutting_down flag makes this idempotent; a "resource_user"
   ref keeps \a tcp alive until resource_user_shutdown_done runs.
   NOTE(review): no lock guards the flag -- presumably only ever called from
   the single libuv event-loop thread; confirm. */
static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx,
grpc_tcp *tcp) {
if (!tcp->resource_user_shutting_down) {
tcp->resource_user_shutting_down = true;
TCP_REF(tcp, "resource_user");
grpc_resource_user_shutdown(
exec_ctx, &tcp->resource_user,
grpc_closure_create(resource_user_shutdown_done, tcp));
}
}
/* grpc_endpoint vtable hook: half-close the write side of the connection.
   Idempotent via the shutting_down flag.  Uses the uv_shutdown_t embedded in
   grpc_tcp (shutdown_req), so shutdown_callback has nothing to free.
   (Reconstructed from fused side-by-side diff columns: the old version
   heap-allocated the request.) */
static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
  grpc_tcp *tcp = (grpc_tcp *)ep;
  if (!tcp->shutting_down) {
    tcp->shutting_down = true;
    uv_shutdown_t *req = &tcp->shutdown_req;
    /* NOTE(review): uv_shutdown's return value is ignored here, matching the
       surrounding file's TODO about unchecked libuv return codes. */
    uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
  }
}
@ -289,6 +324,7 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep); grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
uv_close((uv_handle_t *)tcp->handle, uv_close_callback); uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
uv_resource_user_maybe_shutdown(exec_ctx, tcp);
TCP_UNREF(tcp, "destroy"); TCP_UNREF(tcp, "destroy");
} }
@ -297,18 +333,21 @@ static char *uv_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string); return gpr_strdup(tcp->peer_string);
} }
/* grpc_endpoint vtable hook: expose the endpoint's embedded resource user so
   callers can charge allocations against this connection's quota. */
static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return &tcp->resource_user;
}
static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; } static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
static grpc_endpoint_vtable vtable = {uv_endpoint_read, static grpc_endpoint_vtable vtable = {
uv_endpoint_write, uv_endpoint_read, uv_endpoint_write, uv_get_workqueue,
uv_get_workqueue, uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown,
uv_add_to_pollset, uv_destroy, uv_get_resource_user, uv_get_peer};
uv_add_to_pollset_set,
uv_endpoint_shutdown,
uv_destroy,
uv_get_peer};
grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) { grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
grpc_resource_quota *resource_quota,
char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
if (grpc_tcp_trace) { if (grpc_tcp_trace) {
@ -325,6 +364,8 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
gpr_ref_init(&tcp->refcount, 1); gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string); tcp->peer_string = gpr_strdup(peer_string);
tcp->shutting_down = false; tcp->shutting_down = false;
tcp->resource_user_shutting_down = false;
grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */ /* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base); grpc_network_status_register_endpoint(&tcp->base);

@ -52,6 +52,8 @@ extern int grpc_tcp_trace;
#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string); grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
grpc_resource_quota *resource_quota,
char *peer_string);
#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */ #endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */

Loading…
Cancel
Save