Add asserts that uv calls all run on the same thread

pull/11871/head
murgatroid99 8 years ago
parent 87116f4499
commit b8e07ad780
  1. 1
      BUILD
  2. 1
      build.yaml
  3. 2
      gRPC-Core.podspec
  4. 1
      grpc.gemspec
  5. 1
      package.xml
  6. 4
      src/core/lib/iomgr/iomgr_uv.c
  7. 37
      src/core/lib/iomgr/iomgr_uv.h
  8. 7
      src/core/lib/iomgr/pollset_uv.c
  9. 4
      src/core/lib/iomgr/resolve_address_uv.c
  10. 3
      src/core/lib/iomgr/tcp_client_uv.c
  11. 6
      src/core/lib/iomgr/tcp_server_uv.c
  12. 3
      src/core/lib/iomgr/tcp_uv.c
  13. 4
      src/core/lib/iomgr/timer_uv.c
  14. 1
      tools/doxygen/Doxyfile.core.internal
  15. 2
      tools/run_tests/generated/sources_and_headers.json
  16. 2
      tools/run_tests/run_tests.py
  17. 1
      vsprojects/vcxproj/grpc/grpc.vcxproj
  18. 3
      vsprojects/vcxproj/grpc/grpc.vcxproj.filters
  19. 1
      vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
  20. 3
      vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
  21. 1
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
  22. 3
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -597,6 +597,7 @@ grpc_cc_library(
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_internal.h",
"src/core/lib/iomgr/iomgr_posix.h",
"src/core/lib/iomgr/iomgr_uv.h",
"src/core/lib/iomgr/is_epollexclusive_available.h",
"src/core/lib/iomgr/load_file.h",
"src/core/lib/iomgr/lockfree_event.h",

@ -213,6 +213,7 @@ filegroups:
- src/core/lib/iomgr/iomgr.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/iomgr/iomgr_posix.h
- src/core/lib/iomgr/iomgr_uv.h
- src/core/lib/iomgr/is_epollexclusive_available.h
- src/core/lib/iomgr/load_file.h
- src/core/lib/iomgr/lockfree_event.h

@ -276,6 +276,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_internal.h',
'src/core/lib/iomgr/iomgr_posix.h',
'src/core/lib/iomgr/iomgr_uv.h',
'src/core/lib/iomgr/is_epollexclusive_available.h',
'src/core/lib/iomgr/load_file.h',
'src/core/lib/iomgr/lockfree_event.h',
@ -753,6 +754,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_internal.h',
'src/core/lib/iomgr/iomgr_posix.h',
'src/core/lib/iomgr/iomgr_uv.h',
'src/core/lib/iomgr/is_epollexclusive_available.h',
'src/core/lib/iomgr/load_file.h',
'src/core/lib/iomgr/lockfree_event.h',

@ -208,6 +208,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/iomgr.h )
s.files += %w( src/core/lib/iomgr/iomgr_internal.h )
s.files += %w( src/core/lib/iomgr/iomgr_posix.h )
s.files += %w( src/core/lib/iomgr/iomgr_uv.h )
s.files += %w( src/core/lib/iomgr/is_epollexclusive_available.h )
s.files += %w( src/core/lib/iomgr/load_file.h )
s.files += %w( src/core/lib/iomgr/lockfree_event.h )

@ -222,6 +222,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr_internal.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr_uv.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/is_epollexclusive_available.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/load_file.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/lockfree_event.h" role="src" />

