Error reporting progress

pull/6897/head
Craig Tiller 9 years ago
parent 80384bd2e3
commit 4f1d0f337b
  1. 10
      src/core/lib/iomgr/error.c
  2. 6
      src/core/lib/iomgr/error.h
  3. 131
      src/core/lib/iomgr/ev_poll_and_epoll_posix.c
  4. 16
      src/core/lib/iomgr/ev_posix.c
  5. 12
      src/core/lib/iomgr/ev_posix.h
  6. 11
      src/core/lib/iomgr/pollset.h
  7. 15
      src/core/lib/iomgr/tcp_client_posix.c
  8. 22
      src/core/lib/iomgr/wakeup_fd_pipe.c
  9. 12
      src/core/lib/iomgr/wakeup_fd_posix.c
  10. 15
      src/core/lib/iomgr/wakeup_fd_posix.h
  11. 10
      src/core/lib/iomgr/workqueue.h
  12. 62
      src/core/lib/iomgr/workqueue_posix.c
  13. 13
      src/core/lib/security/credentials/google_default/google_default_credentials.c
  14. 38
      src/core/lib/surface/completion_queue.c
  15. 10
      test/core/iomgr/endpoint_tests.c
  16. 12
      test/core/security/oauth2_utils.c

@ -469,3 +469,13 @@ grpc_error *grpc_os_error(const char *file, int line, int err,
GRPC_ERROR_STR_OS_ERROR, strerror(err)),
GRPC_ERROR_STR_SYSCALL, call_name);
}
bool grpc_log_if_error(const char *what, grpc_error *error, const char *file,
int line) {
if (error == GRPC_ERROR_NONE) return true;
const char *msg = grpc_error_string(error);
gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, "%s: %s", what, msg);
grpc_error_free_string(msg);
GRPC_ERROR_UNREF(error);
return false;
}

