Merge pull request #4451 from ctiller/proxy-crash

Fix proxy end2end tests
pull/4465/head
Yang Gao 9 years ago
commit 29c3ca5a3a
  1. 4
      src/core/surface/call.c
  2. 2
      src/core/surface/channel_connectivity.c
  3. 2
      src/core/surface/channel_ping.c
  4. 40
      src/core/surface/completion_queue.c
  5. 8
      src/core/surface/completion_queue.h
  6. 6
      src/core/surface/server.c
  7. 10
      src/core/transport/chttp2/internal.h
  8. 20
      src/core/transport/chttp2/stream_lists.c
  9. 9
      src/core/transport/chttp2/writing.c
  10. 28
      src/core/transport/chttp2_transport.c
  11. 10
      test/core/surface/completion_queue_test.c
  12. 4
      test/core/util/test_config.c
  13. 4
      tools/run_tests/jobset.py

@ -1119,7 +1119,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
bctl->success = 1;
if (!is_notify_tag_closure) {
grpc_cq_begin_op(call->cq);
grpc_cq_begin_op(call->cq, notify_tag);
}
gpr_mu_unlock(&call->mu);
post_batch_completion(exec_ctx, bctl);
@ -1334,7 +1334,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
grpc_cq_begin_op(call->cq);
grpc_cq_begin_op(call->cq, notify_tag);
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);

@ -189,7 +189,7 @@ void grpc_channel_watch_connectivity_state(
7, (channel, (int)last_observed_state, (long long)deadline.tv_sec,
(int)deadline.tv_nsec, (int)deadline.clock_type, cq, tag));
grpc_cq_begin_op(cq);
grpc_cq_begin_op(cq, tag);
gpr_mu_init(&w->mu);
grpc_closure_init(&w->on_complete, watch_complete, w);

@ -73,7 +73,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
grpc_closure_init(&pr->closure, ping_done, pr);
op.send_ping = &pr->closure;
op.bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq);
grpc_cq_begin_op(cq, tag);
top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op);
grpc_exec_ctx_finish(&exec_ctx);
}

@ -73,6 +73,12 @@ struct grpc_completion_queue {
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
grpc_closure pollset_shutdown_done;
#ifndef NDEBUG
void **outstanding_tags;
size_t outstanding_tag_count;
size_t outstanding_tag_capacity;
#endif
grpc_completion_queue *next_free;
};
@ -89,6 +95,9 @@ void grpc_cq_global_shutdown(void) {
while (g_freelist) {
grpc_completion_queue *next = g_freelist->next_free;
grpc_pollset_destroy(&g_freelist->pollset);
#ifndef NDEBUG
gpr_free(g_freelist->outstanding_tags);
#endif
gpr_free(g_freelist);
g_freelist = next;
}
@ -117,6 +126,10 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc = gpr_malloc(sizeof(grpc_completion_queue));
grpc_pollset_init(&cc->pollset);
#ifndef NDEBUG
cc->outstanding_tags = NULL;
cc->outstanding_tag_capacity = 0;
#endif
} else {
cc = g_freelist;
g_freelist = g_freelist->next_free;
@ -134,6 +147,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->shutdown_called = 0;
cc->is_server_cq = 0;
cc->num_pluckers = 0;
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
#endif
grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc);
GPR_TIMER_END("grpc_completion_queue_create", 0);
@ -176,10 +192,17 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
}
}
void grpc_cq_begin_op(grpc_completion_queue *cc) {
void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
#ifndef NDEBUG
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown_called);
if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) {
cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity);
cc->outstanding_tags =
gpr_realloc(cc->outstanding_tags, sizeof(*cc->outstanding_tags) *
cc->outstanding_tag_capacity);
}
cc->outstanding_tags[cc->outstanding_tag_count++] = tag;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
#endif
gpr_ref(&cc->pending_events);
@ -196,6 +219,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
int shutdown;
int i;
grpc_pollset_worker *pluck_worker;
#ifndef NDEBUG
int found = 0;
#endif
GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
@ -206,6 +232,18 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
#ifndef NDEBUG
for (i = 0; i < (int)cc->outstanding_tag_count; i++) {
if (cc->outstanding_tags[i] == tag) {
cc->outstanding_tag_count--;
GPR_SWAP(void *, cc->outstanding_tags[i],
cc->outstanding_tags[cc->outstanding_tag_count]);
found = 1;
break;
}
}
GPR_ASSERT(found);
#endif
shutdown = gpr_unref(&cc->pending_events);
if (!shutdown) {
cc->completed_tail->next =

@ -68,10 +68,12 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc);
#endif
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made */
void grpc_cq_begin_op(grpc_completion_queue *cc);
shutdown until a corrensponding grpc_cq_end_* call is made.
\a tag is currently used only in debug builds. */
void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
/* Queue a GRPC_OP_COMPLETED operation */
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
void *tag, int success,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,

