pull/2151/head
Siddharth Rakesh 10 years ago
commit f8857de339
  1. 66
      Makefile
  2. 31
      build.json
  3. 34
      src/core/iomgr/fd_posix.c
  4. 2
      src/core/transport/chttp2_transport.c
  5. 60
      test/core/end2end/multiple_server_queues_test.c
  6. 63
      test/core/iomgr/fd_conservation_posix_test.c
  7. 28
      tools/run_tests/sources_and_headers.json
  8. 17
      tools/run_tests/tests.json
  9. 9
      vsprojects/Grpc.mak

File diff suppressed because one or more lines are too long

@ -920,6 +920,23 @@
"posix"
]
},
{
"name": "fd_conservation_posix_test",
"build": "test",
"language": "c",
"src": [
"test/core/iomgr/fd_conservation_posix_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr_test_util",
"gpr"
],
"platforms": [
"posix"
]
},
{
"name": "fd_posix_test",
"build": "test",
@ -1531,6 +1548,20 @@
"gpr"
]
},
{
"name": "multiple_server_queues_test",
"build": "test",
"language": "c",
"src": [
"test/core/end2end/multiple_server_queues_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "murmur_hash_test",
"build": "test",

@ -100,6 +100,7 @@ static grpc_fd *alloc_fd(int fd) {
&r->inactive_watcher_root;
r->freelist_next = NULL;
r->read_watcher = r->write_watcher = NULL;
r->on_done_closure = NULL;
return r;
}
@ -138,9 +139,6 @@ static void unref_by(grpc_fd *fd, int n) {
#endif
old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
if (fd->on_done_closure) {
grpc_iomgr_add_callback(fd->on_done_closure);
}
grpc_iomgr_unregister_object(&fd->iomgr_object);
freelist_fd(fd);
} else {
@ -199,13 +197,24 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
}
}
static int has_watchers(grpc_fd *fd) {
return fd->read_watcher != NULL || fd->write_watcher != NULL || fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
wake_all_watchers_locked(fd);
if (!has_watchers(fd)) {
close(fd->fd);
if (fd->on_done_closure) {
grpc_iomgr_add_callback(fd->on_done_closure);
}
} else {
wake_all_watchers_locked(fd);
}
gpr_mu_unlock(&fd->watcher_mu);
UNREF_BY(fd, 2, reason); /* drop the reference */
}
@ -354,6 +363,13 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
GRPC_FD_REF(fd, "poll");
gpr_mu_lock(&fd->watcher_mu);
/* if we are shutdown, then don't add to the watcher set */
if (gpr_atm_no_barrier_load(&fd->shutdown)) {
watcher->fd = NULL;
watcher->pollset = NULL;
gpr_mu_unlock(&fd->watcher_mu);
return 0;
}
/* if there is nobody polling for read, but we need to, then start doing so */
if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
fd->read_watcher = watcher;
@ -383,6 +399,10 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
int kick = 0;
grpc_fd *fd = watcher->fd;
if (fd == NULL) {
return;
}
gpr_mu_lock(&fd->watcher_mu);
if (watcher == fd->read_watcher) {
/* remove read watcher, kick if we still need a read */
@ -404,6 +424,12 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
if (kick) {
maybe_wake_one_watcher_locked(fd);
}
if (grpc_fd_is_orphaned(fd) && !has_watchers(fd)) {
close(fd->fd);
if (fd->on_done_closure != NULL) {
grpc_iomgr_add_callback(fd->on_done_closure);
}
}
gpr_mu_unlock(&fd->watcher_mu);
GRPC_FD_UNREF(fd, "poll");

@ -431,6 +431,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
GPR_ASSERT(s->global.outgoing_sopb == NULL);
GPR_ASSERT(s->global.publish_sopb == NULL);
grpc_sopb_destroy(&s->writing.sopb);
grpc_sopb_destroy(&s->global.incoming_sopb);
grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.incoming_metadata);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.incoming_metadata);
@ -935,7 +936,6 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
static void reading_action(void *pt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = pt;
gpr_log(GPR_DEBUG, "reading_action");
grpc_endpoint_notify_on_read(t->ep, recv_data, t);
}

@ -0,0 +1,60 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc/grpc.h>
#include "test/core/util/test_config.h"
int main(int argc, char **argv) {
grpc_completion_queue *cq1;
grpc_completion_queue *cq2;
grpc_server *server;
grpc_test_init(argc, argv);
grpc_init();
cq1 = grpc_completion_queue_create();
cq2 = grpc_completion_queue_create();
server = grpc_server_create(NULL);
grpc_server_register_completion_queue(server, cq1);
grpc_server_add_http2_port(server, "[::]:0");
grpc_server_register_completion_queue(server, cq2);
grpc_server_start(server);
grpc_server_shutdown_and_notify(server, cq2, NULL);
grpc_completion_queue_next(cq2, gpr_inf_future); /* cue queue hang */
grpc_completion_queue_shutdown(cq1);
grpc_completion_queue_shutdown(cq2);
grpc_completion_queue_next(cq1, gpr_inf_future);
grpc_completion_queue_next(cq2, gpr_inf_future);
grpc_server_destroy(server);
grpc_shutdown();
return 0;
}

@ -0,0 +1,63 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <sys/resource.h>
#include <grpc/support/log.h>
#include "test/core/util/test_config.h"
#include "src/core/iomgr/endpoint_pair.h"
#include "src/core/iomgr/iomgr.h"
int main(int argc, char **argv) {
int i;
struct rlimit rlim;
grpc_endpoint_pair p;
grpc_test_init(argc, argv);
grpc_iomgr_init();
/* set max # of file descriptors to a low value, and
verify we can create and destroy many more than this number
of descriptors */
rlim.rlim_cur = rlim.rlim_max = 10;
GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim));
for (i = 0; i < 100; i++) {
p = grpc_iomgr_create_endpoint_pair("test", 1);
grpc_endpoint_destroy(p.client);
grpc_endpoint_destroy(p.server);
}
grpc_iomgr_shutdown();
return 0;
}

@ -127,6 +127,20 @@
"test/core/end2end/dualstack_socket_test.c"
]
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"language": "c",
"name": "fd_conservation_posix_test",
"src": [
"test/core/iomgr/fd_conservation_posix_test.c"
]
},
{
"deps": [
"gpr",
@ -723,6 +737,20 @@
"test/core/surface/multi_init_test.c"
]
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"language": "c",
"name": "multiple_server_queues_test",
"src": [
"test/core/end2end/multiple_server_queues_test.c"
]
},
{
"deps": [
"gpr",

@ -81,6 +81,14 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "fd_conservation_posix_test",
"platforms": [
"posix"
]
},
{
"flaky": false,
"language": "c",
@ -410,6 +418,15 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "multiple_server_queues_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save