Merge branch 'primary-goat-whisperer' into the-test-be-sleepy

pull/2765/head
Craig Tiller 9 years ago
commit 4719ad01d5
  1. 6
      BUILD
  2. 36
      Makefile
  3. 19
      build.json
  4. 3
      gRPC.podspec
  5. 18
      src/core/iomgr/fd_posix.c
  6. 9
      src/core/iomgr/fd_posix.h
  7. 23
      src/core/iomgr/pollset.h
  8. 168
      src/core/iomgr/pollset_kick_posix.c
  9. 93
      src/core/iomgr/pollset_kick_posix.h
  10. 162
      src/core/iomgr/pollset_multipoller_with_epoll.c
  11. 123
      src/core/iomgr/pollset_multipoller_with_poll_posix.c
  12. 145
      src/core/iomgr/pollset_posix.c
  13. 33
      src/core/iomgr/pollset_posix.h
  14. 83
      src/core/iomgr/pollset_windows.c
  15. 9
      src/core/iomgr/pollset_windows.h
  16. 8
      src/core/iomgr/wakeup_fd_eventfd.c
  17. 12
      src/core/iomgr/wakeup_fd_pipe.c
  18. 10
      src/core/iomgr/wakeup_fd_posix.c
  19. 20
      src/core/iomgr/wakeup_fd_posix.h
  20. 6
      src/core/security/google_default_credentials.c
  21. 60
      src/core/surface/completion_queue.c
  22. 2
      src/core/surface/completion_queue.h
  23. 79
      src/core/surface/server.c
  24. 6
      src/core/transport/chttp2_transport.c
  25. 8
      test/core/httpcli/httpcli_test.c
  26. 21
      test/core/iomgr/endpoint_tests.c
  27. 20
      test/core/iomgr/fd_posix_test.c
  28. 130
      test/core/iomgr/poll_kick_posix_test.c
  29. 11
      test/core/iomgr/tcp_client_posix_test.c
  30. 17
      test/core/iomgr/tcp_posix_test.c
  31. 5
      test/core/iomgr/tcp_server_posix_test.c
  32. 9
      test/core/security/oauth2_utils.c
  33. 9
      test/core/security/print_google_default_creds_token.c
  34. 9
      test/core/security/verify_jwt.c
  35. 2
      tools/doxygen/Doxyfile.core.internal
  36. 20
      tools/run_tests/sources_and_headers.json
  37. 8
      tools/run_tests/tests.json
  38. 3
      vsprojects/grpc/grpc.vcxproj
  39. 6
      vsprojects/grpc/grpc.vcxproj.filters
  40. 3
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
  41. 6
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -184,7 +184,6 @@ cc_library(
"src/core/iomgr/iomgr_internal.h",
"src/core/iomgr/iomgr_posix.h",
"src/core/iomgr/pollset.h",
"src/core/iomgr/pollset_kick_posix.h",
"src/core/iomgr/pollset_posix.h",
"src/core/iomgr/pollset_set.h",
"src/core/iomgr/pollset_set_posix.h",
@ -304,7 +303,6 @@ cc_library(
"src/core/iomgr/iomgr.c",
"src/core/iomgr/iomgr_posix.c",
"src/core/iomgr/iomgr_windows.c",
"src/core/iomgr/pollset_kick_posix.c",
"src/core/iomgr/pollset_multipoller_with_epoll.c",
"src/core/iomgr/pollset_multipoller_with_poll_posix.c",
"src/core/iomgr/pollset_posix.c",
@ -442,7 +440,6 @@ cc_library(
"src/core/iomgr/iomgr_internal.h",
"src/core/iomgr/iomgr_posix.h",
"src/core/iomgr/pollset.h",
"src/core/iomgr/pollset_kick_posix.h",
"src/core/iomgr/pollset_posix.h",
"src/core/iomgr/pollset_set.h",
"src/core/iomgr/pollset_set_posix.h",
@ -539,7 +536,6 @@ cc_library(
"src/core/iomgr/iomgr.c",
"src/core/iomgr/iomgr_posix.c",
"src/core/iomgr/iomgr_windows.c",
"src/core/iomgr/pollset_kick_posix.c",
"src/core/iomgr/pollset_multipoller_with_epoll.c",
"src/core/iomgr/pollset_multipoller_with_poll_posix.c",
"src/core/iomgr/pollset_posix.c",
@ -1023,7 +1019,6 @@ objc_library(
"src/core/iomgr/iomgr.c",
"src/core/iomgr/iomgr_posix.c",
"src/core/iomgr/iomgr_windows.c",
"src/core/iomgr/pollset_kick_posix.c",
"src/core/iomgr/pollset_multipoller_with_epoll.c",
"src/core/iomgr/pollset_multipoller_with_poll_posix.c",
"src/core/iomgr/pollset_posix.c",
@ -1163,7 +1158,6 @@ objc_library(
"src/core/iomgr/iomgr_internal.h",
"src/core/iomgr/iomgr_posix.h",
"src/core/iomgr/pollset.h",
"src/core/iomgr/pollset_kick_posix.h",
"src/core/iomgr/pollset_posix.h",
"src/core/iomgr/pollset_set.h",
"src/core/iomgr/pollset_set_posix.h",

File diff suppressed because one or more lines are too long

@ -149,7 +149,6 @@
"src/core/iomgr/iomgr_internal.h",
"src/core/iomgr/iomgr_posix.h",
"src/core/iomgr/pollset.h",
"src/core/iomgr/pollset_kick_posix.h",
"src/core/iomgr/pollset_posix.h",
"src/core/iomgr/pollset_set.h",
"src/core/iomgr/pollset_set_posix.h",
@ -245,7 +244,6 @@
"src/core/iomgr/iomgr.c",
"src/core/iomgr/iomgr_posix.c",
"src/core/iomgr/iomgr_windows.c",
"src/core/iomgr/pollset_kick_posix.c",
"src/core/iomgr/pollset_multipoller_with_epoll.c",
"src/core/iomgr/pollset_multipoller_with_poll_posix.c",
"src/core/iomgr/pollset_posix.c",
@ -1674,23 +1672,6 @@
"gpr"
]
},
{
"name": "poll_kick_posix_test",
"build": "test",
"language": "c",
"src": [
"test/core/iomgr/poll_kick_posix_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr_test_util",
"gpr"
],
"platforms": [
"posix"
]
},
{
"name": "resolve_address_test",
"build": "test",

@ -186,7 +186,6 @@ Pod::Spec.new do |s|
'src/core/iomgr/iomgr_internal.h',
'src/core/iomgr/iomgr_posix.h',
'src/core/iomgr/pollset.h',
'src/core/iomgr/pollset_kick_posix.h',
'src/core/iomgr/pollset_posix.h',
'src/core/iomgr/pollset_set.h',
'src/core/iomgr/pollset_set_posix.h',
@ -313,7 +312,6 @@ Pod::Spec.new do |s|
'src/core/iomgr/iomgr.c',
'src/core/iomgr/iomgr_posix.c',
'src/core/iomgr/iomgr_windows.c',
'src/core/iomgr/pollset_kick_posix.c',
'src/core/iomgr/pollset_multipoller_with_epoll.c',
'src/core/iomgr/pollset_multipoller_with_poll_posix.c',
'src/core/iomgr/pollset_posix.c',
@ -452,7 +450,6 @@ Pod::Spec.new do |s|
'src/core/iomgr/iomgr_internal.h',
'src/core/iomgr/iomgr_posix.h',
'src/core/iomgr/pollset.h',
'src/core/iomgr/pollset_kick_posix.h',
'src/core/iomgr/pollset_posix.h',
'src/core/iomgr/pollset_set.h',
'src/core/iomgr/pollset_set_posix.h',

@ -168,13 +168,19 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
static void pollset_kick_locked(grpc_pollset *pollset) {
gpr_mu_lock(GRPC_POLLSET_MU(pollset));
grpc_pollset_kick(pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset);
pollset_kick_locked(fd->inactive_watcher_root.next->pollset);
} else if (fd->read_watcher) {
grpc_pollset_force_kick(fd->read_watcher->pollset);
pollset_kick_locked(fd->read_watcher->pollset);
} else if (fd->write_watcher) {
grpc_pollset_force_kick(fd->write_watcher->pollset);
pollset_kick_locked(fd->write_watcher->pollset);
}
}
@ -188,13 +194,13 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
grpc_fd_watcher *watcher;
for (watcher = fd->inactive_watcher_root.next;
watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
grpc_pollset_force_kick(watcher->pollset);
pollset_kick_locked(watcher->pollset);
}
if (fd->read_watcher) {
grpc_pollset_force_kick(fd->read_watcher->pollset);
pollset_kick_locked(fd->read_watcher->pollset);
}
if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
grpc_pollset_force_kick(fd->write_watcher->pollset);
pollset_kick_locked(fd->write_watcher->pollset);
}
}

@ -109,7 +109,8 @@ grpc_fd *grpc_fd_create(int fd, const char *name);
on_done is called when the underlying file descriptor is definitely close()d.
If on_done is NULL, no callback will be made.
Requires: *fd initialized; no outstanding notify_on_read or
notify_on_write. */
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason);
@ -122,11 +123,13 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
i.e. a combination of read_mask and write_mask determined by the fd's current
interest in said events.
Polling strategies that do not need to alter their behavior depending on the
fd's current interest (such as epoll) do not need to call this function. */
fd's current interest (such as epoll) do not need to call this function.
MUST NOT be called with a pollset lock taken */
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *rec);
/* Complete polling previously started with grpc_fd_begin_poll */
/* Complete polling previously started with grpc_fd_begin_poll
MUST NOT be called with a pollset lock taken */
void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write);
/* Return 1 if this fd is orphaned, 0 otherwise */

