|
|
|
@ -68,7 +68,7 @@ static int grpc_polling_trace = 0; /* Disabled by default */ |
|
|
|
|
gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Uncomment the following enable extra checks on poll_object operations */ |
|
|
|
|
/* Uncomment the following to enable extra checks on poll_object operations */ |
|
|
|
|
/* #define PO_DEBUG */ |
|
|
|
|
|
|
|
|
|
static int grpc_wakeup_signal = -1; |
|
|
|
@ -140,24 +140,61 @@ struct grpc_fd { |
|
|
|
|
Ref/Unref by two to avoid altering the orphaned bit */ |
|
|
|
|
gpr_atm refst; |
|
|
|
|
|
|
|
|
|
/* Indicates that the fd is shutdown and that any pending read/write closures
|
|
|
|
|
should fail */ |
|
|
|
|
bool shutdown; |
|
|
|
|
grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */ |
|
|
|
|
/* Internally stores data of type (grpc_error *). If the FD is shutdown, this
|
|
|
|
|
contains reason for shutdown (i.e a pointer to grpc_error) ORed with |
|
|
|
|
FD_SHUTDOWN_BIT. Since address allocations are word-aligned, the lower bit |
|
|
|
|
of (grpc_error *) addresses is guaranteed to be zero. Even if the |
|
|
|
|
(grpc_error *), is of special types like GRPC_ERROR_NONE, GRPC_ERROR_OOM |
|
|
|
|
etc, the lower bit is guaranteed to be zero. |
|
|
|
|
|
|
|
|
|
/* The fd is either closed or we relinquished control of it. In either cases,
|
|
|
|
|
this indicates that the 'fd' on this structure is no longer valid */ |
|
|
|
|
Once an fd is shutdown, any pending or future read/write closures on the |
|
|
|
|
fd should fail */ |
|
|
|
|
gpr_atm shutdown_error; |
|
|
|
|
|
|
|
|
|
/* The fd is either closed or we relinquished control of it. In either
|
|
|
|
|
cases, this indicates that the 'fd' on this structure is no longer |
|
|
|
|
valid */ |
|
|
|
|
bool orphaned; |
|
|
|
|
|
|
|
|
|
/* TODO: sreek - Move this to a lockfree implementation */ |
|
|
|
|
grpc_closure *read_closure; |
|
|
|
|
grpc_closure *write_closure; |
|
|
|
|
/* Closures to call when the fd is readable or writable respectively. These
|
|
|
|
|
fields contain one of the following values: |
|
|
|
|
CLOSURE_READY : The fd has an I/O event of interest but there is no |
|
|
|
|
closure yet to execute |
|
|
|
|
|
|
|
|
|
CLOSURE_NOT_READY : The fd has no I/O event of interest |
|
|
|
|
|
|
|
|
|
closure ptr : The closure to be executed when the fd has an I/O |
|
|
|
|
event of interest |
|
|
|
|
|
|
|
|
|
shutdown_error | FD_SHUTDOWN_BIT : |
|
|
|
|
'shutdown_error' field ORed with FD_SHUTDOWN_BIT. |
|
|
|
|
This indicates that the fd is shutdown. Since all |
|
|
|
|
memory allocations are word-aligned, the lower two |
|
|
|
|
bits of the shutdown_error pointer are always 0. So |
|
|
|
|
it is safe to OR these with FD_SHUTDOWN_BIT |
|
|
|
|
|
|
|
|
|
Valid state transitions: |
|
|
|
|
|
|
|
|
|
<closure ptr> <-----3------ CLOSURE_NOT_READY ----1----> CLOSURE_READY |
|
|
|
|
| | ^ | ^ | | |
|
|
|
|
| | | | | | | |
|
|
|
|
| +--------------4----------+ 6 +---------2---------------+ | |
|
|
|
|
| | | |
|
|
|
|
| v | |
|
|
|
|
+-----5-------> [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+ |
|
|
|
|
|
|
|
|
|
For 1, 4 : See set_ready() function |
|
|
|
|
For 2, 3 : See notify_on() function |
|
|
|
|
For 5,6,7: See set_shutdown() function */ |
|
|
|
|
gpr_atm read_closure; |
|
|
|
|
gpr_atm write_closure; |
|
|
|
|
|
|
|
|
|
struct grpc_fd *freelist_next; |
|
|
|
|
grpc_closure *on_done_closure; |
|
|
|
|
|
|
|
|
|
/* The pollset that last noticed that the fd is readable */ |
|
|
|
|
grpc_pollset *read_notifier_pollset; |
|
|
|
|
/* The pollset that last noticed that the fd is readable. The actual type
|
|
|
|
|
* stored in this is (grpc_pollset *) */ |
|
|
|
|
gpr_atm read_notifier_pollset; |
|
|
|
|
|
|
|
|
|
grpc_iomgr_object iomgr_object; |
|
|
|
|
}; |
|
|
|
@ -180,8 +217,10 @@ static void fd_unref(grpc_fd *fd); |
|
|
|
|
static void fd_global_init(void); |
|
|
|
|
static void fd_global_shutdown(void); |
|
|
|
|
|
|
|
|
|
#define CLOSURE_NOT_READY ((grpc_closure *)0) |
|
|
|
|
#define CLOSURE_READY ((grpc_closure *)1) |
|
|
|
|
#define CLOSURE_NOT_READY ((gpr_atm)0) |
|
|
|
|
#define CLOSURE_READY ((gpr_atm)2) |
|
|
|
|
|
|
|
|
|
#define FD_SHUTDOWN_BIT 1 |
|
|
|
|
|
|
|
|
|
/*******************************************************************************
|
|
|
|
|
* Polling island Declarations |
|
|
|
@ -908,7 +947,11 @@ static void unref_by(grpc_fd *fd, int n) { |
|
|
|
|
fd->freelist_next = fd_freelist; |
|
|
|
|
fd_freelist = fd; |
|
|
|
|
grpc_iomgr_unregister_object(&fd->iomgr_object); |
|
|
|
|
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error); |
|
|
|
|
|
|
|
|
|
grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error); |
|
|
|
|
/* Clear the least significant bit if it set (in case fd was shutdown) */ |
|
|
|
|
err = (grpc_error *)((intptr_t)err & ~FD_SHUTDOWN_BIT); |
|
|
|
|
GRPC_ERROR_UNREF(err); |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&fd_freelist_mu); |
|
|
|
|
} else { |
|
|
|
@ -972,13 +1015,14 @@ static grpc_fd *fd_create(int fd, const char *name) { |
|
|
|
|
|
|
|
|
|
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); |
|
|
|
|
new_fd->fd = fd; |
|
|
|
|
new_fd->shutdown = false; |
|
|
|
|
gpr_atm_no_barrier_store(&new_fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE); |
|
|
|
|
new_fd->orphaned = false; |
|
|
|
|
new_fd->read_closure = CLOSURE_NOT_READY; |
|
|
|
|
new_fd->write_closure = CLOSURE_NOT_READY; |
|
|
|
|
gpr_atm_no_barrier_store(&new_fd->read_closure, CLOSURE_NOT_READY); |
|
|
|
|
gpr_atm_no_barrier_store(&new_fd->write_closure, CLOSURE_NOT_READY); |
|
|
|
|
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); |
|
|
|
|
|
|
|
|
|
new_fd->freelist_next = NULL; |
|
|
|
|
new_fd->on_done_closure = NULL; |
|
|
|
|
new_fd->read_notifier_pollset = NULL; |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&new_fd->po.mu); |
|
|
|
|
|
|
|
|
@ -1060,101 +1104,206 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_error *fd_shutdown_error(grpc_fd *fd) { |
|
|
|
|
if (!fd->shutdown) { |
|
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
|
} else { |
|
|
|
|
return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1); |
|
|
|
|
static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
while (true) { |
|
|
|
|
/* Fast-path: CLOSURE_NOT_READY -> <closure>.
|
|
|
|
|
The 'release' cas here matches the 'acquire' load in set_ready and |
|
|
|
|
set_shutdown ensuring that the closure (scheduled by set_ready or |
|
|
|
|
set_shutdown) happens-after the I/O event on the fd */ |
|
|
|
|
if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { |
|
|
|
|
return; /* Fast-path successful. Return */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Slowpath. The 'acquire' load matches the 'release' cas in set_ready and
|
|
|
|
|
set_shutdown */ |
|
|
|
|
gpr_atm curr = gpr_atm_acq_load(state); |
|
|
|
|
switch (curr) { |
|
|
|
|
case CLOSURE_NOT_READY: { |
|
|
|
|
break; /* retry */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case CLOSURE_READY: { |
|
|
|
|
/* Change the state to CLOSURE_NOT_READY. Schedule the closure if
|
|
|
|
|
successful. If not, the state most likely transitioned to shutdown. |
|
|
|
|
We should retry. |
|
|
|
|
|
|
|
|
|
This can be a no-barrier cas since the state is being transitioned to |
|
|
|
|
CLOSURE_NOT_READY; set_ready and set_shutdown do not schedule any |
|
|
|
|
closure when transitioning out of CLOSURE_NO_READY state (i.e there |
|
|
|
|
is no other code that needs to 'happen-after' this) */ |
|
|
|
|
if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) { |
|
|
|
|
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); |
|
|
|
|
return; /* Slow-path successful. Return */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
break; /* retry */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
default: { |
|
|
|
|
/* 'curr' is either a closure or the fd is shutdown(in which case 'curr'
|
|
|
|
|
contains a pointer to the shutdown-error). If the fd is shutdown, |
|
|
|
|
schedule the closure with the shutdown error */ |
|
|
|
|
if ((curr & FD_SHUTDOWN_BIT) > 0) { |
|
|
|
|
grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT); |
|
|
|
|
grpc_closure_sched( |
|
|
|
|
exec_ctx, closure, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* There is already a closure!. This indicates a bug in the code */ |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"notify_on called with a previous callback still pending"); |
|
|
|
|
abort(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_UNREACHABLE_CODE(return ); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure **st, grpc_closure *closure) { |
|
|
|
|
if (fd->shutdown) { |
|
|
|
|
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown")); |
|
|
|
|
} else if (*st == CLOSURE_NOT_READY) { |
|
|
|
|
/* not ready ==> switch to a waiting state by setting the closure */ |
|
|
|
|
*st = closure; |
|
|
|
|
} else if (*st == CLOSURE_READY) { |
|
|
|
|
/* already ready ==> queue the closure to run immediately */ |
|
|
|
|
*st = CLOSURE_NOT_READY; |
|
|
|
|
grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd)); |
|
|
|
|
} else { |
|
|
|
|
/* upcallptr was set to a different closure. This is an error! */ |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"User called a notify_on function with a previous callback still " |
|
|
|
|
"pending"); |
|
|
|
|
abort(); |
|
|
|
|
static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, |
|
|
|
|
grpc_error *shutdown_err) { |
|
|
|
|
/* Try the fast-path first (i.e expect the current value to be
|
|
|
|
|
CLOSURE_NOT_READY */ |
|
|
|
|
gpr_atm curr = CLOSURE_NOT_READY; |
|
|
|
|
gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT; |
|
|
|
|
|
|
|
|
|
while (true) { |
|
|
|
|
/* The 'release' cas here matches the 'acquire' load in notify_on to ensure
|
|
|
|
|
that the closure it schedules 'happens-after' the set_shutdown is called |
|
|
|
|
on the fd */ |
|
|
|
|
if (gpr_atm_rel_cas(state, curr, new_state)) { |
|
|
|
|
return; /* Fast-path successful. Return */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Fallback to slowpath. This 'acquire' load matches the 'release' cas in
|
|
|
|
|
notify_on and set_ready */ |
|
|
|
|
curr = gpr_atm_acq_load(state); |
|
|
|
|
switch (curr) { |
|
|
|
|
case CLOSURE_READY: { |
|
|
|
|
break; /* retry */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case CLOSURE_NOT_READY: { |
|
|
|
|
break; /* retry */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
default: { |
|
|
|
|
/* 'curr' is either a closure or the fd is already shutdown */ |
|
|
|
|
|
|
|
|
|
/* If fd is already shutdown, we are done */ |
|
|
|
|
if ((curr & FD_SHUTDOWN_BIT) > 0) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Fd is not shutdown. Schedule the closure and move the state to
|
|
|
|
|
shutdown state. The 'release' cas here matches the 'acquire' load in |
|
|
|
|
notify_on to ensure that the closure it schedules 'happens-after' |
|
|
|
|
the set_shutdown is called on the fd */ |
|
|
|
|
if (gpr_atm_rel_cas(state, curr, new_state)) { |
|
|
|
|
grpc_closure_sched( |
|
|
|
|
exec_ctx, (grpc_closure *)curr, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* 'curr' was a closure but now changed to a different state. We will
|
|
|
|
|
have to retry */ |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_UNREACHABLE_CODE(return ); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* returns 1 if state becomes not ready */ |
|
|
|
|
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure **st) { |
|
|
|
|
if (*st == CLOSURE_READY) { |
|
|
|
|
/* duplicate ready ==> ignore */ |
|
|
|
|
return 0; |
|
|
|
|
} else if (*st == CLOSURE_NOT_READY) { |
|
|
|
|
/* not ready, and not waiting ==> flag ready */ |
|
|
|
|
*st = CLOSURE_READY; |
|
|
|
|
return 0; |
|
|
|
|
} else { |
|
|
|
|
/* waiting ==> queue closure */ |
|
|
|
|
grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd)); |
|
|
|
|
*st = CLOSURE_NOT_READY; |
|
|
|
|
return 1; |
|
|
|
|
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) { |
|
|
|
|
/* Try an optimistic case first (i.e assume current state is
|
|
|
|
|
CLOSURE_NOT_READY). |
|
|
|
|
|
|
|
|
|
This 'release' cas matches the 'acquire' load in notify_on ensuring that |
|
|
|
|
any closure (scheduled by notify_on) 'happens-after' the return from |
|
|
|
|
epoll_pwait */ |
|
|
|
|
if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { |
|
|
|
|
return; /* early out */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* The 'acquire' load here matches the 'release' cas in notify_on and
|
|
|
|
|
set_shutdown */ |
|
|
|
|
gpr_atm curr = gpr_atm_acq_load(state); |
|
|
|
|
switch (curr) { |
|
|
|
|
case CLOSURE_READY: { |
|
|
|
|
/* Already ready. We are done here */ |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case CLOSURE_NOT_READY: { |
|
|
|
|
/* The state was not CLOSURE_NOT_READY when we checked initially at the
|
|
|
|
|
beginning of this function but now it is CLOSURE_NOT_READY again. |
|
|
|
|
This is only possible if the state transitioned out of |
|
|
|
|
CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then |
|
|
|
|
back to CLOSURE_NOT_READY again (i.e after we entered this function, |
|
|
|
|
the fd became "ready" and the necessary actions were already done). |
|
|
|
|
So there is no need to make the state CLOSURE_READY now */ |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
default: { |
|
|
|
|
/* 'curr' is either a closure or the fd is shutdown */ |
|
|
|
|
if ((curr & FD_SHUTDOWN_BIT) > 0) { |
|
|
|
|
/* The fd is shutdown. Do nothing */ |
|
|
|
|
} else if (gpr_atm_no_barrier_cas(state, curr, CLOSURE_NOT_READY)) { |
|
|
|
|
/* The cas above was no-barrier since the state is being transitioned to
|
|
|
|
|
CLOSURE_NOT_READY; notify_on and set_shutdown do not schedule any |
|
|
|
|
closures when transitioning out of CLOSURE_NO_READY state (i.e there |
|
|
|
|
is no other code that needs to 'happen-after' this) */ |
|
|
|
|
|
|
|
|
|
grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
/* else the state changed again (only possible by either a racing
|
|
|
|
|
set_ready or set_shutdown functions. In both these cases, the closure |
|
|
|
|
would have been scheduled for execution. So we are done here */ |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_fd *fd) { |
|
|
|
|
grpc_pollset *notifier = NULL; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&fd->po.mu); |
|
|
|
|
notifier = fd->read_notifier_pollset; |
|
|
|
|
gpr_mu_unlock(&fd->po.mu); |
|
|
|
|
|
|
|
|
|
return notifier; |
|
|
|
|
gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset); |
|
|
|
|
return (grpc_pollset *)notifier; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool fd_is_shutdown(grpc_fd *fd) { |
|
|
|
|
gpr_mu_lock(&fd->po.mu); |
|
|
|
|
const bool r = fd->shutdown; |
|
|
|
|
gpr_mu_unlock(&fd->po.mu); |
|
|
|
|
return r; |
|
|
|
|
grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error); |
|
|
|
|
return (((intptr_t)err & FD_SHUTDOWN_BIT) > 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Might be called multiple times */ |
|
|
|
|
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { |
|
|
|
|
gpr_mu_lock(&fd->po.mu); |
|
|
|
|
/* Do the actual shutdown only once */ |
|
|
|
|
if (!fd->shutdown) { |
|
|
|
|
fd->shutdown = true; |
|
|
|
|
fd->shutdown_error = why; |
|
|
|
|
|
|
|
|
|
/* Store the shutdown error ORed with FD_SHUTDOWN_BIT in fd->shutdown_error */ |
|
|
|
|
if (gpr_atm_rel_cas(&fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE, |
|
|
|
|
(gpr_atm)why | FD_SHUTDOWN_BIT)) { |
|
|
|
|
shutdown(fd->fd, SHUT_RDWR); |
|
|
|
|
/* Flush any pending read and write closures. Since fd->shutdown is 'true'
|
|
|
|
|
at this point, the closures would be called with 'success = false' */ |
|
|
|
|
set_ready_locked(exec_ctx, fd, &fd->read_closure); |
|
|
|
|
set_ready_locked(exec_ctx, fd, &fd->write_closure); |
|
|
|
|
|
|
|
|
|
set_shutdown(exec_ctx, fd, &fd->read_closure, why); |
|
|
|
|
set_shutdown(exec_ctx, fd, &fd->write_closure, why); |
|
|
|
|
} else { |
|
|
|
|
/* Shutdown already called */ |
|
|
|
|
GRPC_ERROR_UNREF(why); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&fd->po.mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
gpr_mu_lock(&fd->po.mu); |
|
|
|
|
notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); |
|
|
|
|
gpr_mu_unlock(&fd->po.mu); |
|
|
|
|
notify_on(exec_ctx, fd, &fd->read_closure, closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
gpr_mu_lock(&fd->po.mu); |
|
|
|
|
notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); |
|
|
|
|
gpr_mu_unlock(&fd->po.mu); |
|
|
|
|
notify_on(exec_ctx, fd, &fd->write_closure, closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { |
|
|
|
@ -1343,18 +1492,19 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, |
|
|
|
|
|
|
|
|
|
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
grpc_pollset *notifier) { |
|
|
|
|
/* Need the fd->po.mu since we might be racing with fd_notify_on_read */ |
|
|
|
|
gpr_mu_lock(&fd->po.mu); |
|
|
|
|
set_ready_locked(exec_ctx, fd, &fd->read_closure); |
|
|
|
|
fd->read_notifier_pollset = notifier; |
|
|
|
|
gpr_mu_unlock(&fd->po.mu); |
|
|
|
|
set_ready(exec_ctx, fd, &fd->read_closure); |
|
|
|
|
|
|
|
|
|
/* Note, it is possible that fd_become_readable might be called twice with
|
|
|
|
|
different 'notifier's when an fd becomes readable and it is in two epoll |
|
|
|
|
sets (This can happen briefly during polling island merges). In such cases |
|
|
|
|
it does not really matter which notifer is set as the read_notifier_pollset |
|
|
|
|
(They would both point to the same polling island anyway) */ |
|
|
|
|
/* Use release store to match with acquire load in fd_get_read_notifier */ |
|
|
|
|
gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { |
|
|
|
|
/* Need the fd->po.mu since we might be racing with fd_notify_on_write */ |
|
|
|
|
gpr_mu_lock(&fd->po.mu); |
|
|
|
|
set_ready_locked(exec_ctx, fd, &fd->write_closure); |
|
|
|
|
gpr_mu_unlock(&fd->po.mu); |
|
|
|
|
set_ready(exec_ctx, fd, &fd->write_closure); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, |
|
|
|
@ -1737,7 +1887,7 @@ retry: |
|
|
|
|
(void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type), |
|
|
|
|
(void *)bag); |
|
|
|
|
/* No need to lock 'pi_new' here since this is a new polling island
|
|
|
|
|
* and no one has a reference to it yet */ |
|
|
|
|
and no one has a reference to it yet */ |
|
|
|
|
polling_island_remove_all_fds_locked(pi_new, true, &error); |
|
|
|
|
|
|
|
|
|
/* Ref and unref so that the polling island gets deleted during unref
|
|
|
|
|