@ -1007,7 +1007,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
grpc_cq_begin_op(cq);
grpc_cq_begin_op(cq, tag);
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
@ -1176,7 +1176,7 @@ grpc_call_error grpc_server_request_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
grpc_cq_begin_op(cq_for_notification);
grpc_cq_begin_op(cq_for_notification, tag);
details->reserved = NULL;
rc->type = BATCH_CALL;
rc->server = server;
@ -1213,7 +1213,7 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
grpc_cq_begin_op(cq_for_notification);
grpc_cq_begin_op(cq_for_notification, tag);
rc->type = REGISTERED_CALL;
rc->server = server;
rc->tag = tag;

@ -65,6 +65,7 @@ typedef enum {
GRPC_CHTTP2_LIST_WRITTEN,
GRPC_CHTTP2_LIST_PARSING_SEEN,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING,
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
@ -391,8 +392,6 @@ typedef struct {
gpr_uint8 write_closed;
/** is this stream reading half-closed (boolean) */
gpr_uint8 read_closed;
/** is this stream finished closing (and reportably closed) */
gpr_uint8 finished_close;
/** is this stream in the stream map? (boolean) */
gpr_uint8 in_stream_map;
/** has this stream seen an error? if 1, then pending incoming frames
@ -586,6 +585,13 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_closed_waiting_for_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_closed_waiting_for_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(

@ -353,6 +353,26 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing(
return r;
}
void grpc_chttp2_list_add_closed_waiting_for_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING);
}
int grpc_chttp2_list_pop_closed_waiting_for_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING);
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}
void grpc_chttp2_register_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS);

@ -332,17 +332,12 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
!transport_global->is_client, 1);
}
if (stream_writing->sent_initial_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_initial_metadata_finished, 1);
}
if (stream_writing->sent_message) {
GPR_ASSERT(stream_writing->send_message == NULL);
GPR_ASSERT(stream_global->send_message_finished);
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_message_finished, 1);
stream_writing->sent_message = 0;
@ -351,6 +346,10 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_trailing_metadata_finished, 1);
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
!transport_global->is_client, 1);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);

@ -134,6 +134,9 @@ static void connectivity_state_set(
static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global);
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global);
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@ -625,6 +628,7 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
void *transport_writing_ptr, int success) {
grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
grpc_chttp2_stream_global *stream_global;
GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0);
@ -638,6 +642,11 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) {
fail_pending_writes(exec_ctx, stream_global);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
}
/* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */
t->writing_active = 0;
@ -1107,6 +1116,16 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
}
}
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global) {
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_initial_metadata_finished, 0);
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_trailing_metadata_finished, 0);
grpc_chttp2_complete_closure_step(exec_ctx,
&stream_global->send_message_finished, 0);
}
void grpc_chttp2_mark_stream_closed(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, int close_reads,
@ -1123,6 +1142,13 @@ void grpc_chttp2_mark_stream_closed(
}
if (close_writes && !stream_global->write_closed) {
stream_global->write_closed = 1;
if (TRANSPORT_FROM_GLOBAL(transport_global)->writing_active) {
GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes");
grpc_chttp2_list_add_closed_waiting_for_writing(transport_global,
stream_global);
} else {
fail_pending_writes(exec_ctx, stream_global);
}
}
if (stream_global->read_closed && stream_global->write_closed) {
if (stream_global->id != 0 &&
@ -1134,7 +1160,6 @@ void grpc_chttp2_mark_stream_closed(
remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
stream_global->id);
}
stream_global->finished_close = 1;
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
}
@ -1348,7 +1373,6 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
GPR_ASSERT(stream_global->write_closed);
GPR_ASSERT(stream_global->read_closed);
remove_stream(exec_ctx, t, stream_global->id);
stream_global->finished_close = 1;
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
}