@ -37,6 +37,8 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
/* A grpc_pollset is a set of file descriptors that a higher level item is
interested in. For example:
- a server will typically keep a pollset containing all connected channels,
@ -63,13 +65,24 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
descriptors.
Requires GRPC_POLLSET_MU(pollset) locked.
May unlock GRPC_POLLSET_MU(pollset) during its execution.
worker is a (platform-specific) handle that can be used to wake up
from grpc_pollset_work before any events are received and before the timeout
has expired. It is both initialized and destroyed by grpc_pollset_work.
Initialization of worker is guaranteed to occur BEFORE the
GRPC_POLLSET_MU(pollset) is released for the first time by
grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
not be released by grpc_pollset_work AFTER worker has been destroyed.
Returns true if some work has been done, and false if the deadline
got attained. */
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline);
expired. */
int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline);
/* Break one polling thread out of polling work for this pollset.
Requires GRPC_POLLSET_MU(pollset) locked. */
void grpc_pollset_kick(grpc_pollset *pollset);
If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
Otherwise, if specific_worker is non-NULL, then kick that worker. */
void grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */

@ -1,168 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc/support/port_platform.h>
#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/pollset_kick_posix.h"
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
/* This implementation is based on a freelist of wakeup fds, with extra logic to
* handle kicks while there is no attached fd. */
/* TODO(klempner): Autosize this, and consider providing a way to disable the
* cap entirely on systems with large fd limits */
#define GRPC_MAX_CACHED_WFDS 50
static grpc_kick_fd_info *fd_freelist = NULL;
static int fd_freelist_count = 0;
static gpr_mu fd_freelist_mu;
static grpc_kick_fd_info *allocate_wfd(void) {
grpc_kick_fd_info *info = NULL;
gpr_mu_lock(&fd_freelist_mu);
if (fd_freelist != NULL) {
info = fd_freelist;
fd_freelist = fd_freelist->next;
--fd_freelist_count;
}
gpr_mu_unlock(&fd_freelist_mu);
if (info == NULL) {
info = gpr_malloc(sizeof(*info));
grpc_wakeup_fd_create(&info->wakeup_fd);
info->next = NULL;
}
return info;
}
static void destroy_wfd(grpc_kick_fd_info *wfd) {
grpc_wakeup_fd_destroy(&wfd->wakeup_fd);
gpr_free(wfd);
}
static void free_wfd(grpc_kick_fd_info *fd_info) {
gpr_mu_lock(&fd_freelist_mu);
if (fd_freelist_count < GRPC_MAX_CACHED_WFDS) {
fd_info->next = fd_freelist;
fd_freelist = fd_info;
fd_freelist_count++;
fd_info = NULL;
}
gpr_mu_unlock(&fd_freelist_mu);
if (fd_info) {
destroy_wfd(fd_info);
}
}
void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) {
gpr_mu_init(&kick_state->mu);
kick_state->kicked = 0;
kick_state->fd_list.next = kick_state->fd_list.prev = &kick_state->fd_list;
}
void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) {
gpr_mu_destroy(&kick_state->mu);
GPR_ASSERT(kick_state->fd_list.next == &kick_state->fd_list);
}
grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
grpc_pollset_kick_state *kick_state) {
grpc_kick_fd_info *fd_info;
gpr_mu_lock(&kick_state->mu);
if (kick_state->kicked) {
kick_state->kicked = 0;
gpr_mu_unlock(&kick_state->mu);
return NULL;
}
fd_info = allocate_wfd();
fd_info->next = &kick_state->fd_list;
fd_info->prev = fd_info->next->prev;
fd_info->next->prev = fd_info->prev->next = fd_info;
gpr_mu_unlock(&kick_state->mu);
return fd_info;
}
void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
grpc_kick_fd_info *fd_info) {
grpc_wakeup_fd_consume_wakeup(&fd_info->wakeup_fd);
}
void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
grpc_kick_fd_info *fd_info) {
gpr_mu_lock(&kick_state->mu);
fd_info->next->prev = fd_info->prev;
fd_info->prev->next = fd_info->next;
free_wfd(fd_info);
gpr_mu_unlock(&kick_state->mu);
}
void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
gpr_mu_lock(&kick_state->mu);
if (kick_state->fd_list.next != &kick_state->fd_list) {
grpc_wakeup_fd_wakeup(&kick_state->fd_list.next->wakeup_fd);
} else {
kick_state->kicked = 1;
}
gpr_mu_unlock(&kick_state->mu);
}
void grpc_pollset_kick_global_init_fallback_fd(void) {
gpr_mu_init(&fd_freelist_mu);
grpc_wakeup_fd_global_init_force_fallback();
}
void grpc_pollset_kick_global_init(void) {
gpr_mu_init(&fd_freelist_mu);
grpc_wakeup_fd_global_init();
}
void grpc_pollset_kick_global_destroy(void) {
while (fd_freelist != NULL) {
grpc_kick_fd_info *current = fd_freelist;
fd_freelist = fd_freelist->next;
destroy_wfd(current);
}
grpc_wakeup_fd_global_destroy();
gpr_mu_destroy(&fd_freelist_mu);
}
#endif /* GPR_POSIX_SOCKET */

@ -1,93 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/sync.h>
/* pollset kicking allows breaking a thread out of polling work for
a given pollset.
writing a byte to a pipe is used as a posix-ly portable base
mechanism, and eventfds are utilized on Linux for better performance. */
typedef struct grpc_kick_fd_info {
grpc_wakeup_fd_info wakeup_fd;
/* used for polling list and free list */
struct grpc_kick_fd_info *next;
/* only used when polling */
struct grpc_kick_fd_info *prev;
} grpc_kick_fd_info;
typedef struct grpc_pollset_kick_state {
gpr_mu mu;
int kicked;
struct grpc_kick_fd_info fd_list;
} grpc_pollset_kick_state;
#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) \
GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd)
/* This is an abstraction around the typical pipe mechanism for waking up a
thread sitting in a poll() style call. */
void grpc_pollset_kick_global_init(void);
void grpc_pollset_kick_global_destroy(void);
void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state);
void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state);
/* Guarantees a pure posix implementation rather than a specialized one, if
* applicable. Intended for testing. */
void grpc_pollset_kick_global_init_fallback_fd(void);
/* Must be called before entering poll(). If return value is NULL, this consumed
an existing kick. Otherwise the return value is an FD to add to the poll set.
*/
grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
grpc_pollset_kick_state *kick_state);
/* Consume an existing kick. Must be called after poll returns that the fd was
readable, and before calling kick_post_poll. */
void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
grpc_kick_fd_info *fd_info);
/* Must be called after pre_poll, and after consume if applicable */
void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
grpc_kick_fd_info *fd_info);
/* Actually kick */
void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */

