Allow returning a workqueue somehow associated with an endpoint

pull/7184/head
Craig Tiller 9 years ago
parent a5596db1a5
commit 70bd4839bc
  1. 4
      src/core/lib/iomgr/endpoint.c
  2. 4
      src/core/lib/iomgr/endpoint.h
  3. 3
      src/core/lib/iomgr/ev_epoll_linux.c
  4. 3
      src/core/lib/iomgr/ev_poll_and_epoll_posix.c
  5. 3
      src/core/lib/iomgr/ev_poll_posix.c
  6. 4
      src/core/lib/iomgr/ev_posix.c
  7. 4
      src/core/lib/iomgr/ev_posix.h
  8. 16
      src/core/lib/iomgr/tcp_posix.c
  9. 4
      src/core/lib/iomgr/workqueue.h
  10. 18
      src/core/lib/security/transport/secure_endpoint.c
  11. 13
      test/core/internal_api_canaries/iomgr.c
  12. 12
      test/core/util/mock_endpoint.c
  13. 12
      test/core/util/passthru_endpoint.c

@ -65,3 +65,7 @@ void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
return ep->vtable->get_peer(ep);
}
grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
return ep->vtable->get_workqueue(ep);
}

@ -51,6 +51,7 @@ struct grpc_endpoint_vtable {
gpr_slice_buffer *slices, grpc_closure *cb);
void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer *slices, grpc_closure *cb);
grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep);
void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@ -69,6 +70,9 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
/* Retrieve a reference to the workqueue associated with this endpoint */
grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep);
/* Write slices out to the socket.
If the connection is ready for more data after the end of the call, it

@ -1037,6 +1037,8 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
/*******************************************************************************
* Pollset Definitions
*/
@ -1794,6 +1796,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
.fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,

@ -725,6 +725,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
GRPC_FD_UNREF(fd, "poll");
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
/*******************************************************************************
* pollset_posix.c
*/
@ -2006,6 +2008,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
.fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,

@ -617,6 +617,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
GRPC_FD_UNREF(fd, "poll");
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
/*******************************************************************************
* pollset_posix.c
*/
@ -1234,6 +1236,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
.fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,

@ -148,6 +148,10 @@ grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);
}
grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) {
return g_event_engine->fd_get_workqueue(fd);
}
int grpc_fd_wrapped_fd(grpc_fd *fd) {
return g_event_engine->fd_wrapped_fd(fd);
}

@ -56,6 +56,7 @@ typedef struct grpc_event_engine_vtable {
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
bool (*fd_is_shutdown)(grpc_fd *fd);
grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd);
grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
grpc_fd *fd);
@ -107,6 +108,9 @@ const char *grpc_get_poll_strategy_name();
This takes ownership of closing fd. */
grpc_fd *grpc_fd_create(int fd, const char *name);
/* Get a workqueue that's associated with this fd */
grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd);
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd *fd);

@ -450,9 +450,19 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
static const grpc_endpoint_vtable vtable = {
tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
tcp_shutdown, tcp_destroy, tcp_get_peer};
static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return grpc_fd_get_workqueue(tcp->em_fd);
}
static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_write,
tcp_get_workqueue,
tcp_add_to_pollset,
tcp_add_to_pollset_set,
tcp_shutdown,
tcp_destroy,
tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
const char *peer_string) {

@ -58,7 +58,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
(grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p))
#define GRPC_WORKQUEUE_UNREF(cl, p, r) \
grpc_workqueue_unref((cl), (p), __FILE__, __LINE__, (r))
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
@ -66,7 +66,7 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason);
#else
#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p))
#define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p))
void grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);

@ -360,11 +360,19 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
return grpc_endpoint_get_peer(ep->wrapped_ep);
}
static const grpc_endpoint_vtable vtable = {
endpoint_read, endpoint_write,
endpoint_add_to_pollset, endpoint_add_to_pollset_set,
endpoint_shutdown, endpoint_destroy,
endpoint_get_peer};
static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
return grpc_endpoint_get_workqueue(ep->wrapped_ep);
}
static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_write,
endpoint_get_workqueue,
endpoint_add_to_pollset,
endpoint_add_to_pollset_set,
endpoint_shutdown,
endpoint_destroy,
endpoint_get_peer};
grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *transport,

@ -77,11 +77,14 @@ static void test_code(void) {
/* endpoint.h */
grpc_endpoint endpoint;
grpc_endpoint_vtable vtable = {
grpc_endpoint_read, grpc_endpoint_write,
grpc_endpoint_add_to_pollset, grpc_endpoint_add_to_pollset_set,
grpc_endpoint_shutdown, grpc_endpoint_destroy,
grpc_endpoint_get_peer};
grpc_endpoint_vtable vtable = {grpc_endpoint_read,
grpc_endpoint_write,
grpc_endpoint_get_workqueue,
grpc_endpoint_add_to_pollset,
grpc_endpoint_add_to_pollset_set,
grpc_endpoint_shutdown,
grpc_endpoint_destroy,
grpc_endpoint_get_peer};
endpoint.vtable = &vtable;
grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL);

@ -95,9 +95,17 @@ static char *me_get_peer(grpc_endpoint *ep) {
return gpr_strdup("fake:mock_endpoint");
}
static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; }
static const grpc_endpoint_vtable vtable = {
me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
me_shutdown, me_destroy, me_get_peer,
me_read,
me_write,
me_get_workqueue,
me_add_to_pollset,
me_add_to_pollset_set,
me_shutdown,
me_destroy,
me_get_peer,
};
grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice)) {

@ -140,9 +140,17 @@ static char *me_get_peer(grpc_endpoint *ep) {
return gpr_strdup("fake:mock_endpoint");
}
static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; }
static const grpc_endpoint_vtable vtable = {
me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
me_shutdown, me_destroy, me_get_peer,
me_read,
me_write,
me_get_workqueue,
me_add_to_pollset,
me_add_to_pollset_set,
me_shutdown,
me_destroy,
me_get_peer,
};
static void half_init(half *m, passthru_endpoint *parent) {

Loading…
Cancel
Save