@ -89,7 +89,7 @@ static void test_cq_end_op(void) {
cc = grpc_completion_queue_create(NULL);
grpc_cq_begin_op(cc);
grpc_cq_begin_op(cc, tag);
grpc_cq_end_op(&exec_ctx, cc, tag, 1, do_nothing_end_completion, NULL,
&completion);
@ -148,7 +148,7 @@ static void test_pluck(void) {
cc = grpc_completion_queue_create(NULL);
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc);
grpc_cq_begin_op(cc, tags[i]);
grpc_cq_end_op(&exec_ctx, cc, tags[i], 1, do_nothing_end_completion, NULL,
&completions[i]);
}
@ -160,7 +160,7 @@ static void test_pluck(void) {
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc);
grpc_cq_begin_op(cc, tags[i]);
grpc_cq_end_op(&exec_ctx, cc, tags[i], 1, do_nothing_end_completion, NULL,
&completions[i]);
}
@ -233,7 +233,7 @@ static void test_too_many_plucks(void) {
GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc);
grpc_cq_begin_op(cc, tags[i]);
grpc_cq_end_op(&exec_ctx, cc, tags[i], 1, do_nothing_end_completion, NULL,
&completions[i]);
}
@ -279,7 +279,7 @@ static void producer_thread(void *arg) {
gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
for (i = 0; i < TEST_THREAD_EVENTS; i++) {
grpc_cq_begin_op(opt->cc);
grpc_cq_begin_op(opt->cc, (void *)(gpr_intptr)1);
}
gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);

@ -127,6 +127,8 @@ static void crash_handler(int signum, siginfo_t *info, void *data) {
backtrace_symbols_fd(addrlist, addrlen, STDERR_FILENO);
}
/* try to get a core dump for SIGTERM */
if (signum == SIGTERM) signum = SIGQUIT;
raise(signum);
}
@ -145,6 +147,8 @@ static void install_crash_handler() {
GPR_ASSERT(sigaction(SIGABRT, &sa, NULL) == 0);
GPR_ASSERT(sigaction(SIGBUS, &sa, NULL) == 0);
GPR_ASSERT(sigaction(SIGSEGV, &sa, NULL) == 0);
GPR_ASSERT(sigaction(SIGTERM, &sa, NULL) == 0);
GPR_ASSERT(sigaction(SIGQUIT, &sa, NULL) == 0);
}
#else
static void install_crash_handler() {}

@ -273,7 +273,7 @@ class Job(object):
update_cache.finished(self._spec.identity(), self._bin_hash)
elif self._state == _RUNNING and time.time() - self._start > self._spec.timeout_seconds:
if self._timeout_retries < self._spec.timeout_retries:
message('TIMEOUT_FLAKE', self._spec.shortname, stdout, do_newline=True)
message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout, do_newline=True)
self._timeout_retries += 1
self.result.num_failures += 1
self.result.retries = self._timeout_retries + self._retries
@ -282,7 +282,7 @@ class Job(object):
self._process.terminate()
self.start()
else:
message('TIMEOUT', self._spec.shortname, stdout, do_newline=True)
message('TIMEOUT', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout, do_newline=True)
self.kill()
self.result.state = 'TIMEOUT'
self.result.num_failures += 1

Loading…
Cancel
Save