Add shutdown-state

pull/9683/head
Sree Kuchibhotla 8 years ago
parent 8b8cbed345
commit 91c4da3223
  1. 128
      src/core/lib/iomgr/ev_epoll_linux.c

@ -185,6 +185,7 @@ static void fd_global_shutdown(void);
#define CLOSURE_NOT_READY ((gpr_atm)0)
#define CLOSURE_READY ((gpr_atm)1)
#define CLOSURE_SHUTDOWN ((gpr_atm)2)
/*******************************************************************************
@ -1067,53 +1068,97 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
GRPC_ERROR_UNREF(error);
}
static grpc_error *fd_shutdown_error(grpc_fd *fd) {
grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error1);
if (err != GRPC_ERROR_NONE) {
err = GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &err, 1);
}
return err;
}
static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
grpc_closure *closure) {
bool is_done = false;
while (!is_done) {
is_done = true;
/* Fast-path: CLOSURE_NOT_READY -> <closure> */
if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
// CAS failed because the current value of 'state' is not
// 'CLOSURE_NOT_READY'
/* Fallback to slowpath */
gpr_atm curr = gpr_atm_acq_load(state);
switch (curr) {
case CLOSURE_NOT_READY: {
// The CAS above failed because the state was not 'CLOSURE_NOT_READY'
// but it seems to be back to 'CLOSURE_NOT_READY'. Lets retry CAS
// again
is_done = false;
break;
}
case CLOSURE_READY: {
// Change the state to CLOSURE_NOT_READY and if successful, schedule
// the closure
/* Change the state to CLOSURE_NOT_READY and if successful, schedule
the closure */
if (gpr_atm_rel_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
// Looks like the current state is not CLOSURE_READY anymore. Retry
// from the beginning
/* Looks like the current state is not CLOSURE_READY anymore. Most
likely it transitioned to CLOSURE_NOT_READY. Retry the fast-path
again */
is_done = false;
}
break;
}
default: {
// The current state already contains a closure. This is a fatal error
/* 'curr' is either a closure or the fd is shutdown (in which case
* 'curr' contains a pointer to the shutdown-error) */
if ((curr & CLOSURE_SHUTDOWN) > 0) {
/* FD is shutdown. Schedule the closure with the shutdown error */
grpc_error *shutdown_err = (grpc_error *)(curr & ~CLOSURE_SHUTDOWN);
grpc_closure_sched(
exec_ctx, closure,
GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
} else {
/* There is already a closure!. This indicates a bug in the code */
gpr_log(
GPR_ERROR,
"User called notify_on function with a previous callback still "
"pending");
abort();
}
break;
}
}
}
}
}
static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
grpc_error *shutdown_err) {
/* Try the fast-path first (i.e expect the current value to be
CLOSURE_NOT_READY */
gpr_atm curr = CLOSURE_NOT_READY;
gpr_atm new_state = (gpr_atm)shutdown_err | CLOSURE_SHUTDOWN;
bool is_done = false;
while (!is_done) {
is_done = true;
if (!gpr_atm_acq_cas(state, curr, new_state)) {
/* Fallback to slowpath */
curr = gpr_atm_acq_load(state);
switch (curr) {
case CLOSURE_READY: {
is_done = false;
break;
}
case CLOSURE_NOT_READY: {
is_done = false;
break;
}
default: {
/* 'curr' is either a closure or the fd is already shutdown */
if ((curr & CLOSURE_SHUTDOWN) > 0) {
/* fd is already shutdown. Do nothing */
} else if (gpr_atm_rel_cas(state, curr, new_state)) {
grpc_closure_sched(
exec_ctx, (grpc_closure *)curr,
GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
} else {
/* 'curr' was a closure but now changed to a different state. We
will have to retry */
is_done = false;
}
break;
}
}
@ -1122,8 +1167,8 @@ static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
}
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
/* Try the fast-path first (i.e expect current value to be CLOSURE_NOT_READY
* and then try to change it to CLOSURE_READY) */
/* Try the fast-path first (i.e expect the current value to be
* CLOSURE_NOT_READY */
if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
/* Fallback to slowpath */
gpr_atm curr = gpr_atm_acq_load(state);
@ -1135,24 +1180,25 @@ static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
case CLOSURE_NOT_READY: {
/* The state was not CLOSURE_NOT_READY when we checked initially at the
beginning of this function but now it is CLOSURE_NOT_READY. This is
only possible if the state transitioned out of CLOSURE_NOT_READY to
either CLOSURE_READY or <some closure> and then back to
CLOSURE_NOT_READY again (i.e after we entered this function, the fd
became "ready" and the necessary actions were already done). So there
is no need to make the state CLOSURE_READY now */
beginning of this function but now it is CLOSURE_NOT_READY again.
This is only possible if the state transitioned out of
CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then
back to CLOSURE_NOT_READY again (i.e after we entered this function,
the fd became "ready" and the necessary actions were already done).
So there is no need to make the state CLOSURE_READY now */
break;
}
default: {
/* 'curr' is a closure. This closure should be enqueued and the current
state should be changed to CLOSURE_NOT_READY */
if (gpr_atm_rel_cas(state, curr, CLOSURE_NOT_READY)) {
grpc_closure_sched(exec_ctx, (grpc_closure *)*state,
fd_shutdown_error(fd));
} /* else the state changed again. This can only happen due to another
racing set_ready function (which means, we do not have to do
anything else here */
/* 'curr' is either a closure or the fd is shutdown */
if ((curr & CLOSURE_SHUTDOWN) > 0) {
/* The fd is shutdown. Do nothing */
} else if (gpr_atm_rel_cas(state, curr, CLOSURE_NOT_READY)) {
grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
}
/* else the state changed again (only possible by either a racing
set_ready or set_shutdown functions. In both these cases, the closure
would have been scheduled for execution. So we are done here */
break;
}
}
@ -1182,12 +1228,8 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
(gpr_atm)why)) {
shutdown(fd->fd, SHUT_RDWR);
/* Flush any pending read and write closures at this point. Since
fd->shutdown_error1 is set, both the closures would be called with
success = false */
set_ready(exec_ctx, fd, &fd->read_closure);
set_ready(exec_ctx, fd, &fd->write_closure);
set_shutdown(exec_ctx, fd, &fd->read_closure, why);
set_shutdown(exec_ctx, fd, &fd->write_closure, why);
} else {
// Shutdown already called
GRPC_ERROR_UNREF(why);

Loading…
Cancel
Save