Merge pull request #912 from ctiller/disconnect

Deflake fling_test
pull/917/head
Nicolas Noble 10 years ago
commit 961141002a
  1. 26
      Makefile
  2. 14
      build.json
  3. 4
      src/core/channel/connected_channel.c
  4. 7
      src/core/surface/completion_queue.c
  5. 42
      src/core/transport/chttp2_transport.c
  6. 2
      test/core/end2end/no_server_test.c
  7. 4
      test/core/fling/server.c
  8. 1
      tools/buildgen/build-cleaner.py
  9. 2
      tools/run_tests/run_lcov.sh
  10. 4
      tools/run_tests/tests.json

@ -8076,13 +8076,24 @@ $(BINDIR)/$(CONFIG)/qps_client_async: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/qps_client_async: $(QPS_CLIENT_ASYNC_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/qps_client_async: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/qps_client_async: $(PROTOBUF_DEP) $(QPS_CLIENT_ASYNC_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(QPS_CLIENT_ASYNC_OBJS) $(GTEST_LIB) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/qps_client_async
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/qps/qpstest.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
@ -8153,13 +8164,24 @@ $(BINDIR)/$(CONFIG)/qps_server_async: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/qps_server_async: $(QPS_SERVER_ASYNC_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/qps_server_async: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/qps_server_async: $(PROTOBUF_DEP) $(QPS_SERVER_ASYNC_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(QPS_SERVER_ASYNC_OBJS) $(GTEST_LIB) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/qps_server_async
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/qps/qpstest.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a

@ -572,6 +572,7 @@
},
{
"name": "census_statistics_multiple_writers_circular_buffer_test",
"flaky": true,
"build": "test",
"language": "c",
"src": [
@ -582,8 +583,7 @@
"grpc",
"gpr_test_util",
"gpr"
],
"flaky": true
]
},
{
"name": "census_statistics_multiple_writers_test",
@ -629,6 +629,7 @@
},
{
"name": "census_statistics_small_log_test",
"flaky": true,
"build": "test",
"language": "c",
"src": [
@ -639,8 +640,7 @@
"grpc",
"gpr_test_util",
"gpr"
],
"flaky": true
]
},
{
"name": "census_stats_store_test",
@ -868,8 +868,7 @@
"grpc",
"gpr_test_util",
"gpr"
],
"flaky": true
]
},
{
"name": "fling_test",
@ -883,8 +882,7 @@
"grpc",
"gpr_test_util",
"gpr"
],
"flaky": true
]
},
{
"name": "gen_hpack_tables",

@ -48,12 +48,12 @@
/* the protobuf library will (by default) start warning at 100megs */
#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
typedef struct {
typedef struct connected_channel_channel_data {
grpc_transport *transport;
gpr_uint32 max_message_length;
} channel_data;
typedef struct {
typedef struct connected_channel_call_data {
grpc_call_element *elem;
grpc_stream_op_buffer outgoing_sopb;

@ -71,6 +71,7 @@ struct grpc_completion_queue {
grpc_pollset pollset;
/* 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
/* Head of a linked list of queued events (prev points to the last element) */
event *queue;
/* Fixed size chained hash table of events for pluck() */
@ -107,7 +108,6 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
grpc_event_finish_func on_finish, void *user_data) {
event *ev = gpr_malloc(sizeof(event));
gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
GPR_ASSERT(!cc->shutdown);
ev->base.type = type;
ev->base.tag = tag;
ev->base.call = call;
@ -150,6 +150,7 @@ static void end_op_locked(grpc_completion_queue *cc,
#endif
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
}
@ -380,6 +381,10 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->shutdown_called = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
if (gpr_unref(&cc->refs)) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown);

@ -309,6 +309,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
static int prepare_callbacks(transport *t);
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
static int prepare_write(transport *t);
static void perform_write(transport *t, grpc_endpoint *ep);
@ -516,13 +517,29 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
static void destroy_transport(grpc_transport *gt) {
transport *t = (transport *)gt;
gpr_mu_lock(&t->mu);
lock(t);
t->destroying = 1;
while (t->calling_back) {
/* Wait for pending stuff to finish.
We need to be not calling back to ensure that closed() gets a chance to
trigger if needed during unlock() before we die.
We need to be not writing as cancellation finalization may produce some
callbacks that NEED to be made to close out some streams when t->writing
becomes 0. */
while (t->calling_back || t->writing) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
}
t->cb = NULL;
gpr_mu_unlock(&t->mu);
drop_connection(t);
unlock(t);
/* The drop_connection() above puts the transport into an error state, and
the follow-up unlock should then (as part of the cleanup work it does)
ensure that cb is NULL, and therefore not call back anything further.
This check validates this very subtle behavior.
It's shutdown path, so I don't believe an extra lock pair is going to be
problematic for performance. */
lock(t);
GPR_ASSERT(!t->cb);
unlock(t);
unref_transport(t);
}
@ -680,6 +697,7 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
}
static void stream_list_join(transport *t, stream *s, stream_list_id id) {
if (id == PENDING_CALLBACKS) GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
if (s->included[id]) {
return;
}
@ -738,7 +756,7 @@ static void unlock(transport *t) {
if (perform_callbacks) {
t->calling_back = 1;
}
if (t->error_state == ERROR_STATE_SEEN) {
if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1;
t->calling_back = 1;
t->cb = NULL; /* no more callbacks */
@ -772,7 +790,7 @@ static void unlock(transport *t) {
}
if (call_closed) {
cb->closed(t->cb_user_data, &t->base);
call_cb_closed(t, cb);
}
/* write some bytes if necessary */
@ -903,13 +921,16 @@ static void finish_write_common(transport *t, int success) {
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
s->sent_write_closed = 1;
stream_list_join(t, s, PENDING_CALLBACKS);
if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
}
t->outbuf.count = 0;
t->outbuf.length = 0;
/* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */
t->writing = 0;
if (t->destroying) {
gpr_cv_signal(&t->cv);
}
if (!t->reading) {
grpc_endpoint_destroy(t->ep);
t->ep = NULL;
@ -979,7 +1000,8 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
} else {
grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
}
if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed) {
if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed &&
!s->published_close) {
stream_list_join(t, s, PENDING_CALLBACKS);
}
@ -1765,6 +1787,10 @@ static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
}
}
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
cb->closed(t->cb_user_data, &t->base);
}
static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
transport *t = (transport *)gt;
lock(t);

@ -41,7 +41,7 @@ static void *tag(gpr_intptr i) { return (void *)i; }
int main(int argc, char **argv) {
grpc_channel *chan;
grpc_call *call;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2);
grpc_completion_queue *cq;
cq_verifier *cqv;
grpc_event *ev;

@ -275,7 +275,7 @@ int main(int argc, char **argv) {
case FLING_SERVER_SEND_STATUS_FOR_STREAMING:
/* Send status and close completed at server */
grpc_call_destroy(call);
request_call();
if (!shutdown_started) request_call();
break;
case FLING_SERVER_READ_FOR_UNARY:
/* Finished payload read for unary. Start all reamaining
@ -288,7 +288,7 @@ int main(int argc, char **argv) {
grpc_byte_buffer_destroy(payload_buffer);
payload_buffer = NULL;
grpc_call_destroy(call);
request_call();
if (!shutdown_started) request_call();
break;
}
break;

@ -41,6 +41,7 @@ _TOP_LEVEL_KEYS = ['settings', 'filegroups', 'libs', 'targets']
_VERSION_KEYS = ['major', 'minor', 'micro', 'build']
_ELEM_KEYS = [
'name',
'flaky',
'build',
'run',
'language',

@ -35,7 +35,7 @@ out=`realpath ${1:-coverage}`
root=`realpath $(dirname $0)/../..`
tmp=`mktemp`
cd $root
tools/run_tests/run_tests.py -c gcov -l c c++
tools/run_tests/run_tests.py -c gcov -l c c++ || true
lcov --capture --directory . --output-file $tmp
genhtml $tmp --output-directory $out
rm $tmp

@ -102,12 +102,12 @@
"name": "fd_posix_test"
},
{
"flaky": true,
"flaky": false,
"language": "c",
"name": "fling_stream_test"
},
{
"flaky": true,
"flaky": false,
"language": "c",
"name": "fling_test"
},

Loading…
Cancel
Save