|
|
|
@ -949,6 +949,8 @@ static void unref_by(grpc_fd *fd, int n) { |
|
|
|
|
grpc_iomgr_unregister_object(&fd->iomgr_object); |
|
|
|
|
|
|
|
|
|
grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error); |
|
|
|
|
/* Clear the least significant bit if it set (in case fd was shutdown) */ |
|
|
|
|
err = (grpc_error *)((intptr_t)err & ~FD_SHUTDOWN_BIT); |
|
|
|
|
GRPC_ERROR_UNREF(err); |
|
|
|
|
|
|
|
|
|
gpr_mu_unlock(&fd_freelist_mu); |
|
|
|
@ -1013,11 +1015,11 @@ static grpc_fd *fd_create(int fd, const char *name) { |
|
|
|
|
|
|
|
|
|
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); |
|
|
|
|
new_fd->fd = fd; |
|
|
|
|
gpr_atm_rel_store(&new_fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE); |
|
|
|
|
gpr_atm_no_barrier_store(&new_fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE); |
|
|
|
|
new_fd->orphaned = false; |
|
|
|
|
gpr_atm_rel_store(&new_fd->read_closure, CLOSURE_NOT_READY); |
|
|
|
|
gpr_atm_rel_store(&new_fd->write_closure, CLOSURE_NOT_READY); |
|
|
|
|
gpr_atm_rel_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); |
|
|
|
|
gpr_atm_no_barrier_store(&new_fd->read_closure, CLOSURE_NOT_READY); |
|
|
|
|
gpr_atm_no_barrier_store(&new_fd->write_closure, CLOSURE_NOT_READY); |
|
|
|
|
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); |
|
|
|
|
|
|
|
|
|
new_fd->freelist_next = NULL; |
|
|
|
|
new_fd->on_done_closure = NULL; |
|
|
|
@ -1104,57 +1106,53 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, |
|
|
|
|
|
|
|
|
|
static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
bool is_done = false; |
|
|
|
|
while (!is_done) { |
|
|
|
|
is_done = true; |
|
|
|
|
while (true) { |
|
|
|
|
/* Fast-path: CLOSURE_NOT_READY -> <closure> */ |
|
|
|
|
if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { |
|
|
|
|
/* Fallback to slowpath */ |
|
|
|
|
gpr_atm curr = gpr_atm_acq_load(state); |
|
|
|
|
switch (curr) { |
|
|
|
|
case CLOSURE_NOT_READY: { |
|
|
|
|
is_done = false; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
if (gpr_atm_acq_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { |
|
|
|
|
return; /* Fast-path successful. Return */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case CLOSURE_READY: { |
|
|
|
|
/* Change the state to CLOSURE_NOT_READY and if successful, schedule
|
|
|
|
|
the closure */ |
|
|
|
|
if (gpr_atm_rel_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) { |
|
|
|
|
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); |
|
|
|
|
} else { |
|
|
|
|
/* Looks like the current state is not CLOSURE_READY anymore. Most
|
|
|
|
|
likely it transitioned to CLOSURE_NOT_READY. Retry the fast-path |
|
|
|
|
again */ |
|
|
|
|
is_done = false; |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
/* Slowpath */ |
|
|
|
|
gpr_atm curr = gpr_atm_acq_load(state); |
|
|
|
|
switch (curr) { |
|
|
|
|
case CLOSURE_NOT_READY: { |
|
|
|
|
break; /* retry */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case CLOSURE_READY: { |
|
|
|
|
/* Change the state to CLOSURE_NOT_READY.
|
|
|
|
|
If successful: Schedule the closure |
|
|
|
|
If not: Most likely the state transitioned to CLOSURE_NOT_READY. |
|
|
|
|
Retry the fast-path again */ |
|
|
|
|
if (gpr_atm_rel_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) { |
|
|
|
|
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); |
|
|
|
|
return; /* Slow-path successful. Return */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
default: { |
|
|
|
|
/* 'curr' is either a closure or the fd is shutdown (in which case
|
|
|
|
|
* 'curr' contains a pointer to the shutdown-error) */ |
|
|
|
|
if ((curr & FD_SHUTDOWN_BIT) > 0) { |
|
|
|
|
/* FD is shutdown. Schedule the closure with the shutdown error */ |
|
|
|
|
grpc_error *shutdown_err = |
|
|
|
|
(grpc_error *)(curr & ~FD_SHUTDOWN_BIT); |
|
|
|
|
grpc_closure_sched( |
|
|
|
|
exec_ctx, closure, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); |
|
|
|
|
|
|
|
|
|
} else { |
|
|
|
|
/* There is already a closure!. This indicates a bug in the code */ |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_ERROR, |
|
|
|
|
"User called notify_on function with a previous callback still " |
|
|
|
|
"pending"); |
|
|
|
|
abort(); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
break; /* retry */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
default: { |
|
|
|
|
/* 'curr' is either a closure or the fd is shutdown(in which case 'curr'
|
|
|
|
|
contains a pointer to the shutdown-error). If the fd is shutdown, |
|
|
|
|
schedule the closure with the shutdown error */ |
|
|
|
|
if ((curr & FD_SHUTDOWN_BIT) > 0) { |
|
|
|
|
grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT); |
|
|
|
|
grpc_closure_sched( |
|
|
|
|
exec_ctx, closure, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* There is already a closure!. This indicates a bug in the code */ |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"notify_on called with a previous callback still pending"); |
|
|
|
|
abort(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_UNREACHABLE_CODE(return ); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, |
|
|
|
@ -1164,80 +1162,87 @@ static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, |
|
|
|
|
gpr_atm curr = CLOSURE_NOT_READY; |
|
|
|
|
gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT; |
|
|
|
|
|
|
|
|
|
bool is_done = false; |
|
|
|
|
while (!is_done) { |
|
|
|
|
is_done = true; |
|
|
|
|
if (!gpr_atm_acq_cas(state, curr, new_state)) { |
|
|
|
|
/* Fallback to slowpath */ |
|
|
|
|
curr = gpr_atm_acq_load(state); |
|
|
|
|
switch (curr) { |
|
|
|
|
case CLOSURE_READY: { |
|
|
|
|
is_done = false; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case CLOSURE_NOT_READY: { |
|
|
|
|
is_done = false; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
default: { |
|
|
|
|
/* 'curr' is either a closure or the fd is already shutdown */ |
|
|
|
|
if ((curr & FD_SHUTDOWN_BIT) > 0) { |
|
|
|
|
/* fd is already shutdown. Do nothing */ |
|
|
|
|
} else if (gpr_atm_rel_cas(state, curr, new_state)) { |
|
|
|
|
grpc_closure_sched( |
|
|
|
|
exec_ctx, (grpc_closure *)curr, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); |
|
|
|
|
} else { |
|
|
|
|
/* 'curr' was a closure but now changed to a different state. We
|
|
|
|
|
will have to retry */ |
|
|
|
|
is_done = false; |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
while (true) { |
|
|
|
|
if (gpr_atm_acq_cas(state, curr, new_state)) { |
|
|
|
|
return; /* Fast-path successful. Return */ |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) { |
|
|
|
|
/* Try the fast-path first (i.e expect the current value to be
|
|
|
|
|
* CLOSURE_NOT_READY */ |
|
|
|
|
if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { |
|
|
|
|
/* Fallback to slowpath */ |
|
|
|
|
gpr_atm curr = gpr_atm_acq_load(state); |
|
|
|
|
curr = gpr_atm_acq_load(state); |
|
|
|
|
switch (curr) { |
|
|
|
|
case CLOSURE_READY: { |
|
|
|
|
/* Already ready. We are done here */ |
|
|
|
|
break; |
|
|
|
|
break; /* retry */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case CLOSURE_NOT_READY: { |
|
|
|
|
/* The state was not CLOSURE_NOT_READY when we checked initially at the
|
|
|
|
|
beginning of this function but now it is CLOSURE_NOT_READY again. |
|
|
|
|
This is only possible if the state transitioned out of |
|
|
|
|
CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then |
|
|
|
|
back to CLOSURE_NOT_READY again (i.e after we entered this function, |
|
|
|
|
the fd became "ready" and the necessary actions were already done). |
|
|
|
|
So there is no need to make the state CLOSURE_READY now */ |
|
|
|
|
break; |
|
|
|
|
break; /* retry */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
default: { |
|
|
|
|
/* 'curr' is either a closure or the fd is shutdown */ |
|
|
|
|
/* 'curr' is either a closure or the fd is already shutdown */ |
|
|
|
|
|
|
|
|
|
/* If fd is already shutdown, we are done */ |
|
|
|
|
if ((curr & FD_SHUTDOWN_BIT) > 0) { |
|
|
|
|
/* The fd is shutdown. Do nothing */ |
|
|
|
|
} else if (gpr_atm_rel_cas(state, curr, CLOSURE_NOT_READY)) { |
|
|
|
|
grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Fd is not shutdown. Schedule the closure and move the state to
|
|
|
|
|
shutdown state */ |
|
|
|
|
if (gpr_atm_rel_cas(state, curr, new_state)) { |
|
|
|
|
grpc_closure_sched( |
|
|
|
|
exec_ctx, (grpc_closure *)curr, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
/* else the state changed again (only possible by either a racing
|
|
|
|
|
set_ready or set_shutdown functions. In both these cases, the closure |
|
|
|
|
would have been scheduled for execution. So we are done here */ |
|
|
|
|
|
|
|
|
|
/* 'curr' was a closure but now changed to a different state. We will
|
|
|
|
|
have to retry */ |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} /* else fast-path succeeded. We are done */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_UNREACHABLE_CODE(return ); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) { |
|
|
|
|
/* Try the fast-path first (i.e expect the current value to be
|
|
|
|
|
* CLOSURE_NOT_READY */ |
|
|
|
|
if (gpr_atm_acq_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { |
|
|
|
|
return; /* early out */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gpr_atm curr = gpr_atm_acq_load(state); |
|
|
|
|
switch (curr) { |
|
|
|
|
case CLOSURE_READY: { |
|
|
|
|
/* Already ready. We are done here */ |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case CLOSURE_NOT_READY: { |
|
|
|
|
/* The state was not CLOSURE_NOT_READY when we checked initially at the
|
|
|
|
|
beginning of this function but now it is CLOSURE_NOT_READY again. |
|
|
|
|
This is only possible if the state transitioned out of |
|
|
|
|
CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then |
|
|
|
|
back to CLOSURE_NOT_READY again (i.e after we entered this function, |
|
|
|
|
the fd became "ready" and the necessary actions were already done). |
|
|
|
|
So there is no need to make the state CLOSURE_READY now */ |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
default: { |
|
|
|
|
/* 'curr' is either a closure or the fd is shutdown */ |
|
|
|
|
if ((curr & FD_SHUTDOWN_BIT) > 0) { |
|
|
|
|
/* The fd is shutdown. Do nothing */ |
|
|
|
|
} else if (gpr_atm_rel_cas(state, curr, CLOSURE_NOT_READY)) { |
|
|
|
|
grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
/* else the state changed again (only possible by either a racing
|
|
|
|
|
set_ready or set_shutdown functions. In both these cases, the closure |
|
|
|
|
would have been scheduled for execution. So we are done here */ |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, |
|
|
|
@ -1248,7 +1253,7 @@ static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, |
|
|
|
|
|
|
|
|
|
static bool fd_is_shutdown(grpc_fd *fd) { |
|
|
|
|
grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error); |
|
|
|
|
return (((intptr_t) err & FD_SHUTDOWN_BIT) > 0); |
|
|
|
|
return (((intptr_t)err & FD_SHUTDOWN_BIT) > 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Might be called multiple times */ |
|
|
|
|