diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index da3e284fcf2..947e68f3f2f 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -256,7 +256,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, state = gpr_malloc(sizeof(*state)); memset(state, 0, sizeof(*state)); grpc_closure_init(&state->destroy_closure, destroy_done, state); - err = grpc_tcp_server_create(&state->destroy_closure, + err = grpc_tcp_server_create(&exec_ctx, &state->destroy_closure, grpc_server_get_channel_args(server), &tcp); if (err != GRPC_ERROR_NONE) { goto error; diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 18135bcb58e..593da734f29 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -71,6 +71,7 @@ typedef struct { grpc_closure done_write; grpc_closure connected; grpc_error *overall_error; + grpc_buffer_pool *buffer_pool; } internal_request; static grpc_httpcli_get_override g_get_override = NULL; @@ -118,6 +119,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, gpr_slice_buffer_destroy(&req->incoming); gpr_slice_buffer_destroy(&req->outgoing); GRPC_ERROR_UNREF(req->overall_error); + grpc_buffer_pool_internal_unref(exec_ctx, req->buffer_pool); gpr_free(req); } @@ -224,9 +226,10 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, } addr = &req->addresses->addrs[req->next_address++]; grpc_closure_init(&req->connected, on_connected, req); - grpc_tcp_client_connect( - exec_ctx, &req->connected, &req->ep, req->context->pollset_set, - (struct sockaddr *)&addr->addr, addr->len, req->deadline); + grpc_tcp_client_connect(exec_ctx, &req->connected, &req->ep, + req->context->pollset_set, req->buffer_pool, + (struct sockaddr *)&addr->addr, addr->len, + req->deadline); } static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -242,6 +245,7 @@ static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void internal_request_begin(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, + grpc_buffer_pool *buffer_pool, const grpc_httpcli_request *request, gpr_timespec deadline, grpc_closure *on_done, grpc_httpcli_response *response, @@ -257,6 +261,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, req->context = context; req->pollent = pollent; req->overall_error = GRPC_ERROR_NONE; + req->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); grpc_closure_init(&req->on_read, on_read, req); grpc_closure_init(&req->done_write, done_write, req); gpr_slice_buffer_init(&req->incoming); @@ -274,6 +279,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, + grpc_buffer_pool *buffer_pool, const grpc_httpcli_request *request, gpr_timespec deadline, grpc_closure *on_done, grpc_httpcli_response *response) { @@ -283,14 +289,15 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, return; } gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->http.path); - internal_request_begin(exec_ctx, context, pollent, request, deadline, on_done, - response, name, + internal_request_begin(exec_ctx, context, pollent, buffer_pool, request, + deadline, on_done, response, name, grpc_httpcli_format_get_request(request)); gpr_free(name); } void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, + grpc_buffer_pool *buffer_pool, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, gpr_timespec deadline, grpc_closure *on_done, @@ -303,7 +310,8 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, } gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->http.path); internal_request_begin( - exec_ctx, context, pollent, request, deadline, on_done, response, name, + exec_ctx, context, pollent, buffer_pool, request, deadline, on_done, + response, name, grpc_httpcli_format_post_request(request, body_bytes, body_size)); gpr_free(name); } diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h index 662e176f4cc..7c7b20433e7 100644 --- a/src/core/lib/http/httpcli.h +++ b/src/core/lib/http/httpcli.h @@ -97,6 +97,7 @@ void grpc_httpcli_context_destroy(grpc_httpcli_context *context); supplied pointer to pass to said call) */ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, + grpc_buffer_pool *buffer_pool, const grpc_httpcli_request *request, gpr_timespec deadline, grpc_closure *on_complete, grpc_httpcli_response *response); @@ -118,6 +119,7 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, Does not support ?var1=val1&var2=val2 in the path. */ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, grpc_polling_entity *pollent, + grpc_buffer_pool *buffer_pool, const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, gpr_timespec deadline, grpc_closure *on_complete, diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 894efc0b238..453bfd1d560 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -37,6 +37,7 @@ #include #include #include +#include "src/core/lib/iomgr/buffer_pool.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" @@ -58,6 +59,7 @@ struct grpc_endpoint_vtable { grpc_pollset_set *pollset); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); + grpc_buffer_user *(*get_buffer_user)(grpc_endpoint *ep); char *(*get_peer)(grpc_endpoint *ep); }; @@ -99,6 +101,8 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset_set); +grpc_buffer_user *grpc_endpoint_get_buffer_user(grpc_endpoint *endpoint); + struct grpc_endpoint { const grpc_endpoint_vtable *vtable; }; diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h index 5cd78051bdf..4938cf85993 100644 --- a/src/core/lib/iomgr/endpoint_pair.h +++ b/src/core/lib/iomgr/endpoint_pair.h @@ -41,7 +41,7 @@ typedef struct { grpc_endpoint *server; } grpc_endpoint_pair; -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size); +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( + const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size); #endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c index e295fb4867a..64c161675f3 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.c +++ b/src/core/lib/iomgr/endpoint_pair_posix.c @@ -62,20 +62,20 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE); } -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( + const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size) { int sv[2]; grpc_endpoint_pair p; char *final_name; create_sockets(sv); gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size, - "socketpair-server"); + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), buffer_pool, + read_slice_size, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size, - "socketpair-client"); + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), buffer_pool, + read_slice_size, "socketpair-client"); gpr_free(final_name); return p; } diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index a07e0b9f0cb..04e4108b35f 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -47,6 +47,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, + grpc_buffer_pool *buffer_pool, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline); diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 80c7a3f1287..42ceb339339 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -69,6 +69,7 @@ typedef struct { char *addr_str; grpc_endpoint **ep; grpc_closure *closure; + grpc_buffer_pool *buffer_pool; } async_connect; static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) { @@ -114,6 +115,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); + grpc_buffer_pool_internal_unref(exec_ctx, ac->buffer_pool); gpr_free(ac); } } @@ -190,7 +192,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { } } else { grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd); - *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); + *ep = grpc_tcp_create(fd, ac->buffer_pool, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); fd = NULL; goto finish; } @@ -227,6 +230,7 @@ finish: static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, + grpc_buffer_pool *buffer_pool, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { int fd; @@ -275,7 +279,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, fdobj = grpc_fd_create(fd, name); if (err >= 0) { - *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); + *ep = grpc_tcp_create(fdobj, buffer_pool, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, + addr_str); grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); goto done; } @@ -300,6 +305,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, ac->refs = 2; ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; + ac->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); if (grpc_tcp_trace) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", @@ -321,16 +327,18 @@ done: // overridden by api_fuzzer.c void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, - grpc_pollset_set *interested_parties, const struct sockaddr *addr, - size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl; + grpc_pollset_set *interested_parties, grpc_buffer_pool *buffer_pool, + const struct sockaddr *addr, size_t addr_len, + gpr_timespec deadline) = tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, + grpc_buffer_pool *buffer_pool, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { - grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr, - addr_len, deadline); + grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, + buffer_pool, addr, addr_len, deadline); } #endif diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 00fd77679ad..fd8fcb05c3c 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -100,6 +100,8 @@ typedef struct { grpc_closure write_closure; char *peer_string; + + grpc_buffer_user buffer_user; } grpc_tcp; static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, @@ -469,6 +471,11 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { return grpc_fd_get_workqueue(tcp->em_fd); } +static grpc_buffer_user *tcp_get_buffer_user(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return &tcp->buffer_user; +} + static const grpc_endpoint_vtable vtable = {tcp_read, tcp_write, tcp_get_workqueue, @@ -476,10 +483,11 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_add_to_pollset_set, tcp_shutdown, tcp_destroy, + tcp_get_buffer_user, tcp_get_peer}; -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, - const char *peer_string) { +grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool, + size_t slice_size, const char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; tcp->peer_string = gpr_strdup(peer_string); @@ -500,6 +508,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; gpr_slice_buffer_init(&tcp->last_read_buffer); + grpc_buffer_user_init(&tcp->buffer_user, buffer_pool); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index 99125836d69..768355cf0c6 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -53,8 +53,8 @@ extern int grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ -grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size, - const char *peer_string); +grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_buffer_pool *buffer_pool, + size_t read_slice_size, const char *peer_string); /* Return the tcp endpoint's fd, or -1 if this is not available. Does not release the fd. diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 5a25d39a0c4..409d1a8e96f 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -60,7 +60,8 @@ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, /* Create a server, initially not bound to any ports. The caller owns one ref. If shutdown_complete is not NULL, it will be used by grpc_tcp_server_unref() when the ref count reaches zero. */ -grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, +grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, + grpc_closure *shutdown_complete, const grpc_channel_args *args, grpc_tcp_server **server); diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 2d3f6cf9a7d..33041523859 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -137,6 +137,8 @@ struct grpc_tcp_server { /* next pollset to assign a channel to */ gpr_atm next_pollset_to_assign; + + grpc_buffer_pool *buffer_pool; }; static gpr_once check_init = GPR_ONCE_INIT; @@ -153,23 +155,37 @@ static void init(void) { #endif } -grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, +grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, + grpc_closure *shutdown_complete, const grpc_channel_args *args, grpc_tcp_server **server) { gpr_once_init(&check_init, init); grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; + s->buffer_pool = grpc_buffer_pool_create(); for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { s->so_reuseport = has_so_reuseport && (args->args[i].value.integer != 0); } else { + grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); gpr_free(s); return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT " must be an integer"); } + } else if (0 == strcmp(GRPC_ARG_BUFFER_POOL, args->args[i].key)) { + if (args->args[i].type == GRPC_ARG_POINTER) { + grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); + s->buffer_pool = + grpc_buffer_pool_internal_ref(args->args[i].value.pointer.p); + } else { + grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); + gpr_free(s); + return GRPC_ERROR_CREATE(GRPC_ARG_BUFFER_POOL + " must be a pointer to a buffer pool"); + } } } gpr_ref_init(&s->refs, 1); @@ -203,6 +219,8 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(sp); } + grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); + gpr_free(s); } @@ -419,7 +437,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), + grpc_tcp_create(fdobj, sp->server->buffer_pool, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), read_notifier_pollset, &acceptor); gpr_free(name); diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index 312a3d4f90b..3bcde3da8b1 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -124,11 +124,13 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_init(&context); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); grpc_httpcli_get( - &exec_ctx, &context, &detector.pollent, &request, + &exec_ctx, &context, &detector.pollent, buffer_pool, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), grpc_closure_create(on_compute_engine_detection_http_response, &detector), &detector.response); + grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); grpc_exec_ctx_flush(&exec_ctx);