@ -36,6 +36,7 @@
#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
@ -44,23 +45,28 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
typedef struct wakeup_fd_hdl {
grpc_wakeup_fd wakeup_fd;
struct wakeup_fd_hdl *next;
} wakeup_fd_hdl;
typedef struct {
grpc_pollset *pollset;
grpc_fd *fd;
grpc_iomgr_closure closure;
} delayed_add;
typedef struct {
int epoll_fd;
grpc_wakeup_fd_info wakeup_fd;
wakeup_fd_hdl *free_wakeup_fds;
} pollset_hdr;
static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd *fd,
int and_unlock_pollset) {
static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
grpc_fd_watcher watcher;
if (and_unlock_pollset) {
gpr_mu_unlock(&pollset->mu);
}
/* We pretend to be polling whilst adding an fd to keep the fd from being
closed during the add. This may result in a spurious wakeup being assigned
to this pollset whilst adding, but that should be benign. */
@ -80,6 +86,52 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd_end_poll(&watcher, 0, 0);
}
static void perform_delayed_add(void *arg, int iomgr_status) {
delayed_add *da = arg;
int do_shutdown_cb = 0;
if (!grpc_fd_is_orphaned(da->fd)) {
finally_add_fd(da->pollset, da->fd);
}
gpr_mu_lock(&da->pollset->mu);
da->pollset->in_flight_cbs--;
if (da->pollset->shutting_down) {
/* We don't care about this pollset anymore. */
if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
GPR_ASSERT(!grpc_pollset_has_workers(da->pollset));
da->pollset->called_shutdown = 1;
do_shutdown_cb = 1;
}
}
gpr_mu_unlock(&da->pollset->mu);
GRPC_FD_UNREF(da->fd, "delayed_add");
if (do_shutdown_cb) {
da->pollset->shutdown_done_cb(da->pollset->shutdown_done_arg);
}
gpr_free(da);
}
static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd *fd,
int and_unlock_pollset) {
if (and_unlock_pollset) {
gpr_mu_unlock(&pollset->mu);
finally_add_fd(pollset, fd);
} else {
delayed_add *da = gpr_malloc(sizeof(*da));
da->pollset = pollset;
da->fd = fd;
GRPC_FD_REF(fd, "delayed_add");
grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
grpc_iomgr_add_callback(&da->closure);
}
}
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
grpc_fd *fd,
int and_unlock_pollset) {
@ -103,12 +155,14 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
#define GRPC_EPOLL_MAX_EVENTS 1000
static void multipoll_with_epoll_pollset_maybe_work(
grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
gpr_timespec now, int allow_synchronous_callback) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int ep_rv;
int poll_rv;
pollset_hdr *h = pollset->data.ptr;
int timeout_ms;
struct pollfd pfds[2];
/* If you want to ignore epoll's ability to sanely handle parallel pollers,
* for a more apples-to-apples performance comparison with poll, add a
@ -116,43 +170,58 @@ static void multipoll_with_epoll_pollset_maybe_work(
* here.
*/
pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
do {
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
if (ep_rv < 0) {
if (errno != EINTR) {
gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
}
} else {
int i;
for (i = 0; i < ep_rv; ++i) {
if (ep_ev[i].data.ptr == 0) {
grpc_wakeup_fd_consume_wakeup(&h->wakeup_fd);
} else {
grpc_fd *fd = ep_ev[i].data.ptr;
/* TODO(klempner): We might want to consider making err and pri
* separate events */
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write = ep_ev[i].events & EPOLLOUT;
if (read || cancel) {
grpc_fd_become_readable(fd, allow_synchronous_callback);
pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfds[0].events = POLLIN;
pfds[0].revents = 0;
pfds[1].fd = h->epoll_fd;
pfds[1].events = POLLIN;
pfds[1].revents = 0;
poll_rv = poll(pfds, 2, timeout_ms);
if (poll_rv < 0) {
if (errno != EINTR) {
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
}
} else if (poll_rv == 0) {
/* do nothing */
} else {
if (pfds[0].revents) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
if (pfds[1].revents) {
do {
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
if (ep_rv < 0) {
if (errno != EINTR) {
gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
}
if (write || cancel) {
grpc_fd_become_writable(fd, allow_synchronous_callback);
} else {
int i;
for (i = 0; i < ep_rv; ++i) {
grpc_fd *fd = ep_ev[i].data.ptr;
/* TODO(klempner): We might want to consider making err and pri
* separate events */
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write = ep_ev[i].events & EPOLLOUT;
if (read || cancel) {
grpc_fd_become_readable(fd, allow_synchronous_callback);
}
if (write || cancel) {
grpc_fd_become_writable(fd, allow_synchronous_callback);
}
}
}
}
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
timeout_ms = 0;
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
gpr_mu_lock(&pollset->mu);
pollset->counter -= 1;
}
static void multipoll_with_epoll_pollset_finish_shutdown(
@ -160,21 +229,14 @@ static void multipoll_with_epoll_pollset_finish_shutdown(
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
grpc_wakeup_fd_destroy(&h->wakeup_fd);
close(h->epoll_fd);
gpr_free(h);
}
static void epoll_kick(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
grpc_wakeup_fd_wakeup(&h->wakeup_fd);
}
static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
multipoll_with_epoll_pollset_add_fd,
multipoll_with_epoll_pollset_del_fd,
multipoll_with_epoll_pollset_maybe_work,
epoll_kick,
multipoll_with_epoll_pollset_finish_shutdown,
multipoll_with_epoll_pollset_destroy};
@ -182,8 +244,6 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
struct epoll_event ev;
int err;
pollset->vtable = &multipoll_with_epoll_pollset;
pollset->data.ptr = h;
@ -196,16 +256,6 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
for (i = 0; i < nfds; i++) {
multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0);
}
grpc_wakeup_fd_create(&h->wakeup_fd);
ev.events = EPOLLIN;
ev.data.ptr = 0;
err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
GRPC_WAKEUP_FD_GET_READ_FD(&h->wakeup_fd), &ev);
if (err < 0) {
gpr_log(GPR_ERROR, "Wakeup fd epoll_ctl failed: %s", strerror(errno));
abort();
}
}
grpc_platform_become_multipoller_type grpc_platform_become_multipoller =

@ -53,12 +53,6 @@ typedef struct {
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
/* fds being polled by the current poller: parallel arrays of pollfd, and
a grpc_fd_watcher */
size_t pfd_count;
size_t pfd_capacity;
grpc_fd_watcher *watchers;
struct pollfd *pfds;
/* fds that have been removed from the pollset explicitly */
size_t del_count;
size_t del_capacity;
@ -102,80 +96,60 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
}
}
static void end_polling(grpc_pollset *pollset) {
size_t i;
pollset_hdr *h;
h = pollset->data.ptr;
for (i = 1; i < h->pfd_count; i++) {
grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN,
h->pfds[i].revents & POLLOUT);
}
}
static void multipoll_with_poll_pollset_maybe_work(
grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
gpr_timespec now, int allow_synchronous_callback) {
int timeout;
int r;
size_t i, np, nf, nd;
size_t i, j, pfd_count, fd_count;
pollset_hdr *h;
grpc_kick_fd_info *kfd;
/* TODO(ctiller): inline some elements to avoid an allocation */
grpc_fd_watcher *watchers;
struct pollfd *pfds;
h = pollset->data.ptr;
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
if (h->pfd_capacity < h->fd_count + 1) {
h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
gpr_free(h->pfds);
gpr_free(h->watchers);
h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity);
h->watchers = gpr_malloc(sizeof(grpc_fd_watcher) * h->pfd_capacity);
}
nf = 0;
np = 1;
kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
if (kfd == NULL) {
/* Already kicked */
return;
}
h->pfds[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
h->pfds[0].events = POLLIN;
h->pfds[0].revents = POLLOUT;
/* TODO(ctiller): perform just one malloc here if we exceed the inline case */
pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1));
watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1));
fd_count = 0;
pfd_count = 1;
pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfds[0].events = POLLIN;
pfds[0].revents = POLLOUT;
for (i = 0; i < h->fd_count; i++) {
int remove = grpc_fd_is_orphaned(h->fds[i]);
for (nd = 0; nd < h->del_count; nd++) {
if (h->fds[i] == h->dels[nd]) remove = 1;
for (j = 0; !remove && j < h->del_count; j++) {
if (h->fds[i] == h->dels[j]) remove = 1;
}
if (remove) {
GRPC_FD_UNREF(h->fds[i], "multipoller");
} else {
h->fds[nf++] = h->fds[i];
h->watchers[np].fd = h->fds[i];
h->pfds[np].fd = h->fds[i]->fd;
h->pfds[np].revents = 0;
np++;
h->fds[fd_count++] = h->fds[i];
watchers[pfd_count].fd = h->fds[i];
pfds[pfd_count].fd = h->fds[i]->fd;
pfds[pfd_count].revents = 0;
pfd_count++;
}
}
h->pfd_count = np;
h->fd_count = nf;
for (nd = 0; nd < h->del_count; nd++) {
GRPC_FD_UNREF(h->dels[nd], "multipoller_del");
for (j = 0; j < h->del_count; j++) {
GRPC_FD_UNREF(h->dels[j], "multipoller_del");
}
h->del_count = 0;
if (h->pfd_count == 0) {
end_polling(pollset);
return;
}
pollset->counter++;
h->fd_count = fd_count;
gpr_mu_unlock(&pollset->mu);
for (i = 1; i < np; i++) {
h->pfds[i].events = grpc_fd_begin_poll(h->watchers[i].fd, pollset, POLLIN,
POLLOUT, &h->watchers[i]);
for (i = 1; i < pfd_count; i++) {
pfds[i].events = grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
POLLOUT, &watchers[i]);
}
r = poll(h->pfds, h->pfd_count, timeout);
r = poll(pfds, pfd_count, timeout);
end_polling(pollset);
for (i = 1; i < pfd_count; i++) {
grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
pfds[i].revents & POLLOUT);
}
if (r < 0) {
if (errno != EINTR) {
@ -184,35 +158,31 @@ static void multipoll_with_poll_pollset_maybe_work(
} else if (r == 0) {
/* do nothing */
} else {
if (h->pfds[0].revents & POLLIN) {
grpc_pollset_kick_consume(&pollset->kick_state, kfd);
if (pfds[0].revents & POLLIN) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
for (i = 1; i < np; i++) {
if (h->watchers[i].fd == NULL) {
for (i = 1; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
continue;
}
if (h->pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback);
if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
grpc_fd_become_readable(watchers[i].fd, allow_synchronous_callback);
}
if (h->pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
grpc_fd_become_writable(h->watchers[i].fd, allow_synchronous_callback);
if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
grpc_fd_become_writable(watchers[i].fd, allow_synchronous_callback);
}
}
}
grpc_pollset_kick_post_poll(&pollset->kick_state, kfd);
gpr_mu_lock(&pollset->mu);
pollset->counter--;
}
gpr_free(pfds);
gpr_free(watchers);
static void multipoll_with_poll_pollset_kick(grpc_pollset *p) {
grpc_pollset_force_kick(p);
gpr_mu_lock(&pollset->mu);
}
static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
size_t i;
pollset_hdr *h = pollset->data.ptr;
GPR_ASSERT(pollset->counter == 0);
for (i = 0; i < h->fd_count; i++) {
GRPC_FD_UNREF(h->fds[i], "multipoller");
}
@ -226,8 +196,6 @@ static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
multipoll_with_poll_pollset_finish_shutdown(pollset);
gpr_free(h->pfds);
gpr_free(h->watchers);
gpr_free(h->fds);
gpr_free(h->dels);
gpr_free(h);
@ -237,7 +205,6 @@ static const grpc_pollset_vtable multipoll_with_poll_pollset = {
multipoll_with_poll_pollset_add_fd,
multipoll_with_poll_pollset_del_fd,
multipoll_with_poll_pollset_maybe_work,
multipoll_with_poll_pollset_kick,
multipoll_with_poll_pollset_finish_shutdown,
multipoll_with_poll_pollset_destroy};
@ -250,10 +217,6 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
h->fd_count = nfds;
h->fd_capacity = nfds;
h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
h->pfd_count = 0;
h->pfd_capacity = 0;
h->pfds = NULL;
h->watchers = NULL;
h->del_count = 0;
h->del_capacity = 0;
h->dels = NULL;

@ -55,22 +55,60 @@
#include <grpc/support/useful.h>
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
void grpc_pollset_kick(grpc_pollset *p) {
if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p && p->counter) {
p->vtable->kick(p);
}
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
worker->next->prev = worker->prev;
}
int grpc_pollset_has_workers(grpc_pollset *p) {
return p->root_worker.next != &p->root_worker;
}
void grpc_pollset_force_kick(grpc_pollset *p) {
if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
grpc_pollset_kick_kick(&p->kick_state);
static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
if (grpc_pollset_has_workers(p)) {
grpc_pollset_worker *w = p->root_worker.next;
remove_worker(p, w);
return w;
} else {
return NULL;
}
}
static void kick_using_pollset_kick(grpc_pollset *p) {
if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
grpc_pollset_kick_kick(&p->kick_state);
static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->next = &p->root_worker;
worker->prev = worker->next->prev;
worker->prev->next = worker->next->prev = worker;
}
static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev = &p->root_worker;
worker->next = worker->prev->next;
worker->prev->next = worker->next->prev = worker;
}
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (specific_worker = p->root_worker.next;
specific_worker != &p->root_worker;
specific_worker = specific_worker->next) {
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
}
p->kicked_without_pollers = 1;
} else if (gpr_tls_get(&g_current_thread_worker) !=
(gpr_intptr)specific_worker) {
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
}
} else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
specific_worker = pop_front_worker(p);
if (specific_worker != NULL) {
push_back_worker(p, specific_worker);
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
} else {
p->kicked_without_pollers = 1;
}
}
}
@ -78,16 +116,12 @@ static void kick_using_pollset_kick(grpc_pollset *p) {
void grpc_pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
/* Initialize kick fd state */
grpc_pollset_kick_global_init();
grpc_wakeup_fd_global_init();
}
void grpc_pollset_global_shutdown(void) {
/* destroy the kick pipes */
grpc_pollset_kick_global_destroy();
gpr_tls_destroy(&g_current_thread_poller);
grpc_wakeup_fd_global_destroy();
}
/* main interface */
@ -96,7 +130,7 @@ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
grpc_pollset_kick_init(&pollset->kick_state);
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
@ -134,27 +168,44 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
int added_worker = 0;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
return 1;
goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
return 1;
goto done;
}
if (pollset->shutting_down) {
return 1;
goto done;
}
if (!pollset->kicked_without_pollers) {
push_front_worker(pollset, worker);
added_worker = 1;
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
pollset->vtable->maybe_work(pollset, worker, deadline, now, 1);
gpr_tls_set(&g_current_thread_poller, 0);
} else {
pollset->kicked_without_pollers = 0;
}
done:
grpc_wakeup_fd_destroy(&worker->wakeup_fd);
if (added_worker) {
remove_worker(pollset, worker);
}
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
pollset->vtable->maybe_work(pollset, deadline, now, 1);
gpr_tls_set(&g_current_thread_poller, 0);
if (pollset->shutting_down) {
if (pollset->counter > 0) {
grpc_pollset_kick(pollset);
if (grpc_pollset_has_workers(pollset)) {
grpc_pollset_kick(pollset, NULL);
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
@ -177,15 +228,13 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
pollset->counter == 0) {
!grpc_pollset_has_workers(pollset)) {
pollset->called_shutdown = 1;
call_shutdown = 1;
}
pollset->shutdown_done_cb = shutdown_done;
pollset->shutdown_done_arg = shutdown_done_arg;
if (pollset->counter > 0) {
grpc_pollset_kick(pollset);
}
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
if (call_shutdown) {
@ -196,8 +245,8 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void grpc_pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
GPR_ASSERT(pollset->in_flight_cbs == 0);
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->vtable->destroy(pollset);
grpc_pollset_kick_destroy(&pollset->kick_state);
gpr_mu_destroy(&pollset->mu);
}
@ -248,8 +297,8 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_lock(&pollset->mu);
/* First we need to ensure that nobody is polling concurrently */
if (pollset->counter != 0) {
grpc_pollset_kick(pollset);
if (grpc_pollset_has_workers(pollset)) {
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
grpc_iomgr_add_callback(&up_args->promotion_closure);
gpr_mu_unlock(&pollset->mu);
return;
@ -264,7 +313,8 @@ static void basic_do_promote(void *args, int success) {
pollset->in_flight_cbs--;
if (pollset->shutting_down) {
/* We don't care about this pollset anymore. */
if (pollset->in_flight_cbs == 0 && pollset->counter == 0 && !pollset->called_shutdown) {
if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->called_shutdown = 1;
do_shutdown_cb = 1;
}
@ -307,7 +357,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) goto exit;
if (!pollset->counter) {
if (!grpc_pollset_has_workers(pollset)) {
/* Fast path -- no in flight cbs */
/* TODO(klempner): Comment this out and fix any test failures or establish
* they are due to timing issues */
@ -343,7 +393,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
up_args->promotion_closure.cb_arg = up_args;
grpc_iomgr_add_callback(&up_args->promotion_closure);
grpc_pollset_kick(pollset);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:
if (and_unlock_pollset) {
@ -365,12 +415,12 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
}
static void basic_pollset_maybe_work(grpc_pollset *pollset,
grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback) {
struct pollfd pfd[2];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
grpc_kick_fd_info *kfd;
int timeout;
int r;
int nfds;
@ -387,16 +437,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
fd = pollset->data.ptr = NULL;
}
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
if (kfd == NULL) {
/* Already kicked */
return;
}
pfd[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd);
pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfd[0].events = POLLIN;
pfd[0].revents = 0;
nfds = 1;
pollset->counter++;
if (fd) {
pfd[1].fd = fd->fd;
pfd[1].revents = 0;
@ -428,7 +472,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
/* do nothing */
} else {
if (pfd[0].revents & POLLIN) {
grpc_pollset_kick_consume(&pollset->kick_state, kfd);
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
if (nfds > 1) {
if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
@ -440,14 +484,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
}
}
grpc_pollset_kick_post_poll(&pollset->kick_state, kfd);
gpr_mu_lock(&pollset->mu);
pollset->counter--;
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(pollset->counter == 0);
if (pollset->data.ptr != NULL) {
GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
pollset->data.ptr = NULL;
@ -455,14 +495,13 @@ static void basic_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable basic_pollset = {
basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
kick_using_pollset_kick, basic_pollset_destroy, basic_pollset_destroy};
basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
basic_pollset_destroy, basic_pollset_destroy};
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
pollset->vtable = &basic_pollset;
pollset->counter = 0;
pollset->data.ptr = fd_or_null;
if (fd_or_null) {
if (fd_or_null != NULL) {
GRPC_FD_REF(fd_or_null, "basicpoll");
}
}

@ -35,8 +35,7 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#include <grpc/support/sync.h>
#include "src/core/iomgr/pollset_kick_posix.h"
#include "src/core/iomgr/wakeup_fd_posix.h"
typedef struct grpc_pollset_vtable grpc_pollset_vtable;
@ -45,6 +44,12 @@ typedef struct grpc_pollset_vtable grpc_pollset_vtable;
use the struct tag */
struct grpc_fd;
typedef struct grpc_pollset_worker {
grpc_wakeup_fd wakeup_fd;
struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev;
} grpc_pollset_worker;
typedef struct grpc_pollset {
/* pollsets under posix can mutate representation as fds are added and
removed.
@ -52,11 +57,11 @@ typedef struct grpc_pollset {
few fds, and an epoll() based implementation for many fds */
const grpc_pollset_vtable *vtable;
gpr_mu mu;
grpc_pollset_kick_state kick_state;
int counter;
grpc_pollset_worker root_worker;
int in_flight_cbs;
int shutting_down;
int called_shutdown;
int kicked_without_pollers;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
union {
@ -70,9 +75,9 @@ struct grpc_pollset_vtable {
int and_unlock_pollset);
void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd,
int and_unlock_pollset);
void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
gpr_timespec now, int allow_synchronous_callback);
void (*kick)(grpc_pollset *pollset);
void (*maybe_work)(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now,
int allow_synchronous_callback);
void (*finish_shutdown)(grpc_pollset *pollset);
void (*destroy)(grpc_pollset *pollset);
};
@ -85,22 +90,16 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd);
poll after an fd is orphaned) */
void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd);
/* Force any current pollers to break polling: it's the callers responsibility
to ensure that the pollset indeed needs to be kicked - no verification that
the pollset is actually performing polling work is done. At worst this will
result in spurious wakeups if performed at the wrong moment.
Does not touch pollset->mu. */
void grpc_pollset_force_kick(grpc_pollset *pollset);
/* Returns the fd to listen on for kicks */
int grpc_kick_read_fd(grpc_pollset *p);
/* Call after polling has been kicked to leave the kicked state */
void grpc_kick_drain(grpc_pollset *p);
/* Convert a timespec to milliseconds:
- very small or negative poll times are clamped to zero to do a
- very small or negative poll times are clamped to zero to do a
non-blocking poll (which becomes spin polling)
- other small values are rounded up to one millisecond
- longer than a millisecond polls are rounded up to the next nearest
- longer than a millisecond polls are rounded up to the next nearest
millisecond to avoid spinning
- infinite timeouts are converted to -1 */
int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now);
@ -114,4 +113,8 @@ extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller;
void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
size_t fd_count);
/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must
* be locked) */
int grpc_pollset_has_workers(grpc_pollset *pollset);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */

@ -42,6 +42,38 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
worker->next->prev = worker->prev;
}
static int has_workers(grpc_pollset *p) {
return p->root_worker.next != &p->root_worker;
}
static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
if (has_workers(p)) {
grpc_pollset_worker *w = p->root_worker.next;
remove_worker(p, w);
return w;
}
else {
return NULL;
}
}
static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->next = &p->root_worker;
worker->prev = worker->next->prev;
worker->prev->next = worker->next->prev = worker;
}
static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev = &p->root_worker;
worker->next = worker->prev->next;
worker->prev->next = worker->next->prev = worker;
}
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
@ -50,7 +82,8 @@
void grpc_pollset_init(grpc_pollset *pollset) {
memset(pollset, 0, sizeof(*pollset));
gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv);
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->kicked_without_pollers = 0;
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
@ -58,34 +91,66 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void *shutdown_done_arg) {
gpr_mu_lock(&pollset->mu);
pollset->shutting_down = 1;
gpr_cv_broadcast(&pollset->cv);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
shutdown_done(shutdown_done_arg);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
gpr_cv_destroy(&pollset->cv);
}
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) {
gpr_timespec now;
int added_worker = 0;
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
worker->next = worker->prev = NULL;
gpr_cv_init(&worker->cv);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
return 1 /* GPR_TRUE */;
goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
return 1 /* GPR_TRUE */;
goto done;
}
if (!pollset->shutting_down) {
gpr_cv_wait(&pollset->cv, &pollset->mu, deadline);
if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
push_front_worker(pollset, worker);
added_worker = 1;
gpr_cv_wait(&worker->cv, &pollset->mu, deadline);
} else {
pollset->kicked_without_pollers = 0;
}
done:
gpr_cv_destroy(&worker->cv);
if (added_worker) {
remove_worker(pollset, worker);
}
return 1 /* GPR_TRUE */;
}
void grpc_pollset_kick(grpc_pollset *p) { gpr_cv_signal(&p->cv); }
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (specific_worker = p->root_worker.next;
specific_worker != &p->root_worker;
specific_worker = specific_worker->next) {
gpr_cv_signal(&specific_worker->cv);
}
p->kicked_without_pollers = 1;
} else {
gpr_cv_signal(&specific_worker->cv);
}
} else {
specific_worker = pop_front_worker(p);
if (specific_worker != NULL) {
push_back_worker(p, specific_worker);
gpr_cv_signal(&specific_worker->cv);
} else {
p->kicked_without_pollers = 1;
}
}
}
#endif /* GPR_WINSOCK_SOCKET */

