Merge pull request #12219 from y-zeng/http2_connector

Add the endpoint used by the handshake process to connector's pollset_set
reviewable/pr12644/r10^2
Yuchen Zeng 7 years ago committed by GitHub
commit c01c7cbd85
  1. 4
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  2. 6
      src/core/lib/iomgr/endpoint.cc
  3. 11
      src/core/lib/iomgr/endpoint.h
  4. 21
      src/core/lib/iomgr/tcp_posix.cc
  5. 23
      src/core/lib/iomgr/tcp_uv.cc
  6. 18
      src/core/lib/iomgr/tcp_windows.cc
  7. 8
      src/core/lib/security/transport/secure_endpoint.cc
  8. 15
      test/core/util/mock_endpoint.c
  9. 15
      test/core/util/passthru_endpoint.c
  10. 21
      test/core/util/trickle_endpoint.c
  11. 18
      test/cpp/microbenchmarks/bm_chttp2_transport.cc

@ -115,6 +115,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
memset(c->result, 0, sizeof(*c->result));
} else {
grpc_endpoint_delete_from_pollset_set(exec_ctx, args->endpoint,
c->args.interested_parties);
c->result->transport =
grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1);
GPR_ASSERT(c->result->transport);
@ -136,6 +138,8 @@ static void start_handshake_locked(grpc_exec_ctx *exec_ctx,
c->handshake_mgr = grpc_handshake_manager_create();
grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, c->args.channel_args,
c->handshake_mgr);
grpc_endpoint_add_to_pollset_set(exec_ctx, c->endpoint,
c->args.interested_parties);
grpc_handshake_manager_do_handshake(
exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
c->args.deadline, NULL /* acceptor */, on_handshake_done, c);

@ -39,6 +39,12 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx,
ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set);
}
void grpc_endpoint_delete_from_pollset_set(grpc_exec_ctx* exec_ctx,
grpc_endpoint* ep,
grpc_pollset_set* pollset_set) {
ep->vtable->delete_from_pollset_set(exec_ctx, ep, pollset_set);
}
void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
grpc_error* why) {
ep->vtable->shutdown(exec_ctx, ep, why);

@ -45,6 +45,8 @@ struct grpc_endpoint_vtable {
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset);
void (*delete_from_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep);
@ -85,14 +87,19 @@ void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why);
void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
/* Add an endpoint to a pollset, so that when the pollset is polled, events from
this endpoint are considered */
/* Add an endpoint to a pollset or pollset_set, so that when the pollset is
polled, events from this endpoint are considered */
void grpc_endpoint_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset *pollset);
void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset_set);
/* Delete an endpoint from a pollset_set */
void grpc_endpoint_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset_set);
grpc_resource_user *grpc_endpoint_get_resource_user(grpc_endpoint *endpoint);
struct grpc_endpoint {

@ -704,6 +704,13 @@ static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
}
static void tcp_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset_set) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_set_del_fd(exec_ctx, pollset_set, tcp->em_fd);
}
static char *tcp_get_peer(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return gpr_strdup(tcp->peer_string);
@ -719,10 +726,16 @@ static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) {
return tcp->resource_user;
}
static const grpc_endpoint_vtable vtable = {
tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
tcp_shutdown, tcp_destroy, tcp_get_resource_user, tcp_get_peer,
tcp_get_fd};
static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_write,
tcp_add_to_pollset,
tcp_add_to_pollset_set,
tcp_delete_from_pollset_set,
tcp_shutdown,
tcp_destroy,
tcp_get_resource_user,
tcp_get_peer,
tcp_get_fd};
#define MAX_CHUNK_SIZE 32 * 1024 * 1024

