Fixes & debug

pull/7644/head
Craig Tiller 9 years ago
parent 9d01848ef2
commit e7603b887e
4 changed files:
  src/core/ext/transport/chttp2/transport/chttp2_transport.c (14 lines changed)
  src/core/lib/iomgr/combiner.c (50 lines changed)
  src/core/lib/iomgr/tcp_posix.c (9 lines changed)
  src/core/lib/surface/server.c (7 lines changed)

src/core/ext/transport/chttp2/transport/chttp2_transport.c:

@@ -1019,6 +1019,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
   grpc_chttp2_transport_global *transport_global = &t->global;
   grpc_chttp2_stream_global *stream_global = &s->global;
+
+  if (grpc_http_trace) {
+    char *str = grpc_transport_stream_op_string(op);
+    gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s", str);
+    gpr_free(str);
+  }
 
   grpc_closure *on_complete = op->on_complete;
   if (on_complete == NULL) {
     on_complete = grpc_closure_create(do_nothing, NULL);

@@ -1821,6 +1827,10 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 
 static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
                             grpc_error *error) {
+  if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
+    error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+                               GRPC_STATUS_UNAVAILABLE);
+  }
   close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
   end_all_the_calls(exec_ctx, t, error);
 }

@@ -2015,10 +2025,6 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
     error = GRPC_ERROR_CREATE("Transport closed");
   }
   if (error != GRPC_ERROR_NONE) {
-    if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
-      error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
-                                 GRPC_STATUS_UNAVAILABLE);
-    }
     drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
     t->endpoint_reading = 0;
     if (grpc_http_write_state_trace) {
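
Note on the chttp2 change: the default-status guard moves out of post_reading_action_locked and into drop_connection, so every path that drops the connection attaches GRPC_STATUS_UNAVAILABLE exactly once when the error carries no status yet. A minimal self-contained sketch of that guard, using a toy error type in place of grpc_error (toy_error and set_default_status are illustrative names, not gRPC API):

#include <stdbool.h>
#include <stdio.h>

/* Toy stand-in for grpc_error; real errors carry a map of keyed ints. */
typedef struct {
  const char *msg;
  bool has_status;
  int status;
} toy_error;

enum { TOY_STATUS_UNAVAILABLE = 14 }; /* value of GRPC_STATUS_UNAVAILABLE */

/* Attach a status only if none is set yet -- the guard drop_connection
   now performs once, instead of each caller doing it separately. */
static void set_default_status(toy_error *err, int status) {
  if (!err->has_status) {
    err->has_status = true;
    err->status = status;
  }
}

int main(void) {
  toy_error err = {"Transport closed", false, 0};
  set_default_status(&err, TOY_STATUS_UNAVAILABLE); /* takes effect */
  set_default_status(&err, 99);                     /* no-op: already set */
  printf("%s -> status %d\n", err.msg, err.status); /* prints status 14 */
  return 0;
}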

src/core/lib/iomgr/combiner.c:

@@ -41,6 +41,13 @@
 #include "src/core/lib/iomgr/workqueue.h"
 #include "src/core/lib/profiling/timers.h"
 
+int grpc_combiner_trace = 0;
+
+#define COMBINER_TRACE(fn)   \
+  if (grpc_combiner_trace) { \
+    fn                       \
+  }
+
 struct grpc_combiner {
   grpc_workqueue *optional_workqueue;
   gpr_mpscq queue;

@@ -60,10 +67,12 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
   gpr_mpscq_init(&lock->queue);
   lock->take_async_break_before_final_list = false;
   grpc_closure_list_init(&lock->final_list);
+  gpr_log(GPR_DEBUG, "C:%p create", lock);
   return lock;
 }
 
 static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
+  gpr_log(GPR_DEBUG, "C:%p really_destroy", lock);
   GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
   gpr_mpscq_destroy(&lock->queue);
   GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner");

@@ -71,7 +80,10 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
 }
 
 void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
-  if (gpr_atm_full_fetch_add(&lock->state, -1) == 1) {
+  gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -1);
+  gpr_log(GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock,
+          old_state);
+  if (old_state == 1) {
     really_destroy(exec_ctx, lock);
   }
 }

@@ -83,6 +95,7 @@ static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg,
                                         grpc_error *error) {
   GPR_TIMER_BEGIN("combiner.continue_executing_mainline", 0);
   grpc_combiner *lock = arg;
+  gpr_log(GPR_DEBUG, "C:%p continue_finishing_mainline", lock);
   GPR_ASSERT(exec_ctx->active_combiner == NULL);
   exec_ctx->active_combiner = lock;
   if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock);

@@ -94,14 +107,18 @@ static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg,
 static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
   GPR_TIMER_BEGIN("combiner.execute_final", 0);
   grpc_closure *c = lock->final_list.head;
+  GPR_ASSERT(c != NULL);
   grpc_closure_list_init(&lock->final_list);
   lock->take_async_break_before_final_list = false;
+  int loops = 0;
   while (c != NULL) {
+    gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c);
     grpc_closure *next = c->next_data.next;
     grpc_error *error = c->error;
     c->cb(exec_ctx, c->cb_arg, error);
     GRPC_ERROR_UNREF(error);
     c = next;
+    loops++;
   }
   GPR_TIMER_END("combiner.execute_final", 0);
 }

@@ -110,6 +127,7 @@ static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error) {
   GPR_TIMER_BEGIN("combiner.continue_executing_final", 0);
   grpc_combiner *lock = arg;
+  gpr_log(GPR_DEBUG, "C:%p continue_executing_final", lock);
   GPR_ASSERT(exec_ctx->active_combiner == NULL);
   exec_ctx->active_combiner = lock;
   // quick peek to see if new things have turned up on the queue: if so, go back

@@ -128,6 +146,9 @@ static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg,
 static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
   GPR_TIMER_BEGIN("combiner.start_execute_final", 0);
   GPR_ASSERT(exec_ctx->active_combiner == lock);
+  gpr_log(GPR_DEBUG,
+          "C:%p start_execute_final take_async_break_before_final_list=%d",
+          lock, lock->take_async_break_before_final_list);
   if (lock->take_async_break_before_final_list) {
     grpc_closure_init(&lock->continue_finishing, continue_executing_final,
                       lock);

@@ -145,6 +166,7 @@ static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
 static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
   GPR_TIMER_BEGIN("combiner.maybe_finish_one", 0);
   gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue);
+  gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n);
   GPR_ASSERT(exec_ctx->active_combiner == lock);
   if (n == NULL) {
     // queue is in an inconsistant state: use this as a cue that we should

@@ -160,16 +182,21 @@ static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
   grpc_error *error = cl->error;
   cl->cb(exec_ctx, cl->cb_arg, error);
   GRPC_ERROR_UNREF(error);
-  GPR_TIMER_END("combiner.maybe_finish_one", 0);
+  GPR_TIMER_END("combiner.maybe_finish_one", 0);
   return true;
 }
 
 static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
-  bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock) =
-      maybe_finish_one;
-  GPR_TIMER_BEGIN("combiner.finish",0);
+  bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock);
+  GPR_TIMER_BEGIN("combiner.finish", 0);
+  int loops = 0;
   do {
-    switch (gpr_atm_full_fetch_add(&lock->state, -2)) {
+    executor = maybe_finish_one;
+    gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2);
+    gpr_log(GPR_DEBUG, "C:%p finish[%d] old_state=%" PRIdPTR " cl=[%p,%p]",
+            lock, loops, old_state, lock->final_list.head,
+            lock->final_list.tail);
+    switch (old_state) {
       case 5:  // we're down to one queued item: if it's the final list we
       case 4:  //   should do that
         if (!grpc_closure_list_empty(lock->final_list)) {

@@ -189,12 +216,14 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
         // deleted lock
         GPR_UNREACHABLE_CODE(return );
     }
+    loops++;
   } while (executor(exec_ctx, lock));
-  GPR_TIMER_END("combiner.finish", 0);
+  GPR_TIMER_END("combiner.finish", 0);
 }
 
 void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
                            grpc_closure *cl, grpc_error *error) {
+  gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl);
   GPR_TIMER_BEGIN("combiner.execute", 0);
   gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2);
   GPR_ASSERT(last & 1);  // ensure lock has not been destroyed

@@ -217,12 +246,15 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
 static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
                             grpc_error *error) {
   grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
-                                GRPC_ERROR_REF(error), true);
+                                GRPC_ERROR_REF(error), false);
 }
 
 void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
                                    grpc_closure *closure, grpc_error *error,
                                    bool force_async_break) {
+  gpr_log(GPR_DEBUG,
+          "C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p",
+          lock, closure, force_async_break, exec_ctx->active_combiner);
   GPR_TIMER_BEGIN("combiner.execute_finally", 0);
   if (exec_ctx->active_combiner != lock) {
     GPR_TIMER_MARK("slowpath", 0);

@@ -239,7 +271,7 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
     gpr_atm_full_fetch_add(&lock->state, 2);
   }
   grpc_closure_list_append(&lock->final_list, closure, error);
-  GPR_TIMER_END("combiner.execute_finally", 0);
+  GPR_TIMER_END("combiner.execute_finally", 0);
 }
 
 void grpc_combiner_force_async_finally(grpc_combiner *lock) {
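
Note on the combiner changes: besides the new COMBINER_TRACE(fn) macro, which runs a statement only when grpc_combiner_trace is nonzero, most hunks capture the previous atomic state into old_state so it can be logged. The state word packs lifetime and queue depth together: the low bit is an alive flag taken at creation, and each queued closure adds 2. That is why grpc_combiner_execute asserts last & 1, why finish subtracts 2 per iteration, and why grpc_combiner_destroy frees only on old_state == 1. A minimal sketch of this packing with C11 atomics (combiner_state and the cs_* helpers are invented here; the real code uses gpr_atm):

#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* state == 2 * (queued closures) + (alive bit). Starts at 1: alive, empty. */
typedef struct {
  atomic_long state;
} combiner_state;

static void cs_init(combiner_state *c) { atomic_init(&c->state, 1); }

/* Enqueue: add 2. The previous value must still carry the alive bit,
   mirroring the GPR_ASSERT(last & 1) in grpc_combiner_execute. */
static long cs_enqueue(combiner_state *c) {
  long last = atomic_fetch_add(&c->state, 2);
  assert(last & 1);
  return last;
}

/* Retire one queued closure: subtract 2 and report the state seen before,
   just as the new old_state logging in finish() does. */
static long cs_finish_one(combiner_state *c) {
  return atomic_fetch_add(&c->state, -2);
}

/* Drop the alive bit. old_state == 1 means nothing was queued, so the
   caller may free immediately (the grpc_combiner_destroy fast path). */
static bool cs_destroy(combiner_state *c) {
  long old_state = atomic_fetch_add(&c->state, -1);
  printf("destroy old_state=%ld\n", old_state);
  return old_state == 1;
}

int main(void) {
  combiner_state c;
  cs_init(&c);
  cs_enqueue(&c);                /* 1 -> 3: one closure queued */
  long seen = cs_finish_one(&c); /* 3 -> 1 */
  printf("finish saw old_state=%ld\n", seen);
  assert(cs_destroy(&c)); /* empty and unreferenced: safe to destroy */
  return 0;
}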

src/core/lib/iomgr/tcp_posix.c:

@@ -379,10 +379,15 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
   }
 
   if (!tcp_flush(tcp, &error)) {
+    gpr_log(GPR_DEBUG, "write: delayed");
     grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
   } else {
     cb = tcp->write_cb;
     tcp->write_cb = NULL;
+    const char *str = grpc_error_string(error);
+    gpr_log(GPR_DEBUG, "write: %s", str);
+    grpc_error_free_string(str);
+
     GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
     cb->cb(exec_ctx, cb->cb_arg, error);
     GPR_TIMER_END("tcp_handle_write.cb", 0);

@@ -425,8 +430,12 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
 
   if (!tcp_flush(tcp, &error)) {
     TCP_REF(tcp, "write");
     tcp->write_cb = cb;
+    gpr_log(GPR_DEBUG, "write: delayed");
     grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
   } else {
+    const char *str = grpc_error_string(error);
+    gpr_log(GPR_DEBUG, "write: %s", str);
+    grpc_error_free_string(str);
     grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
   }
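
Note on the tcp_posix changes: both write paths now log whether a write completed (and with what error) or was delayed behind a writability notification. The underlying pattern is: attempt to flush on a nonblocking fd; on EAGAIN, register a write-ready callback instead of blocking. A toy sketch of that flush-or-wait pattern (flush_all and the pipe setup are illustrative, not gRPC's tcp_flush):

#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Try to flush buf; returns true when the write finished (successfully or
   with a hard error), false when the fd would block and the caller should
   wait for writability -- as tcp_write does via grpc_fd_notify_on_write. */
static bool flush_all(int fd, const char *buf, size_t len, size_t *off) {
  while (*off < len) {
    ssize_t n = write(fd, buf + *off, len - *off);
    if (n >= 0) {
      *off += (size_t)n;
    } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
      fprintf(stderr, "write: delayed\n"); /* the log this commit adds */
      return false;
    } else {
      perror("write"); /* hard error: surface it to the write callback */
      return true;
    }
  }
  fprintf(stderr, "write: no error\n");
  return true;
}

int main(void) {
  int fds[2];
  if (pipe(fds) != 0) return 1;
  fcntl(fds[1], F_SETFL, O_NONBLOCK); /* nonblocking writer end */
  const char *msg = "hello";
  size_t off = 0;
  if (!flush_all(fds[1], msg, strlen(msg), &off)) {
    /* real code would register a write-ready callback here and retry */
  }
  close(fds[0]);
  close(fds[1]);
  return 0;
}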

src/core/lib/surface/server.c:

@@ -446,7 +446,12 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
   grpc_channel_next_op(grpc_channel_stack_element(
                            grpc_channel_get_channel_stack(chand->channel), 0),
                        op);
-  GRPC_LOG_IF_ERROR("disconnecting client", error);
+  if (error != GRPC_ERROR_NONE) {
+    const char *msg = grpc_error_string(error);
+    gpr_log(GPR_INFO, "Disconnected client: %s", msg);
+    grpc_error_free_string(msg);
+  }
+  GRPC_ERROR_UNREF(error);
 }
 
 static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
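
Note on the server change: GRPC_LOG_IF_ERROR logged at ERROR severity and consumed the error reference itself, so the quieter INFO-level log that replaces it needs its own explicit GRPC_ERROR_UNREF. The ownership rules are easy to get wrong: the string returned by grpc_error_string must be released with grpc_error_free_string, and the error must be unreffed exactly once. A toy refcounted error illustrating that discipline (toy_error and its helpers are invented for this sketch):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Toy refcounted error, invented to illustrate the ownership rules;
   grpc_error itself is refcounted the same way but far richer. */
typedef struct {
  int refs;
  char *what;
} toy_error;

static toy_error *toy_error_create(const char *what) {
  toy_error *e = malloc(sizeof(*e));
  e->refs = 1;
  e->what = strdup(what);
  return e;
}

/* Like grpc_error_string: the caller owns the returned string and must
   release it (grpc_error_free_string in the real API). */
static char *toy_error_string(const toy_error *e) { return strdup(e->what); }

static void toy_error_unref(toy_error *e) {
  if (--e->refs == 0) {
    free(e->what);
    free(e);
  }
}

int main(void) {
  toy_error *error = toy_error_create("channel disconnected");
  if (error != NULL) { /* stands in for error != GRPC_ERROR_NONE */
    char *msg = toy_error_string(error);
    fprintf(stderr, "Disconnected client: %s\n", msg);
    free(msg); /* the grpc_error_free_string step */
  }
  toy_error_unref(error); /* the explicit GRPC_ERROR_UNREF the hunk adds */
  return 0;
}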