@ -42,10 +42,17 @@
nature of the IO completion ports. A Windows "pollset" is merely a mutex
and a condition variable, used to synchronize with the IOCP. */
typedef struct grpc_pollset_worker {
gpr_cv cv;
struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev;
} grpc_pollset_worker;
typedef struct grpc_pollset {
gpr_mu mu;
gpr_cv cv;
int shutting_down;
int kicked_without_pollers;
grpc_pollset_worker root_worker;
} grpc_pollset;
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)

@ -42,7 +42,7 @@
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/log.h>
static void eventfd_create(grpc_wakeup_fd_info *fd_info) {
static void eventfd_create(grpc_wakeup_fd *fd_info) {
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
/* TODO(klempner): Handle failure more gracefully */
GPR_ASSERT(efd >= 0);
@ -50,7 +50,7 @@ static void eventfd_create(grpc_wakeup_fd_info *fd_info) {
fd_info->write_fd = -1;
}
static void eventfd_consume(grpc_wakeup_fd_info *fd_info) {
static void eventfd_consume(grpc_wakeup_fd *fd_info) {
eventfd_t value;
int err;
do {
@ -58,14 +58,14 @@ static void eventfd_consume(grpc_wakeup_fd_info *fd_info) {
} while (err < 0 && errno == EINTR);
}
static void eventfd_wakeup(grpc_wakeup_fd_info *fd_info) {
static void eventfd_wakeup(grpc_wakeup_fd *fd_info) {
int err;
do {
err = eventfd_write(fd_info->read_fd, 1);
} while (err < 0 && errno == EINTR);
}
static void eventfd_destroy(grpc_wakeup_fd_info *fd_info) {
static void eventfd_destroy(grpc_wakeup_fd *fd_info) {
close(fd_info->read_fd);
}

@ -44,7 +44,7 @@
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/log.h>
static void pipe_create(grpc_wakeup_fd_info *fd_info) {
static void pipe_init(grpc_wakeup_fd *fd_info) {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
GPR_ASSERT(0 == pipe(pipefd));
@ -54,7 +54,7 @@ static void pipe_create(grpc_wakeup_fd_info *fd_info) {
fd_info->write_fd = pipefd[1];
}
static void pipe_consume(grpc_wakeup_fd_info *fd_info) {
static void pipe_consume(grpc_wakeup_fd *fd_info) {
char buf[128];
int r;
@ -74,13 +74,13 @@ static void pipe_consume(grpc_wakeup_fd_info *fd_info) {
}
}
static void pipe_wakeup(grpc_wakeup_fd_info *fd_info) {
static void pipe_wakeup(grpc_wakeup_fd *fd_info) {
char c = 0;
while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR)
;
}
static void pipe_destroy(grpc_wakeup_fd_info *fd_info) {
static void pipe_destroy(grpc_wakeup_fd *fd_info) {
close(fd_info->read_fd);
close(fd_info->write_fd);
}
@ -91,7 +91,7 @@ static int pipe_check_availability(void) {
}
const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = {
pipe_create, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability
};
pipe_init, pipe_consume, pipe_wakeup, pipe_destroy,
pipe_check_availability};
#endif /* GPR_POSIX_WAKUP_FD */

@ -57,19 +57,19 @@ void grpc_wakeup_fd_global_destroy(void) {
wakeup_fd_vtable = NULL;
}
void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info) {
wakeup_fd_vtable->create(fd_info);
void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->init(fd_info);
}
void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info) {
void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->consume(fd_info);
}
void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info) {
void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->wakeup(fd_info);
}
void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info) {
void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->destroy(fd_info);
}

@ -69,28 +69,28 @@ void grpc_wakeup_fd_global_destroy(void);
* purposes only.*/
void grpc_wakeup_fd_global_init_force_fallback(void);
typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info;
typedef struct grpc_wakeup_fd grpc_wakeup_fd;
typedef struct grpc_wakeup_fd_vtable {
void (*create)(grpc_wakeup_fd_info *fd_info);
void (*consume)(grpc_wakeup_fd_info *fd_info);
void (*wakeup)(grpc_wakeup_fd_info *fd_info);
void (*destroy)(grpc_wakeup_fd_info *fd_info);
void (*init)(grpc_wakeup_fd *fd_info);
void (*consume)(grpc_wakeup_fd *fd_info);
void (*wakeup)(grpc_wakeup_fd *fd_info);
void (*destroy)(grpc_wakeup_fd *fd_info);
/* Must be called before calling any other functions */
int (*check_availability)(void);
} grpc_wakeup_fd_vtable;
struct grpc_wakeup_fd_info {
struct grpc_wakeup_fd {
int read_fd;
int write_fd;
};
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info);
void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info);
void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info);
void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info);
/* Defined in some specialized implementation's .c file, or by
* wakeup_fd_nospecial.c if no such implementation exists. */

@ -80,7 +80,7 @@ static void on_compute_engine_detection_http_response(
}
gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset));
detector->is_done = 1;
grpc_pollset_kick(&detector->pollset);
grpc_pollset_kick(&detector->pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
}
@ -112,7 +112,9 @@ static int is_stack_running_on_compute_engine(void) {
called once for the lifetime of the process by the default credentials. */
gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
while (!detector.is_done) {
grpc_pollset_work(&detector.pollset, gpr_inf_future(GPR_CLOCK_REALTIME));
grpc_pollset_worker worker;
grpc_pollset_work(&detector.pollset, &worker,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));

@ -45,6 +45,13 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#define MAX_PLUCKERS 4
typedef struct {
grpc_pollset_worker *worker;
void *tag;
} plucker;
/* Completion queue structure */
struct grpc_completion_queue {
/** completed events */
@ -60,6 +67,8 @@ struct grpc_completion_queue {
int shutdown;
int shutdown_called;
int is_server_cq;
int num_pluckers;
plucker pluckers[MAX_PLUCKERS];
};
grpc_completion_queue *grpc_completion_queue_create(void) {
@ -117,6 +126,8 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
int shutdown;
int i;
grpc_pollset_worker *pluck_worker;
storage->tag = tag;
storage->done = done;
@ -130,7 +141,14 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
grpc_pollset_kick(&cc->pollset);
pluck_worker = NULL;
for (i = 0; i < cc->num_pluckers; i++) {
if (cc->pluckers[i].tag == tag) {
pluck_worker = cc->pluckers[i].worker;
break;
}
}
grpc_pollset_kick(&cc->pollset, pluck_worker);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
} else {
cc->completed_tail->next =
@ -147,6 +165,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
grpc_event ret;
grpc_pollset_worker worker;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@ -172,7 +191,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
@ -184,11 +203,34 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
return ret;
}
static void add_plucker(grpc_completion_queue *cc, void *tag,
grpc_pollset_worker *worker) {
GPR_ASSERT(cc->num_pluckers != MAX_PLUCKERS);
cc->pluckers[cc->num_pluckers].tag = tag;
cc->pluckers[cc->num_pluckers].worker = worker;
cc->num_pluckers++;
}
static void del_plucker(grpc_completion_queue *cc, void *tag,
grpc_pollset_worker *worker) {
int i;
for (i = 0; i < cc->num_pluckers; i++) {
if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
cc->num_pluckers--;
GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
return;
}
}
gpr_log(GPR_ERROR, "should never reach here");
abort();
}
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker worker;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@ -219,12 +261,15 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
add_plucker(cc, tag, &worker);
if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
del_plucker(cc, tag, &worker);
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
@ -261,15 +306,6 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return &cc->pollset;
}
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_kick(&cc->pollset);
grpc_pollset_work(&cc->pollset,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(100, GPR_TIMESPAN)));
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }

