Add grpc_endpoint_delete_from_pollset_set

reviewable/pr12219/r2
Yuchen Zeng 8 years ago
parent 0d6304a38c
commit ab74a8c86e
  1. 6
      src/core/lib/iomgr/endpoint.cc
  2. 11
      src/core/lib/iomgr/endpoint.h
  3. 21
      src/core/lib/iomgr/tcp_posix.cc
  4. 8
      src/core/lib/security/transport/secure_endpoint.cc
  5. 15
      test/core/util/mock_endpoint.c
  6. 15
      test/core/util/passthru_endpoint.c
  7. 21
      test/core/util/trickle_endpoint.c

@ -39,6 +39,12 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx,
ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set);
}
void grpc_endpoint_delete_from_pollset_set(grpc_exec_ctx* exec_ctx,
grpc_endpoint* ep,
grpc_pollset_set* pollset_set) {
ep->vtable->delete_from_pollset_set(exec_ctx, ep, pollset_set);
}
void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
grpc_error* why) {
ep->vtable->shutdown(exec_ctx, ep, why);

@ -45,6 +45,8 @@ struct grpc_endpoint_vtable {
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset);
void (*delete_from_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep);
@ -85,14 +87,19 @@ void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why);
void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
/* Add an endpoint to a pollset, so that when the pollset is polled, events from
this endpoint are considered */
/* Add an endpoint to a pollset or pollset_set, so that when the pollset is
polled, events from this endpoint are considered */
void grpc_endpoint_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset *pollset);
void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset_set);
/* Delete an endpoint from a pollset_set */
void grpc_endpoint_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset_set);
grpc_resource_user *grpc_endpoint_get_resource_user(grpc_endpoint *endpoint);
struct grpc_endpoint {

@ -706,6 +706,13 @@ static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
}
static void tcp_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset_set) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_set_del_fd(exec_ctx, pollset_set, tcp->em_fd);
}
static char *tcp_get_peer(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return gpr_strdup(tcp->peer_string);
@ -721,10 +728,16 @@ static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) {
return tcp->resource_user;
}
static const grpc_endpoint_vtable vtable = {
tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
tcp_shutdown, tcp_destroy, tcp_get_resource_user, tcp_get_peer,
tcp_get_fd};
static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_write,
tcp_add_to_pollset,
tcp_add_to_pollset_set,
tcp_delete_from_pollset_set,
tcp_shutdown,
tcp_destroy,
tcp_get_resource_user,
tcp_get_peer,
tcp_get_fd};
#define MAX_CHUNK_SIZE 32 * 1024 * 1024

@ -379,6 +379,13 @@ static void endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint_add_to_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set);
}
static void endpoint_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *secure_ep,
grpc_pollset_set *pollset_set) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
grpc_endpoint_delete_from_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set);
}
static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
return grpc_endpoint_get_peer(ep->wrapped_ep);
@ -399,6 +406,7 @@ static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_write,
endpoint_add_to_pollset,
endpoint_add_to_pollset_set,
endpoint_delete_from_pollset_set,
endpoint_shutdown,
endpoint_destroy,
endpoint_get_resource_user,

@ -69,6 +69,10 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
@ -103,8 +107,15 @@ static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) {
static int me_get_fd(grpc_endpoint *ep) { return -1; }
static const grpc_endpoint_vtable vtable = {
me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
me_shutdown, me_destroy, me_get_resource_user, me_get_peer,
me_read,
me_write,
me_add_to_pollset,
me_add_to_pollset_set,
me_delete_from_pollset_set,
me_shutdown,
me_destroy,
me_get_resource_user,
me_get_peer,
me_get_fd,
};

@ -107,6 +107,10 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
half *m = (half *)ep;
@ -160,8 +164,15 @@ static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) {
}
static const grpc_endpoint_vtable vtable = {
me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
me_shutdown, me_destroy, me_get_resource_user, me_get_peer,
me_read,
me_write,
me_add_to_pollset,
me_add_to_pollset_set,
me_delete_from_pollset_set,
me_shutdown,
me_destroy,
me_get_resource_user,
me_get_peer,
me_get_fd,
};

@ -89,6 +89,13 @@ static void te_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_endpoint_add_to_pollset_set(exec_ctx, te->wrapped, pollset_set);
}
static void te_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset_set) {
trickle_endpoint *te = (trickle_endpoint *)ep;
grpc_endpoint_delete_from_pollset_set(exec_ctx, te->wrapped, pollset_set);
}
static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
trickle_endpoint *te = (trickle_endpoint *)ep;
@ -135,10 +142,16 @@ static void te_finish_write(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&te->mu);
}
static const grpc_endpoint_vtable vtable = {
te_read, te_write, te_add_to_pollset, te_add_to_pollset_set,
te_shutdown, te_destroy, te_get_resource_user, te_get_peer,
te_get_fd};
static const grpc_endpoint_vtable vtable = {te_read,
te_write,
te_add_to_pollset,
te_add_to_pollset_set,
te_delete_from_pollset_set,
te_shutdown,
te_destroy,
te_get_resource_user,
te_get_peer,
te_get_fd};
grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
double bytes_per_second) {

Loading…
Cancel
Save