@ -34,6 +34,7 @@
#ifndef GRPC_CORE_LIB_IOMGR_ERROR_H
#define GRPC_CORE_LIB_IOMGR_ERROR_H
#include <stdbool.h>
#include <stdint.h>
#include <grpc/support/time.h>
@ -114,4 +115,9 @@ grpc_error *grpc_os_error(const char *file, int line, int err,
#define GRPC_OS_ERROR(err, call_name) \
grpc_os_error(__FILE__, __LINE__, err, call_name)
bool grpc_log_if_error(const char *what, grpc_error *error, const char *file,
int line);
#define GRPC_LOG_IF_ERROR(what, error) \
grpc_log_if_error((what), (error), __FILE__, __LINE__)
#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */

@ -217,9 +217,10 @@ struct grpc_pollset {
struct grpc_pollset_vtable {
void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
struct grpc_fd *fd, int and_unlock_pollset);
void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now);
grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now);
void (*finish_shutdown)(grpc_pollset *pollset);
void (*destroy)(grpc_pollset *pollset);
};
@ -247,9 +248,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
/* As per pollset_kick, with an extended set of flags (defined above)
-- mostly for fd_posix's use. */
static void pollset_kick_ext(grpc_pollset *p,
grpc_pollset_worker *specific_worker,
uint32_t flags);
static grpc_error *pollset_kick_ext(grpc_pollset *p,
grpc_pollset_worker *specific_worker,
uint32_t flags) GRPC_MUST_USE_RESULT;
/* turn a pollset into a multipoller: platform specific */
typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
@ -415,12 +416,13 @@ static bool fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
static void pollset_kick_locked(grpc_fd_watcher *watcher) {
static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) {
gpr_mu_lock(&watcher->pollset->mu);
GPR_ASSERT(watcher->worker);
pollset_kick_ext(watcher->pollset, watcher->worker,
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker,
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
gpr_mu_unlock(&watcher->pollset->mu);
return err;
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
@ -726,10 +728,19 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next->prev = worker;
}
static void pollset_kick_ext(grpc_pollset *p,
grpc_pollset_worker *specific_worker,
uint32_t flags) {
static void kick_append_error(grpc_error **composite, grpc_error *error) {
if (error == GRPC_ERROR_NONE) return;
if (*composite == GRPC_ERROR_NONE) {
*composite = GRPC_ERROR_CREATE("Kick Failure");
}
*composite = grpc_error_add_child(*composite, error);
}
static grpc_error *pollset_kick_ext(grpc_pollset *p,
grpc_pollset_worker *specific_worker,
uint32_t flags) {
GPR_TIMER_BEGIN("pollset_kick_ext", 0);
grpc_error *error = GRPC_ERROR_NONE;
/* pollset->mu already held */
if (specific_worker != NULL) {
@ -739,7 +750,8 @@ static void pollset_kick_ext(grpc_pollset *p,
for (specific_worker = p->root_worker.next;
specific_worker != &p->root_worker;
specific_worker = specific_worker->next) {
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
kick_append_error(
&error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
}
p->kicked_without_pollers = 1;
GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
@ -750,14 +762,16 @@ static void pollset_kick_ext(grpc_pollset *p,
specific_worker->reevaluate_polling_on_wakeup = 1;
}
specific_worker->kicked_specifically = 1;
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
kick_append_error(&error,
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
} else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
GPR_TIMER_MARK("kick_yoself", 0);
if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
specific_worker->reevaluate_polling_on_wakeup = 1;
}
specific_worker->kicked_specifically = 1;
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
kick_append_error(&error,
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
}
} else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
@ -778,7 +792,8 @@ static void pollset_kick_ext(grpc_pollset *p,
if (specific_worker != NULL) {
GPR_TIMER_MARK("finally_kick", 0);
push_back_worker(p, specific_worker);
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
kick_append_error(
&error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
}
} else {
GPR_TIMER_MARK("kicked_no_pollers", 0);
@ -787,20 +802,21 @@ static void pollset_kick_ext(grpc_pollset *p,
}
GPR_TIMER_END("pollset_kick_ext", 0);
return error;
}
static void pollset_kick(grpc_pollset *p,
grpc_pollset_worker *specific_worker) {
pollset_kick_ext(p, specific_worker, 0);
static grpc_error *pollset_kick(grpc_pollset *p,
grpc_pollset_worker *specific_worker) {
return pollset_kick_ext(p, specific_worker, 0);
}
/* global state management */
static void pollset_global_init(void) {
static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
grpc_wakeup_fd_global_init();
grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
static void pollset_global_shutdown(void) {
@ -810,7 +826,9 @@ static void pollset_global_shutdown(void) {
grpc_wakeup_fd_global_destroy();
}
static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
static grpc_error *kick_poller(void) {
return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
/* main interface */
@ -876,11 +894,12 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl, gpr_timespec now,
gpr_timespec deadline) {
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
*worker_hdl = &worker;
grpc_error *error = GRPC_ERROR_NONE;
/* pollset->mu already held */
int added_worker = 0;
@ -896,7 +915,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->local_wakeup_cache = worker.wakeup_fd->next;
} else {
worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
if (error != GRPC_ERROR_NONE) {
return error;
}
}
worker.kicked_specifically = 0;
/* If there's work waiting for the pollset to be idle, and the
@ -933,8 +955,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker,
deadline, now);
error = pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker,
deadline, now);
GPR_TIMER_END("maybe_work_and_unlock", 0);
locked = 0;
gpr_tls_set(&g_current_thread_poller, 0);
@ -955,7 +977,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* If we're forced to re-evaluate polling (via pollset_kick with
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
a loop */
if (worker.reevaluate_polling_on_wakeup) {
if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
worker.reevaluate_polling_on_wakeup = 0;
pollset->kicked_without_pollers = 0;
if (queued_work || worker.kicked_specifically) {
@ -996,6 +1018,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
*worker_hdl = NULL;
GPR_TIMER_END("pollset_work", 0);
return error;
}
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@ -1156,11 +1179,19 @@ exit:
}
}
static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
grpc_pollset_worker *worker,
gpr_timespec deadline,
gpr_timespec now) {
static void work_combine_error(grpc_error **composite, grpc_error *error) {
if (error == GRPC_ERROR_NONE) return;
if (*composite == GRPC_ERROR_NONE) {
*composite = GRPC_ERROR_CREATE("pollset_work");
}
*composite = grpc_error_add_child(*composite, error);
}
static grpc_error *basic_pollset_maybe_work_and_unlock(
grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now) {
grpc_error *error = GRPC_ERROR_NONE;
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
@ -1210,7 +1241,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
if (r < 0) {
if (errno != EINTR) {
gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
}
if (fd) {
fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
@ -1221,10 +1252,12 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
}
} else {
if (pfd[0].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
work_combine_error(&error,
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
}
if (pfd[1].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
work_combine_error(&error,
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
}
if (nfds > 2) {
fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
@ -1237,6 +1270,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
if (fd) {
GRPC_FD_UNREF(fd, "basicpoll_begin");
}
return error;
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
@ -1297,9 +1332,11 @@ exit:
}
}
static void multipoll_with_poll_pollset_maybe_work_and_unlock(
static grpc_error *multipoll_with_poll_pollset_maybe_work_and_unlock(
grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline, gpr_timespec now) {
grpc_error *error = GRPC_ERROR_NONE;
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
@ -1374,10 +1411,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
work_combine_error(&error,
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
}
if (pfds[1].revents & POLLIN_CHECK) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
work_combine_error(&error,
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
}
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
@ -1391,6 +1430,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
gpr_free(pfds);
gpr_free(watchers);
return error;
}
static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
@ -1934,14 +1974,23 @@ static const grpc_event_engine_vtable vtable = {
};
const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) {
const char *msg;
grpc_error *err = GRPC_ERROR_NONE;
#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
platform_become_multipoller = epoll_become_multipoller;
#else
platform_become_multipoller = poll_become_multipoller;
#endif
fd_global_init();
pollset_global_init();
err = pollset_global_init();
if (err != GRPC_ERROR_NONE) goto error;
return &vtable;
error:
msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "%s", msg);
grpc_error_free_string(msg);
return NULL;
}
#endif

@ -101,15 +101,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
g_event_engine->pollset_destroy(pollset);
}
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline) {
g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline);
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline) {
return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline);
}
void grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
g_event_engine->pollset_kick(pollset, specific_worker);
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
return g_event_engine->pollset_kick(pollset, specific_worker);
}
void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@ -159,6 +159,6 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
g_event_engine->pollset_set_del_fd(exec_ctx, pollset_set, fd);
}
void grpc_kick_poller(void) { g_event_engine->kick_poller(); }
grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); }
#endif // GPR_POSIX_SOCKET

@ -61,11 +61,11 @@ typedef struct grpc_event_engine_vtable {
grpc_closure *closure);
void (*pollset_reset)(grpc_pollset *pollset);
void (*pollset_destroy)(grpc_pollset *pollset);
void (*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline);
void (*pollset_kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline);
grpc_error *(*pollset_kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
struct grpc_fd *fd);
@ -88,7 +88,7 @@ typedef struct grpc_event_engine_vtable {
void (*pollset_set_del_fd)(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_fd *fd);
void (*kick_poller)(void);
grpc_error *(*kick_poller)(void);
void (*shutdown_engine)(void);
} grpc_event_engine_vtable;

@ -81,14 +81,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
May call grpc_closure_list_run on grpc_closure_list, without holding the
pollset
lock */
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline);
grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline) GRPC_MUST_USE_RESULT;
/* Break one polling thread out of polling work for this pollset.
If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
Otherwise, if specific_worker is non-NULL, then kick that worker. */
void grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker)
GRPC_MUST_USE_RESULT;
#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_H */