@ -304,6 +304,15 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
(void)pollset;
}
static void uv_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset) {
// No-op. We're ignoring pollsets currently
(void)exec_ctx;
(void)ep;
(void)pollset;
}
static void shutdown_callback(uv_shutdown_t *req, int status) {}
static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@ -340,10 +349,16 @@ static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) {
static int uv_get_fd(grpc_endpoint *ep) { return -1; }
static grpc_endpoint_vtable vtable = {
uv_endpoint_read, uv_endpoint_write, uv_add_to_pollset,
uv_add_to_pollset_set, uv_endpoint_shutdown, uv_destroy,
uv_get_resource_user, uv_get_peer, uv_get_fd};
static grpc_endpoint_vtable vtable = {uv_endpoint_read,
uv_endpoint_write,
uv_add_to_pollset,
uv_add_to_pollset_set,
uv_delete_from_pollset_set,
uv_endpoint_shutdown,
uv_destroy,
uv_get_resource_user,
uv_get_peer,
uv_get_fd};
grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
grpc_resource_quota *resource_quota,

@ -371,6 +371,10 @@ static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_iocp_add_socket(tcp->socket);
}
static void win_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pss) {}
/* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks
for the potential read and write operations. It is up to the caller to
guarantee this isn't called in parallel to a read or write request, so
@ -412,10 +416,16 @@ static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
static int win_get_fd(grpc_endpoint *ep) { return -1; }
static grpc_endpoint_vtable vtable = {
win_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
win_shutdown, win_destroy, win_get_resource_user, win_get_peer,
win_get_fd};
static grpc_endpoint_vtable vtable = {win_read,
win_write,
win_add_to_pollset,
win_add_to_pollset_set,
win_delete_from_pollset_set,
win_shutdown,
win_destroy,
win_get_resource_user,
win_get_peer,
win_get_fd};
grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
grpc_channel_args *channel_args,

@ -379,6 +379,13 @@ static void endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint_add_to_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set);
}
static void endpoint_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *secure_ep,
grpc_pollset_set *pollset_set) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
grpc_endpoint_delete_from_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set);
}
static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
return grpc_endpoint_get_peer(ep->wrapped_ep);
@ -399,6 +406,7 @@ static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_write,
endpoint_add_to_pollset,
endpoint_add_to_pollset_set,
endpoint_delete_from_pollset_set,
endpoint_shutdown,
endpoint_destroy,
endpoint_get_resource_user,

@ -69,6 +69,10 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
@ -103,8 +107,15 @@ static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) {
static int me_get_fd(grpc_endpoint *ep) { return -1; }
static const grpc_endpoint_vtable vtable = {
me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
me_shutdown, me_destroy, me_get_resource_user, me_get_peer,
me_read,
me_write,
me_add_to_pollset,
me_add_to_pollset_set,
me_delete_from_pollset_set,
me_shutdown,
me_destroy,
me_get_resource_user,
me_get_peer,
me_get_fd,
};

@ -107,6 +107,10 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
half *m = (half *)ep;
@ -160,8 +164,15 @@ static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) {
}
static const grpc_endpoint_vtable vtable = {
me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
me_shutdown, me_destroy, me_get_resource_user, me_get_peer,
me_read,
me_write,
me_add_to_pollset,
me_add_to_pollset_set,
me_delete_from_pollset_set,
me_shutdown,
me_destroy,
me_get_resource_user,
me_get_peer,
me_get_fd,
};

@ -89,6 +89,13 @@ static void te_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_endpoint_add_to_pollset_set(exec_ctx, te->wrapped, pollset_set);
}
static void te_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset_set) {
trickle_endpoint *te = (trickle_endpoint *)ep;
grpc_endpoint_delete_from_pollset_set(exec_ctx, te->wrapped, pollset_set);
}
static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
trickle_endpoint *te = (trickle_endpoint *)ep;
@ -135,10 +142,16 @@ static void te_finish_write(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&te->mu);
}
static const grpc_endpoint_vtable vtable = {
te_read, te_write, te_add_to_pollset, te_add_to_pollset_set,
te_shutdown, te_destroy, te_get_resource_user, te_get_peer,
te_get_fd};
static const grpc_endpoint_vtable vtable = {te_read,
te_write,
te_add_to_pollset,
te_add_to_pollset_set,
te_delete_from_pollset_set,
te_shutdown,
te_destroy,
te_get_resource_user,
te_get_peer,
te_get_fd};
grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
double bytes_per_second) {

@ -44,10 +44,16 @@ auto &force_library_initialization = Library::get();
class DummyEndpoint : public grpc_endpoint {
public:
DummyEndpoint() {
static const grpc_endpoint_vtable my_vtable = {
read, write, add_to_pollset, add_to_pollset_set,
shutdown, destroy, get_resource_user, get_peer,
get_fd};
static const grpc_endpoint_vtable my_vtable = {read,
write,
add_to_pollset,
add_to_pollset_set,
delete_from_pollset_set,
shutdown,
destroy,
get_resource_user,
get_peer,
get_fd};
grpc_endpoint::vtable = &my_vtable;
ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint");
}
@ -102,6 +108,10 @@ class DummyEndpoint : public grpc_endpoint {
static void add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_resource_user_shutdown(exec_ctx,

Loading…
Cancel
Save