@ -77,8 +77,6 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);

@ -327,6 +327,14 @@ static void request_matcher_zombify_all_pending_calls(
}
}
static void request_matcher_kill_requests(grpc_server *server,
request_matcher *rm) {
int request_id;
while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
fail_call(server, &server->requested_calls[request_id]);
}
}
/*
* server proper
*/
@ -492,12 +500,25 @@ static int num_channels(grpc_server *server) {
return n;
}
static void kill_pending_work_locked(grpc_server *server) {
registered_method *rm;
request_matcher_kill_requests(server, &server->unregistered_request_matcher);
request_matcher_zombify_all_pending_calls(
&server->unregistered_request_matcher);
for (rm = server->registered_methods; rm; rm = rm->next) {
request_matcher_kill_requests(server, &rm->request_matcher);
request_matcher_zombify_all_pending_calls(&rm->request_matcher);
}
}
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
}
kill_pending_work_locked(server);
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
@ -945,52 +966,15 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
op.set_accept_stream_user_data = chand;
op.on_connectivity_state_change = &chand->channel_connectivity_changed;
op.connectivity_state = &chand->connectivity_state;
op.disconnect = gpr_atm_acq_load(&s->shutdown_flag);
grpc_transport_perform_op(transport, &op);
}
typedef struct {
requested_call **requests;
size_t count;
size_t capacity;
} request_killer;
static void request_killer_init(request_killer *rk) {
memset(rk, 0, sizeof(*rk));
}
static void request_killer_add(request_killer *rk, requested_call *rc) {
if (rk->capacity == rk->count) {
rk->capacity = GPR_MAX(8, rk->capacity * 2);
rk->requests =
gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests));
}
rk->requests[rk->count++] = rc;
}
static void request_killer_add_request_matcher(request_killer *rk,
grpc_server *server,
request_matcher *rm) {
int request_id;
while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
request_killer_add(rk, &server->requested_calls[request_id]);
}
}
static void request_killer_run(request_killer *rk, grpc_server *server) {
size_t i;
for (i = 0; i < rk->count; i++) {
fail_call(server, rk->requests[i]);
}
gpr_free(rk->requests);
}
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
request_killer reqkill;
GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
@ -1011,27 +995,16 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
channel_broadcaster_init(server, &broadcaster);
request_killer_init(&reqkill);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
request_killer_add_request_matcher(&reqkill, server,
&server->unregistered_request_matcher);
request_matcher_zombify_all_pending_calls(
&server->unregistered_request_matcher);
for (rm = server->registered_methods; rm; rm = rm->next) {
request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher);
request_matcher_zombify_all_pending_calls(&rm->request_matcher);
}
kill_pending_work_locked(server);
gpr_mu_unlock(&server->mu_call);
gpr_atm_rel_store(&server->shutdown_flag, 1);
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
request_killer_run(&reqkill, server);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
l->destroy(server, l->arg);
@ -1272,6 +1245,12 @@ static void done_request_event(void *req, grpc_cq_completion *c) {
gpr_free(req);
}
if (gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_lock(&server->mu_global);
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
}
server_unref(server);
}