@ -76,13 +76,16 @@ static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) {
GPR_ASSERT(fd >= 0);
if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) ||
(!grpc_is_unix_socket(addr) && !grpc_set_socket_low_latency(fd, 1)) ||
!grpc_set_socket_no_sigpipe_if_possible(fd)) {
gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
strerror(errno));
goto error;
err = grpc_set_socket_nonblocking(fd, 1);
if (err != GRPC_ERROR_NONE) goto error;
err = grpc_set_socket_cloexec(fd, 1);
if (err != GRPC_ERROR_NONE) goto error;
if (!grpc_is_unix_socket(addr)) {
err = grpc_set_socket_low_latency(fd, 1);
if (err != GRPC_ERROR_NONE) goto error;
}
err = grpc_set_socket_no_sigpipe_if_possible(fd);
if (err != GRPC_ERROR_NONE) goto error;
goto done;
error:

@ -45,7 +45,7 @@
#include "src/core/lib/iomgr/socket_utils_posix.h"
static void pipe_init(grpc_wakeup_fd* fd_info) {
static grpc_error* pipe_init(grpc_wakeup_fd* fd_info) {
int pipefd[2];
/* TODO(klempner): Make this nonfatal */
int r = pipe(pipefd);
@ -53,36 +53,40 @@ static void pipe_init(grpc_wakeup_fd* fd_info) {
gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno));
abort();
}
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1));
GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1));
grpc_error* err;
err = grpc_set_socket_nonblocking(pipefd[0], 1);
if (err != GRPC_ERROR_NONE) return err;
err = grpc_set_socket_nonblocking(pipefd[1], 1);
if (err != GRPC_ERROR_NONE) return err;
fd_info->read_fd = pipefd[0];
fd_info->write_fd = pipefd[1];
return GRPC_ERROR_NONE;
}
static void pipe_consume(grpc_wakeup_fd* fd_info) {
static grpc_error* pipe_consume(grpc_wakeup_fd* fd_info) {
char buf[128];
ssize_t r;
for (;;) {
r = read(fd_info->read_fd, buf, sizeof(buf));
if (r > 0) continue;
if (r == 0) return;
if (r == 0) return GRPC_ERROR_NONE;
switch (errno) {
case EAGAIN:
return;
return GRPC_ERROR_NONE;
case EINTR:
continue;
default:
gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno));
return;
return GRPC_OS_ERROR(errno, "read");
}
}
}
static void pipe_wakeup(grpc_wakeup_fd* fd_info) {
static grpc_error* pipe_wakeup(grpc_wakeup_fd* fd_info) {
char c = 0;
while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR)
;
return GRPC_ERROR_NONE;
}
static void pipe_destroy(grpc_wakeup_fd* fd_info) {

@ -53,16 +53,16 @@ void grpc_wakeup_fd_global_init(void) {
void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; }
void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->init(fd_info);
grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) {
return wakeup_fd_vtable->init(fd_info);
}
void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->consume(fd_info);
grpc_error *grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) {
return wakeup_fd_vtable->consume(fd_info);
}
void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) {
wakeup_fd_vtable->wakeup(fd_info);
grpc_error *grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) {
return wakeup_fd_vtable->wakeup(fd_info);
}
void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) {

@ -62,6 +62,8 @@
#ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H
#define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H
#include "src/core/lib/iomgr/error.h"
void grpc_wakeup_fd_global_init(void);
void grpc_wakeup_fd_global_destroy(void);
@ -72,9 +74,9 @@ void grpc_wakeup_fd_global_init_force_fallback(void);
typedef struct grpc_wakeup_fd grpc_wakeup_fd;
typedef struct grpc_wakeup_fd_vtable {
void (*init)(grpc_wakeup_fd* fd_info);
void (*consume)(grpc_wakeup_fd* fd_info);
void (*wakeup)(grpc_wakeup_fd* fd_info);
grpc_error* (*init)(grpc_wakeup_fd* fd_info);
grpc_error* (*consume)(grpc_wakeup_fd* fd_info);
grpc_error* (*wakeup)(grpc_wakeup_fd* fd_info);
void (*destroy)(grpc_wakeup_fd* fd_info);
/* Must be called before calling any other functions */
int (*check_availability)(void);
@ -89,9 +91,10 @@ extern int grpc_allow_specialized_wakeup_fd;
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
void grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info);
void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info);
void grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info);
grpc_error* grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT;
grpc_error* grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info)
GRPC_MUST_USE_RESULT;
grpc_error* grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT;
void grpc_wakeup_fd_destroy(grpc_wakeup_fd* fd_info);
/* Defined in some specialized implementation's .c file, or by

@ -50,9 +50,11 @@
/* grpc_workqueue is forward declared in exec_ctx.h */
/** Create a work queue */
grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx);
grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
grpc_workqueue **workqueue);
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
grpc_error *grpc_workqueue_flush(
grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) GRPC_MUST_USE_RESULT;
#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
@ -77,7 +79,7 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset);
/** Add a work item to a workqueue */
void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
grpc_error *error);
void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error);
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */

