Buffer pool integration progress

branch: reviewable/pr8239/r2
Craig Tiller, 9 years ago
parent 9cf0cec74c
commit e34c285ac9
13 changed files:
  1. src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c (2 lines changed)
  2. src/core/lib/http/httpcli.c (20 lines changed)
  3. src/core/lib/http/httpcli.h (2 lines changed)
  4. src/core/lib/iomgr/endpoint.h (4 lines changed)
  5. src/core/lib/iomgr/endpoint_pair.h (4 lines changed)
  6. src/core/lib/iomgr/endpoint_pair_posix.c (12 lines changed)
  7. src/core/lib/iomgr/tcp_client.h (1 line changed)
  8. src/core/lib/iomgr/tcp_client_posix.c (20 lines changed)
  9. src/core/lib/iomgr/tcp_posix.c (13 lines changed)
  10. src/core/lib/iomgr/tcp_posix.h (4 lines changed)
  11. src/core/lib/iomgr/tcp_server.h (3 lines changed)
  12. src/core/lib/iomgr/tcp_server_posix.c (23 lines changed)
  13. src/core/lib/security/credentials/google_default/google_default_credentials.c (4 lines changed)

@@ -256,7 +256,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
   state = gpr_malloc(sizeof(*state));
   memset(state, 0, sizeof(*state));
   grpc_closure_init(&state->destroy_closure, destroy_done, state);
-  err = grpc_tcp_server_create(&state->destroy_closure,
+  err = grpc_tcp_server_create(&exec_ctx, &state->destroy_closure,
                                grpc_server_get_channel_args(server), &tcp);
   if (err != GRPC_ERROR_NONE) {
     goto error;

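The hunk above threads a grpc_exec_ctx into grpc_tcp_server_create so the constructor can drop buffer-pool references on its error paths (see the tcp_server_posix.c hunks below). A minimal call-site sketch, assuming the surrounding state from server_secure_chttp2.c and that the caller owns the exec_ctx lifecycle:

    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    grpc_tcp_server *tcp = NULL;
    /* exec_ctx is now the first argument; everything else is unchanged */
    grpc_error *err = grpc_tcp_server_create(
        &exec_ctx, &state->destroy_closure,
        grpc_server_get_channel_args(server), &tcp);
    if (err != GRPC_ERROR_NONE) {
      /* clean up state and surface the error */
    }
    grpc_exec_ctx_finish(&exec_ctx);
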
@@ -71,6 +71,7 @@ typedef struct {
   grpc_closure done_write;
   grpc_closure connected;
   grpc_error *overall_error;
+  grpc_buffer_pool *buffer_pool;
 } internal_request;

 static grpc_httpcli_get_override g_get_override = NULL;
@@ -118,6 +119,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
   gpr_slice_buffer_destroy(&req->incoming);
   gpr_slice_buffer_destroy(&req->outgoing);
   GRPC_ERROR_UNREF(req->overall_error);
+  grpc_buffer_pool_internal_unref(exec_ctx, req->buffer_pool);
   gpr_free(req);
 }
@@ -224,9 +226,10 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req,
   }
   addr = &req->addresses->addrs[req->next_address++];
   grpc_closure_init(&req->connected, on_connected, req);
-  grpc_tcp_client_connect(
-      exec_ctx, &req->connected, &req->ep, req->context->pollset_set,
-      (struct sockaddr *)&addr->addr, addr->len, req->deadline);
+  grpc_tcp_client_connect(exec_ctx, &req->connected, &req->ep,
+                          req->context->pollset_set, req->buffer_pool,
+                          (struct sockaddr *)&addr->addr, addr->len,
+                          req->deadline);
 }

 static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@@ -242,6 +245,7 @@ static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
 static void internal_request_begin(grpc_exec_ctx *exec_ctx,
                                    grpc_httpcli_context *context,
                                    grpc_polling_entity *pollent,
+                                   grpc_buffer_pool *buffer_pool,
                                    const grpc_httpcli_request *request,
                                    gpr_timespec deadline, grpc_closure *on_done,
                                    grpc_httpcli_response *response,
@@ -257,6 +261,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,
   req->context = context;
   req->pollent = pollent;
   req->overall_error = GRPC_ERROR_NONE;
+  req->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool);
   grpc_closure_init(&req->on_read, on_read, req);
   grpc_closure_init(&req->done_write, done_write, req);
   gpr_slice_buffer_init(&req->incoming);
@@ -274,6 +279,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx,

 void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
                       grpc_polling_entity *pollent,
+                      grpc_buffer_pool *buffer_pool,
                       const grpc_httpcli_request *request,
                       gpr_timespec deadline, grpc_closure *on_done,
                       grpc_httpcli_response *response) {
@@ -283,14 +289,15 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
     return;
   }
   gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->http.path);
-  internal_request_begin(exec_ctx, context, pollent, request, deadline, on_done,
-                         response, name,
+  internal_request_begin(exec_ctx, context, pollent, buffer_pool, request,
+                         deadline, on_done, response, name,
                          grpc_httpcli_format_get_request(request));
   gpr_free(name);
 }

 void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
                        grpc_polling_entity *pollent,
+                       grpc_buffer_pool *buffer_pool,
                        const grpc_httpcli_request *request,
                        const char *body_bytes, size_t body_size,
                        gpr_timespec deadline, grpc_closure *on_done,
@@ -303,7 +310,8 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
   }
   gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->http.path);
   internal_request_begin(
-      exec_ctx, context, pollent, request, deadline, on_done, response, name,
+      exec_ctx, context, pollent, buffer_pool, request, deadline, on_done,
+      response, name,
       grpc_httpcli_format_post_request(request, body_bytes, body_size));
   gpr_free(name);
 }

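With these hunks, every HTTP client request is accounted to a caller-supplied buffer pool. internal_request_begin takes its own internal ref, so a caller that makes a one-off pool can drop its ref as soon as the request is issued. A sketch of the calling pattern (it mirrors the google_default_credentials.c hunk at the end of this commit; context, pollent, request, deadline, on_done, and response are assumed to be set up as before):

    grpc_buffer_pool *pool = grpc_buffer_pool_create();
    grpc_httpcli_get(&exec_ctx, &context, &pollent, pool, &request,
                     deadline, on_done, &response);
    /* the in-flight request holds its own ref, so ours can go now */
    grpc_buffer_pool_internal_unref(&exec_ctx, pool);
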
@@ -97,6 +97,7 @@ void grpc_httpcli_context_destroy(grpc_httpcli_context *context);
    supplied pointer to pass to said call) */
 void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
                       grpc_polling_entity *pollent,
+                      grpc_buffer_pool *buffer_pool,
                       const grpc_httpcli_request *request,
                       gpr_timespec deadline, grpc_closure *on_complete,
                       grpc_httpcli_response *response);
@@ -118,6 +119,7 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
    Does not support ?var1=val1&var2=val2 in the path. */
 void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
                        grpc_polling_entity *pollent,
+                       grpc_buffer_pool *buffer_pool,
                        const grpc_httpcli_request *request,
                        const char *body_bytes, size_t body_size,
                        gpr_timespec deadline, grpc_closure *on_complete,

@@ -37,6 +37,7 @@
 #include <grpc/support/slice.h>
 #include <grpc/support/slice_buffer.h>
 #include <grpc/support/time.h>
+#include "src/core/lib/iomgr/buffer_pool.h"
 #include "src/core/lib/iomgr/pollset.h"
 #include "src/core/lib/iomgr/pollset_set.h"
@@ -58,6 +59,7 @@ struct grpc_endpoint_vtable {
                              grpc_pollset_set *pollset);
   void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
+  grpc_buffer_user *(*get_buffer_user)(grpc_endpoint *ep);
   char *(*get_peer)(grpc_endpoint *ep);
 };
@@ -99,6 +101,8 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
                                       grpc_endpoint *ep,
                                       grpc_pollset_set *pollset_set);

+grpc_buffer_user *grpc_endpoint_get_buffer_user(grpc_endpoint *endpoint);
+
 struct grpc_endpoint {
   const grpc_endpoint_vtable *vtable;
 };

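The vtable gains a get_buffer_user slot, surfaced through grpc_endpoint_get_buffer_user. endpoint.c is not part of this commit, but by analogy with the existing accessors the implementation is presumably a one-line vtable dispatch:

    grpc_buffer_user *grpc_endpoint_get_buffer_user(grpc_endpoint *ep) {
      return ep->vtable->get_buffer_user(ep);
    }
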
@@ -41,7 +41,7 @@ typedef struct {
   grpc_endpoint *server;
 } grpc_endpoint_pair;

-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
-                                                   size_t read_slice_size);
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
+    const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size);

 #endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */

@@ -62,20 +62,20 @@ static void create_sockets(int sv[2]) {
   GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE);
 }

-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
-                                                   size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
+    const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size) {
   int sv[2];
   grpc_endpoint_pair p;
   char *final_name;
   create_sockets(sv);

   gpr_asprintf(&final_name, "%s:client", name);
-  p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size,
-                             "socketpair-server");
+  p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), buffer_pool,
+                             read_slice_size, "socketpair-server");
   gpr_free(final_name);
   gpr_asprintf(&final_name, "%s:server", name);
-  p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size,
-                             "socketpair-client");
+  p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), buffer_pool,
+                             read_slice_size, "socketpair-client");
   gpr_free(final_name);

   return p;
 }

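Anything built on socketpair endpoints (mostly tests) now has to supply a pool as well. A hypothetical fixture sketch; the name "fixture" and the 8192-byte slice size are illustrative only:

    grpc_buffer_pool *pool = grpc_buffer_pool_create();
    grpc_endpoint_pair p =
        grpc_iomgr_create_endpoint_pair("fixture", pool, 8192);
    /* ... exercise p.client / p.server ... */
    /* whether grpc_buffer_user_init takes its own pool ref is not
       visible in this commit, so keep ours until both endpoints die */
    grpc_buffer_pool_internal_unref(&exec_ctx, pool);
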
@@ -47,6 +47,7 @@
 void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect,
                              grpc_endpoint **endpoint,
                              grpc_pollset_set *interested_parties,
+                             grpc_buffer_pool *buffer_pool,
                              const struct sockaddr *addr, size_t addr_len,
                              gpr_timespec deadline);

@@ -69,6 +69,7 @@ typedef struct {
   char *addr_str;
   grpc_endpoint **ep;
   grpc_closure *closure;
+  grpc_buffer_pool *buffer_pool;
 } async_connect;

 static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) {
@@ -114,6 +115,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
   if (done) {
     gpr_mu_destroy(&ac->mu);
     gpr_free(ac->addr_str);
+    grpc_buffer_pool_internal_unref(exec_ctx, ac->buffer_pool);
    gpr_free(ac);
   }
 }
@@ -190,7 +192,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
     }
   } else {
     grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
-    *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
+    *ep = grpc_tcp_create(fd, ac->buffer_pool,
+                          GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
     fd = NULL;
     goto finish;
   }
@@ -227,6 +230,7 @@ finish:
 static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
                                     grpc_closure *closure, grpc_endpoint **ep,
                                     grpc_pollset_set *interested_parties,
+                                    grpc_buffer_pool *buffer_pool,
                                     const struct sockaddr *addr,
                                     size_t addr_len, gpr_timespec deadline) {
   int fd;
@@ -275,7 +279,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
   fdobj = grpc_fd_create(fd, name);
   if (err >= 0) {
-    *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str);
+    *ep = grpc_tcp_create(fdobj, buffer_pool, GRPC_TCP_DEFAULT_READ_SLICE_SIZE,
+                          addr_str);
     grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
     goto done;
   }
@@ -300,6 +305,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
   ac->refs = 2;
   ac->write_closure.cb = on_writable;
   ac->write_closure.cb_arg = ac;
+  ac->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool);

   if (grpc_tcp_trace) {
     gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
@@ -321,16 +327,18 @@ done:
 // overridden by api_fuzzer.c
 void (*grpc_tcp_client_connect_impl)(
     grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
-    grpc_pollset_set *interested_parties, const struct sockaddr *addr,
-    size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl;
+    grpc_pollset_set *interested_parties, grpc_buffer_pool *buffer_pool,
+    const struct sockaddr *addr, size_t addr_len,
+    gpr_timespec deadline) = tcp_client_connect_impl;

 void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_endpoint **ep,
                              grpc_pollset_set *interested_parties,
+                             grpc_buffer_pool *buffer_pool,
                              const struct sockaddr *addr, size_t addr_len,
                              gpr_timespec deadline) {
-  grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr,
-                               addr_len, deadline);
+  grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
+                               buffer_pool, addr, addr_len, deadline);
 }

 #endif

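Connectors now pass the pool down at connect time; tcp_client_connect_impl takes an internal ref that travels with the async_connect record and is released in tc_on_alarm once the connect attempt is fully done. A call-site sketch, with c standing in for whatever hypothetical struct owns the closure and endpoint pointer:

    grpc_closure_init(&c->connected, on_connected, c);
    grpc_tcp_client_connect(exec_ctx, &c->connected, &c->ep, pollset_set,
                            pool, (struct sockaddr *)&addr, addr_len,
                            deadline);
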
@@ -100,6 +100,8 @@ typedef struct {
   grpc_closure write_closure;

   char *peer_string;
+
+  grpc_buffer_user buffer_user;
 } grpc_tcp;

 static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
@@ -469,6 +471,11 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
   return grpc_fd_get_workqueue(tcp->em_fd);
 }

+static grpc_buffer_user *tcp_get_buffer_user(grpc_endpoint *ep) {
+  grpc_tcp *tcp = (grpc_tcp *)ep;
+  return &tcp->buffer_user;
+}
+
 static const grpc_endpoint_vtable vtable = {tcp_read,
                                             tcp_write,
                                             tcp_get_workqueue,
@@ -476,10 +483,11 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
                                             tcp_add_to_pollset_set,
                                             tcp_shutdown,
                                             tcp_destroy,
+                                            tcp_get_buffer_user,
                                             tcp_get_peer};

-grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
-                               const char *peer_string) {
+grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool,
+                               size_t slice_size, const char *peer_string) {
   grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
   tcp->base.vtable = &vtable;
   tcp->peer_string = gpr_strdup(peer_string);
@@ -500,6 +508,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
   tcp->write_closure.cb = tcp_handle_write;
   tcp->write_closure.cb_arg = tcp;
   gpr_slice_buffer_init(&tcp->last_read_buffer);
+  grpc_buffer_user_init(&tcp->buffer_user, buffer_pool);
   /* Tell network status tracker about new endpoint */
   grpc_network_status_register_endpoint(&tcp->base);

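Every TCP endpoint is now born attached to a pool: grpc_tcp_create initializes a per-endpoint grpc_buffer_user against the supplied grpc_buffer_pool, and the new vtable entry makes it reachable from generic endpoint code. A sketch, where fd and pool are assumed to exist and the peer string is illustrative:

    grpc_endpoint *ep = grpc_tcp_create(
        fd, pool, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "ipv4:127.0.0.1:1234");
    grpc_buffer_user *bu = grpc_endpoint_get_buffer_user(ep);
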
@@ -53,8 +53,8 @@ extern int grpc_tcp_trace;

 /* Create a tcp endpoint given a file descriptor and a read slice size.
    Takes ownership of fd. */
-grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size,
-                               const char *peer_string);
+grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_buffer_pool *buffer_pool,
+                               size_t read_slice_size, const char *peer_string);

 /* Return the tcp endpoint's fd, or -1 if this is not available. Does not
    release the fd.

@@ -60,7 +60,8 @@ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
 /* Create a server, initially not bound to any ports. The caller owns one ref.
    If shutdown_complete is not NULL, it will be used by
    grpc_tcp_server_unref() when the ref count reaches zero. */
-grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
+                                   grpc_closure *shutdown_complete,
                                    const grpc_channel_args *args,
                                    grpc_tcp_server **server);

@@ -137,6 +137,8 @@ struct grpc_tcp_server {

   /* next pollset to assign a channel to */
   gpr_atm next_pollset_to_assign;
+
+  grpc_buffer_pool *buffer_pool;
 };

 static gpr_once check_init = GPR_ONCE_INIT;
@@ -153,23 +155,37 @@ static void init(void) {
 #endif
 }

-grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
+                                   grpc_closure *shutdown_complete,
                                    const grpc_channel_args *args,
                                    grpc_tcp_server **server) {
   gpr_once_init(&check_init, init);

   grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
   s->so_reuseport = has_so_reuseport;
+  s->buffer_pool = grpc_buffer_pool_create();
   for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
     if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
       if (args->args[i].type == GRPC_ARG_INTEGER) {
         s->so_reuseport =
             has_so_reuseport && (args->args[i].value.integer != 0);
       } else {
+        grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
         gpr_free(s);
         return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
                                  " must be an integer");
       }
+    } else if (0 == strcmp(GRPC_ARG_BUFFER_POOL, args->args[i].key)) {
+      if (args->args[i].type == GRPC_ARG_POINTER) {
+        grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
+        s->buffer_pool =
+            grpc_buffer_pool_internal_ref(args->args[i].value.pointer.p);
+      } else {
+        grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
+        gpr_free(s);
+        return GRPC_ERROR_CREATE(GRPC_ARG_BUFFER_POOL
+                                 " must be a pointer to a buffer pool");
+      }
     }
   }
   gpr_ref_init(&s->refs, 1);
@@ -203,6 +219,8 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
     gpr_free(sp);
   }

+  grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
+
   gpr_free(s);
 }
@@ -419,7 +437,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
     sp->server->on_accept_cb(
         exec_ctx, sp->server->on_accept_cb_arg,
-        grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
+        grpc_tcp_create(fdobj, sp->server->buffer_pool,
+                        GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
         read_notifier_pollset, &acceptor);

     gpr_free(name);

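The listener now owns a pool for the endpoints it accepts: if the channel args carry GRPC_ARG_BUFFER_POOL the server adopts that pool (taking its own internal ref), otherwise it falls back to a fresh grpc_buffer_pool_create(). Supplying one through channel args would look roughly like this sketch, where pool is assumed to exist and the pointer vtable for copy/destroy is elided:

    grpc_arg arg;
    arg.type = GRPC_ARG_POINTER;
    arg.key = GRPC_ARG_BUFFER_POOL;
    arg.value.pointer.p = pool;
    /* arg.value.pointer.vtable (copy/destroy hooks) elided in this sketch;
       the server takes its own internal ref, so the caller keeps ownership */
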
@@ -124,11 +124,13 @@ static int is_stack_running_on_compute_engine(void) {

   grpc_httpcli_context_init(&context);

+  grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
   grpc_httpcli_get(
-      &exec_ctx, &context, &detector.pollent, &request,
+      &exec_ctx, &context, &detector.pollent, buffer_pool, &request,
       gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
       grpc_closure_create(on_compute_engine_detection_http_response, &detector),
       &detector.response);
+  grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool);

   grpc_exec_ctx_flush(&exec_ctx);