@ -22,14 +22,18 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/pollset_uv.h"
#include "src/core/lib/iomgr/tcp_uv.h"
gpr_thd_id grpc_init_thread;
void grpc_iomgr_platform_init(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_global_init();
grpc_register_tracer("tcp", &grpc_tcp_trace);
grpc_executor_set_threading(&exec_ctx, false);
grpc_init_thread = gpr_thd_currentid();
grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_iomgr_platform_flush(void) {}

@ -0,0 +1,37 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_UV_H
#define GRPC_CORE_LIB_IOMGR_IOMGR_UV_H
#include "src/core/lib/iomgr/iomgr_internal.h"
#include <grpc/support/thd.h>
/* The thread ID of the thread on which grpc was initialized. Used to verify
* that all calls into libuv are made on that same thread */
extern gpr_thd_id grpc_init_thread;
#ifdef GRPC_UV_THREAD_CHECK
#define GRPC_ASSERT_SAME_THREAD() \
GPR_ASSERT(gpr_thd_currentid() == grpc_init_thread)
#else
#define GRPC_ASSERT_SAME_THREAD()
#endif /* GRPC_UV_THREAD_CHECK */
#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_UV_H */

@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_uv.h"
@ -69,6 +70,7 @@ void grpc_pollset_global_init(void) {
}
void grpc_pollset_global_shutdown(void) {
GRPC_ASSERT_SAME_THREAD();
gpr_mu_destroy(&grpc_polling_mu);
uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb);
}
@ -78,6 +80,7 @@ static void timer_run_cb(uv_timer_t *timer) {}
static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
GRPC_ASSERT_SAME_THREAD();
*mu = &grpc_polling_mu;
uv_timer_init(uv_default_loop(), &pollset->timer);
pollset->shutting_down = 0;
@ -86,6 +89,7 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(!pollset->shutting_down);
GRPC_ASSERT_SAME_THREAD();
pollset->shutting_down = 1;
if (grpc_pollset_work_run_loop) {
// Drain any pending UV callbacks without blocking
@ -98,6 +102,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GRPC_ASSERT_SAME_THREAD();
uv_close((uv_handle_t *)&pollset->timer, timer_close_cb);
// timer.data is a boolean indicating that the timer has finished closing
pollset->timer.data = (void *)0;
@ -112,6 +117,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
uint64_t timeout;
GRPC_ASSERT_SAME_THREAD();
gpr_mu_unlock(&grpc_polling_mu);
if (grpc_pollset_work_run_loop) {
if (gpr_time_cmp(deadline, now) >= 0) {
@ -140,6 +146,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
GRPC_ASSERT_SAME_THREAD();
uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
return GRPC_ERROR_NONE;
}

@ -30,6 +30,7 @@
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@ -174,6 +175,8 @@ static grpc_error *blocking_resolve_address_impl(
grpc_error *err;
int retry_status;
GRPC_ASSERT_SAME_THREAD();
req.addrinfo = NULL;
err = try_split_host_port(name, default_port, &host, &port);
@ -228,6 +231,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
char *port = NULL;
grpc_error *err;
int s;
GRPC_ASSERT_SAME_THREAD();
err = try_split_host_port(name, default_port, &host, &port);
if (err != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(exec_ctx, on_done, err);

@ -26,6 +26,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/tcp_uv.h"
@ -124,6 +125,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
(void)channel_args;
(void)interested_parties;
GRPC_ASSERT_SAME_THREAD();
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {

@ -28,6 +28,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/tcp_server.h"
@ -107,6 +108,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
}
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
GRPC_ASSERT_SAME_THREAD();
gpr_ref(&s->refs);
return s;
}
@ -173,6 +175,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GRPC_ASSERT_SAME_THREAD();
if (gpr_unref(&s->refs)) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
@ -335,6 +338,8 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
int status;
grpc_error *error = GRPC_ERROR_NONE;
GRPC_ASSERT_SAME_THREAD();
if (s->tail != NULL) {
port_index = s->tail->port_index + 1;
}
@ -415,6 +420,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_tcp_listener *sp;
(void)pollsets;
(void)pollset_count;
GRPC_ASSERT_SAME_THREAD();
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "SERVER_START %p", server);
}

@ -30,6 +30,7 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/iomgr/tcp_uv.h"
@ -183,6 +184,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_tcp *tcp = (grpc_tcp *)ep;
int status;
grpc_error *error = GRPC_ERROR_NONE;
GRPC_ASSERT_SAME_THREAD();
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_slices = read_slices;
@ -236,6 +238,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
unsigned int i;
grpc_slice *slice;
uv_write_t *write_req;
GRPC_ASSERT_SAME_THREAD();
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
size_t j;

@ -24,6 +24,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/timer.h"
#include <uv.h>
@ -42,6 +43,7 @@ static void stop_uv_timer(uv_timer_t *handle) {
void run_expired_timer(uv_timer_t *handle) {
grpc_timer *timer = (grpc_timer *)handle->data;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_ASSERT_SAME_THREAD();
GPR_ASSERT(timer->pending);
timer->pending = 0;
GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE);
@ -54,6 +56,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec now) {
uint64_t timeout;
uv_timer_t *uv_timer;
GRPC_ASSERT_SAME_THREAD();
timer->closure = closure;
if (gpr_time_cmp(deadline, now) <= 0) {
timer->pending = 0;
@ -74,6 +77,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
}
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
GRPC_ASSERT_SAME_THREAD();
if (timer->pending) {
timer->pending = 0;
GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);

@ -1118,6 +1118,7 @@ src/core/lib/iomgr/iomgr_internal.h \
src/core/lib/iomgr/iomgr_posix.c \
src/core/lib/iomgr/iomgr_posix.h \
src/core/lib/iomgr/iomgr_uv.c \
src/core/lib/iomgr/iomgr_uv.h \
src/core/lib/iomgr/iomgr_windows.c \
src/core/lib/iomgr/is_epollexclusive_available.c \
src/core/lib/iomgr/is_epollexclusive_available.h \

@ -7654,6 +7654,7 @@
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_internal.h",
"src/core/lib/iomgr/iomgr_posix.h",
"src/core/lib/iomgr/iomgr_uv.h",
"src/core/lib/iomgr/is_epollexclusive_available.h",
"src/core/lib/iomgr/load_file.h",
"src/core/lib/iomgr/lockfree_event.h",
@ -7812,6 +7813,7 @@
"src/core/lib/iomgr/iomgr_posix.c",
"src/core/lib/iomgr/iomgr_posix.h",
"src/core/lib/iomgr/iomgr_uv.c",
"src/core/lib/iomgr/iomgr_uv.h",
"src/core/lib/iomgr/iomgr_windows.c",
"src/core/lib/iomgr/is_epollexclusive_available.c",
"src/core/lib/iomgr/is_epollexclusive_available.h",

@ -245,7 +245,7 @@ class CLanguage(object):
self._docker_distro, self._make_options = self._compiler_options(self.args.use_docker,
self.args.compiler)
if args.iomgr_platform == "uv":
cflags = '-DGRPC_UV '
cflags = '-DGRPC_UV -DGRPC_UV_THREAD_CHECK'
try:
cflags += subprocess.check_output(['pkg-config', '--cflags', 'libuv']).strip() + ' '
except (subprocess.CalledProcessError, OSError):

@ -333,6 +333,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\load_file.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\lockfree_event.h" />

@ -944,6 +944,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

@ -228,6 +228,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\load_file.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\lockfree_event.h" />

@ -677,6 +677,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

@ -323,6 +323,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\load_file.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\lockfree_event.h" />

@ -854,6 +854,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

Loading…
Cancel
Save