@ -47,20 +47,25 @@
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) {
grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
grpc_workqueue **workqueue) {
char name[32];
grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue));
gpr_ref_init(&workqueue->refs, 1);
gpr_mu_init(&workqueue->mu);
workqueue->closure_list.head = workqueue->closure_list.tail = NULL;
grpc_wakeup_fd_init(&workqueue->wakeup_fd);
sprintf(name, "workqueue:%p", (void *)workqueue);
workqueue->wakeup_read_fd =
grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name);
grpc_closure_init(&workqueue->read_closure, on_readable, workqueue);
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
&workqueue->read_closure);
return workqueue;
*workqueue = gpr_malloc(sizeof(grpc_workqueue));
gpr_ref_init(&(*workqueue)->refs, 1);
gpr_mu_init(&(*workqueue)->mu);
(*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL;
grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd);
if (err != GRPC_ERROR_NONE) {
gpr_free(*workqueue);
return err;
}
sprintf(name, "workqueue:%p", (void *)(*workqueue));
(*workqueue)->wakeup_read_fd = grpc_fd_create(
GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name);
grpc_closure_init(&(*workqueue)->read_closure, on_readable, workqueue);
grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd,
&(*workqueue)->read_closure);
return GRPC_ERROR_NONE;
}
static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
@ -101,13 +106,16 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd);
}
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
grpc_error *grpc_workqueue_flush(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&workqueue->mu);
if (grpc_closure_list_empty(workqueue->closure_list)) {
grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
}
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
gpr_mu_unlock(&workqueue->mu);
return error;
}
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@ -123,20 +131,32 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
} else {
gpr_mu_lock(&workqueue->mu);
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mu_unlock(&workqueue->mu);
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
&workqueue->read_closure);
if (error == GRPC_ERROR_NONE) {
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
&workqueue->read_closure);
} else {
/* recurse to get error handling */
on_readable(exec_ctx, arg, error);
}
}
}
void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
grpc_error *error) {
void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error) {
grpc_error *push_error = GRPC_ERROR_NONE;
gpr_mu_lock(&workqueue->mu);
if (grpc_closure_list_empty(workqueue->closure_list)) {
grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
}
grpc_closure_list_append(&workqueue->closure_list, closure, error);
if (push_error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(push_error);
gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg);
grpc_error_free_string(msg);
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
}
gpr_mu_unlock(&workqueue->mu);
}