@ -823,6 +823,12 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
stream_global);
} else {
stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
if (stream_global->outgoing_sopb != NULL) {
grpc_sopb_reset(stream_global->outgoing_sopb);
stream_global->outgoing_sopb = NULL;
grpc_chttp2_schedule_closure(transport_global,
stream_global->send_done_closure, 1);
}
stream_global->read_closed = 1;
if (!stream_global->published_cancelled) {
char buffer[GPR_LTOA_MIN_BUFSIZE];

@ -64,7 +64,7 @@ static void on_finish(void *arg, const grpc_httpcli_response *response) {
GPR_ASSERT(0 == memcmp(expect, response->body, response->body_length));
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
g_done = 1;
grpc_pollset_kick(&g_pollset);
grpc_pollset_kick(&g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@ -87,7 +87,8 @@ static void test_get(int use_ssl, int port) {
(void *)42);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!g_done) {
grpc_pollset_work(&g_pollset, n_seconds_time(20));
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, n_seconds_time(20));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
gpr_free(host);
@ -112,7 +113,8 @@ static void test_post(int use_ssl, int port) {
n_seconds_time(15), on_finish, (void *)42);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!g_done) {
grpc_pollset_work(&g_pollset, n_seconds_time(20));
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, n_seconds_time(20));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
gpr_free(host);

@ -132,7 +132,7 @@ static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
gpr_log(GPR_INFO, "Read handler shutdown");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->read_done = 1;
grpc_pollset_kick(g_pollset);
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
return;
}
@ -143,7 +143,7 @@ static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
gpr_log(GPR_INFO, "Read handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->read_done = 1;
grpc_pollset_kick(g_pollset);
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else {
grpc_endpoint_notify_on_read(state->read_ep,
@ -167,7 +167,7 @@ static void read_and_write_test_write_handler(void *data,
gpr_log(GPR_INFO, "Write handler shutdown");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->write_done = 1;
grpc_pollset_kick(g_pollset);
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
return;
}
@ -201,7 +201,7 @@ static void read_and_write_test_write_handler(void *data,
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->write_done = 1;
grpc_pollset_kick(g_pollset);
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
}
@ -254,8 +254,9 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
while (!state.read_done || !state.write_done) {
grpc_pollset_worker worker;
GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
grpc_pollset_work(g_pollset, deadline);
grpc_pollset_work(g_pollset, &worker, deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
@ -287,7 +288,7 @@ static void shutdown_during_write_test_read_handler(
grpc_endpoint_destroy(st->ep);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
st->done = error;
grpc_pollset_kick(g_pollset);
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else {
grpc_endpoint_notify_on_read(
@ -309,7 +310,7 @@ static void shutdown_during_write_test_write_handler(
}
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
st->done = 1;
grpc_pollset_kick(g_pollset);
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
}
@ -350,15 +351,17 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
while (!write_st.done) {
grpc_pollset_worker worker;
GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
grpc_pollset_work(g_pollset, deadline);
grpc_pollset_work(g_pollset, &worker, deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
grpc_endpoint_destroy(write_st.ep);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
while (!read_st.done) {
grpc_pollset_worker worker;
GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
grpc_pollset_work(g_pollset, deadline);
grpc_pollset_work(g_pollset, &worker, deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
gpr_free(slices);

@ -179,7 +179,7 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
sv->done = 1;
grpc_pollset_kick(&g_pollset);
grpc_pollset_kick(&g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@ -249,7 +249,8 @@ static int server_start(server *sv) {
static void server_wait_and_shutdown(server *sv) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!sv->done) {
grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_MONOTONIC));
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@ -286,7 +287,7 @@ static void client_session_shutdown_cb(void *arg /*client*/, int success) {
client *cl = arg;
grpc_fd_orphan(cl->em_fd, NULL, "c");
cl->done = 1;
grpc_pollset_kick(&g_pollset);
grpc_pollset_kick(&g_pollset, NULL);
}
/* Write as much as possible, then register notify_on_write. */
@ -356,7 +357,8 @@ static void client_start(client *cl, int port) {
static void client_wait_and_shutdown(client *cl) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!cl->done) {
grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_MONOTONIC));
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@ -392,7 +394,7 @@ static void first_read_callback(void *arg /* fd_change_data */, int success) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
fdc->cb_that_ran = first_read_callback;
grpc_pollset_kick(&g_pollset);
grpc_pollset_kick(&g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@ -401,7 +403,7 @@ static void second_read_callback(void *arg /* fd_change_data */, int success) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
fdc->cb_that_ran = second_read_callback;
grpc_pollset_kick(&g_pollset);
grpc_pollset_kick(&g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@ -445,7 +447,8 @@ static void test_grpc_fd_change(void) {
/* And now wait for it to run. */
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (a.cb_that_ran == NULL) {
grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_MONOTONIC));
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
GPR_ASSERT(a.cb_that_ran == first_read_callback);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@ -463,7 +466,8 @@ static void test_grpc_fd_change(void) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (b.cb_that_ran == NULL) {
grpc_pollset_work(&g_pollset, gpr_inf_future(GPR_CLOCK_MONOTONIC));
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
/* Except now we verify that second_read_callback ran instead */
GPR_ASSERT(b.cb_that_ran == second_read_callback);

@ -1,130 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/iomgr/pollset_kick_posix.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "test/core/util/test_config.h"
static void test_allocation(void) {
grpc_pollset_kick_state state;
grpc_pollset_kick_init(&state);
grpc_pollset_kick_destroy(&state);
}
static void test_non_kick(void) {
grpc_pollset_kick_state state;
grpc_kick_fd_info *kfd;
grpc_pollset_kick_init(&state);
kfd = grpc_pollset_kick_pre_poll(&state);
GPR_ASSERT(kfd != NULL);
grpc_pollset_kick_post_poll(&state, kfd);
grpc_pollset_kick_destroy(&state);
}
static void test_basic_kick(void) {
/* Kicked during poll */
grpc_pollset_kick_state state;
grpc_kick_fd_info *kfd;
grpc_pollset_kick_init(&state);
kfd = grpc_pollset_kick_pre_poll(&state);
GPR_ASSERT(kfd != NULL);
grpc_pollset_kick_kick(&state);
/* Now hypothetically we polled and found that we were kicked */
grpc_pollset_kick_consume(&state, kfd);
grpc_pollset_kick_post_poll(&state, kfd);
grpc_pollset_kick_destroy(&state);
}
static void test_non_poll_kick(void) {
/* Kick before entering poll */
grpc_pollset_kick_state state;
grpc_kick_fd_info *kfd;
grpc_pollset_kick_init(&state);
grpc_pollset_kick_kick(&state);
kfd = grpc_pollset_kick_pre_poll(&state);
GPR_ASSERT(kfd == NULL);
grpc_pollset_kick_destroy(&state);
}
#define GRPC_MAX_CACHED_PIPES 50
static void test_over_free(void) {
/* Check high watermark pipe free logic */
int i;
grpc_kick_fd_info **kfds =
gpr_malloc(sizeof(grpc_kick_fd_info *) * GRPC_MAX_CACHED_PIPES);
grpc_pollset_kick_state state;
grpc_pollset_kick_init(&state);
for (i = 0; i < GRPC_MAX_CACHED_PIPES; ++i) {
kfds[i] = grpc_pollset_kick_pre_poll(&state);
GPR_ASSERT(kfds[i] != NULL);
}
for (i = 0; i < GRPC_MAX_CACHED_PIPES; ++i) {
grpc_pollset_kick_post_poll(&state, kfds[i]);
}
grpc_pollset_kick_destroy(&state);
gpr_free(kfds);
}
static void run_tests(void) {
test_allocation();
test_basic_kick();
test_non_poll_kick();
test_non_kick();
test_over_free();
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_pollset_kick_global_init();
run_tests();
grpc_pollset_kick_global_destroy();
grpc_pollset_kick_global_init_fallback_fd();
run_tests();
grpc_pollset_kick_global_destroy();
return 0;
}

@ -56,7 +56,7 @@ static gpr_timespec test_deadline(void) {
static void finish_connection() {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
g_connections_complete++;
grpc_pollset_kick(&g_pollset);
grpc_pollset_kick(&g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@ -111,7 +111,8 @@ void test_succeeds(void) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (g_connections_complete == connections_complete_before) {
grpc_pollset_work(&g_pollset, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@ -140,7 +141,8 @@ void test_fails(void) {
/* wait for the connection callback to finish */
while (g_connections_complete == connections_complete_before) {
grpc_pollset_work(&g_pollset, test_deadline());
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, test_deadline());
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@ -199,6 +201,7 @@ void test_times_out(void) {
gpr_now(connect_deadline.clock_type)) > 0) {
int is_after_deadline =
gpr_time_cmp(connect_deadline, gpr_now(GPR_CLOCK_MONOTONIC)) <= 0;
grpc_pollset_worker worker;
if (is_after_deadline &&
gpr_time_cmp(gpr_time_add(connect_deadline,
gpr_time_from_seconds(1, GPR_TIMESPAN)),
@ -208,7 +211,7 @@ void test_times_out(void) {
GPR_ASSERT(g_connections_complete ==
connections_complete_before + is_after_deadline);
}
grpc_pollset_work(&g_pollset, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

@ -186,7 +186,8 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_work(&g_pollset, deadline);
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, deadline);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@ -222,7 +223,8 @@ static void large_read_test(ssize_t slice_size) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_work(&g_pollset, deadline);
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, deadline);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@ -265,7 +267,7 @@ static void write_done(void *user_data /* write_socket_state */,
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
gpr_log(GPR_INFO, "Signalling write done");
state->write_done = 1;
grpc_pollset_kick(&g_pollset);
grpc_pollset_kick(&g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@ -281,8 +283,9 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
for (;;) {
grpc_pollset_worker worker;
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
grpc_pollset_work(&g_pollset, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
do {
bytes_read =
@ -358,10 +361,11 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
drain_socket_blocking(sv[0], num_bytes, num_bytes);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) {
grpc_pollset_worker worker;
if (state.write_done) {
break;
}
grpc_pollset_work(&g_pollset, deadline);
grpc_pollset_work(&g_pollset, &worker, deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@ -387,6 +391,7 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
size_t num_blocks;
gpr_slice *slices;
int current_data = 0;
grpc_pollset_worker worker;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d",
@ -417,7 +422,7 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
if (state.write_done) {
break;
}
grpc_pollset_work(&g_pollset, deadline);
grpc_pollset_work(&g_pollset, &worker, deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
break;

@ -54,7 +54,7 @@ static void on_connect(void *arg, grpc_endpoint *tcp) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
g_nconnects++;
grpc_pollset_kick(&g_pollset);
grpc_pollset_kick(&g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@ -136,7 +136,8 @@ static void test_connect(int n) {
gpr_log(GPR_DEBUG, "wait");
while (g_nconnects == nconnects_before &&
gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
grpc_pollset_work(&g_pollset, deadline);
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, deadline);
}
gpr_log(GPR_DEBUG, "wait done");

@ -68,7 +68,7 @@ static void on_oauth2_response(void *user_data, grpc_credentials_md *md_elems,
gpr_mu_lock(GRPC_POLLSET_MU(&request->pollset));
request->is_done = 1;
request->token = token;
grpc_pollset_kick(&request->pollset);
grpc_pollset_kick(&request->pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&request->pollset));
}
@ -83,8 +83,11 @@ char *grpc_test_fetch_oauth2_token_with_credentials(grpc_credentials *creds) {
on_oauth2_response, &request);
gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset));
while (!request.is_done)
grpc_pollset_work(&request.pollset, gpr_inf_future(GPR_CLOCK_MONOTONIC));
while (!request.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&request.pollset, &worker,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset));
grpc_pollset_shutdown(&request.pollset, do_nothing, NULL);

@ -65,7 +65,7 @@ static void on_metadata_response(void *user_data,
}
gpr_mu_lock(GRPC_POLLSET_MU(&sync->pollset));
sync->is_done = 1;
grpc_pollset_kick(&sync->pollset);
grpc_pollset_kick(&sync->pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&sync->pollset));
}
@ -95,8 +95,11 @@ int main(int argc, char **argv) {
on_metadata_response, &sync);
gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset));
while (!sync.is_done)
grpc_pollset_work(&sync.pollset, gpr_inf_future(GPR_CLOCK_REALTIME));
while (!sync.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&sync.pollset, &worker,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset));
grpc_credentials_release(creds);

@ -79,7 +79,7 @@ static void on_jwt_verification_done(void *user_data,
gpr_mu_lock(GRPC_POLLSET_MU(&sync->pollset));
sync->is_done = 1;
grpc_pollset_kick(&sync->pollset);
grpc_pollset_kick(&sync->pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&sync->pollset));
}
@ -109,8 +109,11 @@ int main(int argc, char **argv) {
on_jwt_verification_done, &sync);
gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset));
while (!sync.is_done)
grpc_pollset_work(&sync.pollset, gpr_inf_future(GPR_CLOCK_REALTIME));
while (!sync.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&sync.pollset, &worker,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset));
grpc_jwt_verifier_destroy(verifier);

@ -819,7 +819,6 @@ src/core/iomgr/iomgr.h \
src/core/iomgr/iomgr_internal.h \
src/core/iomgr/iomgr_posix.h \
src/core/iomgr/pollset.h \
src/core/iomgr/pollset_kick_posix.h \
src/core/iomgr/pollset_posix.h \
src/core/iomgr/pollset_set.h \
src/core/iomgr/pollset_set_posix.h \
@ -939,7 +938,6 @@ src/core/iomgr/iocp_windows.c \
src/core/iomgr/iomgr.c \
src/core/iomgr/iomgr_posix.c \
src/core/iomgr/iomgr_windows.c \
src/core/iomgr/pollset_kick_posix.c \
src/core/iomgr/pollset_multipoller_with_epoll.c \
src/core/iomgr/pollset_multipoller_with_poll_posix.c \
src/core/iomgr/pollset_posix.c \

@ -817,20 +817,6 @@
"test/core/end2end/no_server_test.c"
]
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"language": "c",
"name": "poll_kick_posix_test",
"src": [
"test/core/iomgr/poll_kick_posix_test.c"
]
},
{
"deps": [
"gpr",
@ -10819,7 +10805,6 @@
"src/core/iomgr/iomgr_internal.h",
"src/core/iomgr/iomgr_posix.h",
"src/core/iomgr/pollset.h",
"src/core/iomgr/pollset_kick_posix.h",
"src/core/iomgr/pollset_posix.h",
"src/core/iomgr/pollset_set.h",
"src/core/iomgr/pollset_set_posix.h",
@ -10985,8 +10970,6 @@
"src/core/iomgr/iomgr_posix.h",
"src/core/iomgr/iomgr_windows.c",
"src/core/iomgr/pollset.h",
"src/core/iomgr/pollset_kick_posix.c",
"src/core/iomgr/pollset_kick_posix.h",
"src/core/iomgr/pollset_multipoller_with_epoll.c",
"src/core/iomgr/pollset_multipoller_with_poll_posix.c",
"src/core/iomgr/pollset_posix.c",
@ -11279,7 +11262,6 @@
"src/core/iomgr/iomgr_internal.h",
"src/core/iomgr/iomgr_posix.h",
"src/core/iomgr/pollset.h",
"src/core/iomgr/pollset_kick_posix.h",
"src/core/iomgr/pollset_posix.h",
"src/core/iomgr/pollset_set.h",
"src/core/iomgr/pollset_set_posix.h",
@ -11423,8 +11405,6 @@
"src/core/iomgr/iomgr_posix.h",
"src/core/iomgr/iomgr_windows.c",
"src/core/iomgr/pollset.h",
"src/core/iomgr/pollset_kick_posix.c",
"src/core/iomgr/pollset_kick_posix.h",
"src/core/iomgr/pollset_multipoller_with_epoll.c",
"src/core/iomgr/pollset_multipoller_with_poll_posix.c",
"src/core/iomgr/pollset_posix.c",

@ -463,14 +463,6 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "poll_kick_posix_test",
"platforms": [
"posix"
]
},
{
"flaky": false,
"language": "c",

@ -281,7 +281,6 @@
<ClInclude Include="..\..\src\core\iomgr\iomgr_internal.h" />
<ClInclude Include="..\..\src\core\iomgr\iomgr_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_kick_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_set.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_set_posix.h" />
@ -461,8 +460,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\iomgr_windows.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_epoll.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">

@ -175,9 +175,6 @@
<ClCompile Include="..\..\src\core\iomgr\iomgr_windows.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_epoll.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
@ -593,9 +590,6 @@
<ClInclude Include="..\..\src\core\iomgr\pollset.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\iomgr\pollset_kick_posix.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\iomgr\pollset_posix.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>

@ -260,7 +260,6 @@
<ClInclude Include="..\..\src\core\iomgr\iomgr_internal.h" />
<ClInclude Include="..\..\src\core\iomgr\iomgr_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_kick_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_set.h" />
<ClInclude Include="..\..\src\core\iomgr\pollset_set_posix.h" />
@ -394,8 +393,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\iomgr_windows.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_epoll.c">
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c">

@ -106,9 +106,6 @@
<ClCompile Include="..\..\src\core\iomgr\iomgr_windows.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_epoll.c">
<Filter>src\core\iomgr</Filter>
</ClCompile>
@ -470,9 +467,6 @@
<ClInclude Include="..\..\src\core\iomgr\pollset.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\iomgr\pollset_kick_posix.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\iomgr\pollset_posix.h">
<Filter>src\core\iomgr</Filter>
</ClInclude>

Loading…
Cancel
Save