@ -88,7 +88,7 @@ static void on_compute_engine_detection_http_response(grpc_exec_ctx *exec_ctx,
}
gpr_mu_lock(g_polling_mu);
detector->is_done = 1;
grpc_pollset_kick(detector->pollset, NULL);
GRPC_LOG_IF_ERROR("Pollset kick", grpc_pollset_kick(detector->pollset, NULL));
gpr_mu_unlock(g_polling_mu);
}
@ -131,9 +131,14 @@ static int is_stack_running_on_compute_engine(void) {
gpr_mu_lock(g_polling_mu);
while (!detector.is_done) {
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, detector.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
if (!GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(&exec_ctx, detector.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC)))) {
detector.is_done = 1;
detector.success = 0;
}
}
gpr_mu_unlock(g_polling_mu);

@ -263,8 +263,15 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
break;
}
}
grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
grpc_error *kick_error =
grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
gpr_mu_unlock(cc->mu);
if (kick_error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(kick_error);
gpr_log(GPR_ERROR, "Kick failed: %s", msg);
grpc_error_free_string(msg);
GRPC_ERROR_UNREF(kick_error);
}
} else {
cc->completed_tail->next =
((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
@ -343,8 +350,18 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_mu_lock(cc->mu);
continue;
} else {
grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
iteration_deadline);
grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc),
&worker, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {
gpr_mu_unlock(cc->mu);
const char *msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
grpc_error_free_string(msg);
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
}
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
@ -460,8 +477,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(cc->mu);
} else {
grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
iteration_deadline);
grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc),
&worker, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {
del_plucker(cc, tag, &worker);
gpr_mu_unlock(cc->mu);
const char *msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
grpc_error_free_string(msg);
GRPC_ERROR_UNREF(err);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
}
del_plucker(cc, tag, &worker);
}

@ -137,7 +137,7 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_INFO, "Read handler done");
gpr_mu_lock(g_mu);
state->read_done = 1 + (error == GRPC_ERROR_NONE);
grpc_pollset_kick(g_pollset, NULL);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL));
gpr_mu_unlock(g_mu);
} else if (error == GRPC_ERROR_NONE) {
grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming,
@ -172,7 +172,7 @@ static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(g_mu);
state->write_done = 1 + (error != GRPC_ERROR_NONE);
grpc_pollset_kick(g_pollset, NULL);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL));
gpr_mu_unlock(g_mu);
}
@ -237,8 +237,10 @@ static void read_and_write_test(grpc_endpoint_test_config config,
while (!state.read_done || !state.write_done) {
grpc_pollset_worker *worker = NULL;
GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
grpc_pollset_work(&exec_ctx, g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline);
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(&exec_ctx, g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline)));
}
gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx);

@ -70,7 +70,7 @@ static void on_oauth2_response(grpc_exec_ctx *exec_ctx, void *user_data,
gpr_mu_lock(request->mu);
request->is_done = 1;
request->token = token;
grpc_pollset_kick(request->pollset, NULL);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(request->pollset, NULL));
gpr_mu_unlock(request->mu);
}
@ -99,9 +99,13 @@ char *grpc_test_fetch_oauth2_token_with_credentials(
gpr_mu_lock(request.mu);
while (!request.is_done) {
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, request.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
if (GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(&exec_ctx, request.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC)))) {
request.is_done = 1;
}
}
gpr_mu_unlock(request.mu);

Loading…
Cancel
Save