Merge pull request #17308 from yang-g/shutdown

Move grpc_shutdown internals to a detached thread
pull/17979/head
Yang Gao 6 years ago committed by GitHub
commit b03e014ad8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      grpc.def
  2. 13
      include/grpc/grpc.h
  3. 3
      src/core/lib/debug/trace.h
  4. 40
      src/core/lib/gprpp/thd.h
  5. 44
      src/core/lib/gprpp/thd_posix.cc
  6. 54
      src/core/lib/gprpp/thd_windows.cc
  7. 108
      src/core/lib/surface/init.cc
  8. 1
      src/core/lib/surface/init.h
  9. 2
      src/php/ext/grpc/php_grpc.c
  10. 2
      src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
  11. 2
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  12. 2
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
  13. 8
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
  14. 2
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  15. 2
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
  16. 2
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  17. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  18. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  19. 3
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  20. 2
      test/core/end2end/fuzzers/api_fuzzer.cc
  21. 10
      test/core/end2end/fuzzers/client_fuzzer.cc
  22. 8
      test/core/end2end/fuzzers/server_fuzzer.cc
  23. 2
      test/core/handshake/readahead_handshaker_server_ssl.cc
  24. 14
      test/core/iomgr/resolve_address_test.cc
  25. 6
      test/core/json/fuzzer.cc
  26. 2
      test/core/memory_usage/client.cc
  27. 2
      test/core/memory_usage/server.cc
  28. 10
      test/core/security/alts_credentials_fuzzer.cc
  29. 10
      test/core/security/ssl_server_fuzzer.cc
  30. 33
      test/core/slice/percent_decode_fuzzer.cc
  31. 40
      test/core/slice/percent_encode_fuzzer.cc
  32. 21
      test/core/surface/init_test.cc
  33. 1
      test/core/surface/public_headers_must_be_c89.c
  34. 31
      test/core/util/memory_counters.cc
  35. 18
      test/core/util/memory_counters.h
  36. 2
      test/core/util/port.cc
  37. 3
      test/core/util/test_config.cc
  38. 2
      test/cpp/naming/address_sorting_test.cc
  39. 16
      test/cpp/util/grpc_tool_test.cc

@ -16,6 +16,7 @@ EXPORTS
grpc_init
grpc_shutdown
grpc_is_initialized
grpc_shutdown_blocking
grpc_version_string
grpc_g_stands_for
grpc_completion_queue_factory_lookup

@ -73,10 +73,11 @@ GRPCAPI void grpc_init(void);
Before it's called, there should haven been a matching invocation to
grpc_init().
No memory is used by grpc after this call returns, nor are any instructions
executing within the grpc library.
Prior to calling, all application owned grpc objects must have been
destroyed. */
The last call to grpc_shutdown will initiate cleaning up of grpc library
internals, which can happen in another thread. Once the clean-up is done,
no memory is used by grpc, nor are any instructions executing within the
grpc library. Prior to calling, all application owned grpc objects must
have been destroyed. */
GRPCAPI void grpc_shutdown(void);
/** EXPERIMENTAL. Returns 1 if the grpc library has been initialized.
@ -85,6 +86,10 @@ GRPCAPI void grpc_shutdown(void);
https://github.com/grpc/grpc/issues/15334 */
GRPCAPI int grpc_is_initialized(void);
/** EXPERIMENTAL. Blocking shut down grpc library.
This is only for wrapped language to use now. */
GRPCAPI void grpc_shutdown_blocking(void);
/** Return a string representing the current version of grpc */
GRPCAPI const char* grpc_version_string(void);

@ -53,7 +53,8 @@ void grpc_tracer_enable_flag(grpc_core::TraceFlag* flag);
class TraceFlag {
public:
TraceFlag(bool default_enabled, const char* name);
// This needs to be trivially destructible as it is used as global variable.
// TraceFlag needs to be trivially destructible since it is used as global
// variable.
~TraceFlag() = default;
const char* name() const { return name_; }

@ -47,6 +47,27 @@ class ThreadInternalsInterface {
class Thread {
public:
class Options {
public:
Options() : joinable_(true), tracked_(true) {}
/// Set whether the thread is joinable or detached.
Options& set_joinable(bool joinable) {
joinable_ = joinable;
return *this;
}
bool joinable() const { return joinable_; }
/// Set whether the thread is tracked for fork support.
Options& set_tracked(bool tracked) {
tracked_ = tracked;
return *this;
}
bool tracked() const { return tracked_; }
private:
bool joinable_;
bool tracked_;
};
/// Default constructor only to allow use in structs that lack constructors
/// Does not produce a validly-constructed thread; must later
/// use placement new to construct a real thread. Does not init mu_ and cv_
@ -57,14 +78,17 @@ class Thread {
/// with argument \a arg once it is started.
/// The optional \a success argument indicates whether the thread
/// is successfully created.
/// The optional \a options can be used to set the thread detachable.
Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success = nullptr);
bool* success = nullptr, const Options& options = Options());
/// Move constructor for thread. After this is called, the other thread
/// no longer represents a living thread object
Thread(Thread&& other) : state_(other.state_), impl_(other.impl_) {
Thread(Thread&& other)
: state_(other.state_), impl_(other.impl_), options_(other.options_) {
other.state_ = MOVED;
other.impl_ = nullptr;
other.options_ = Options();
}
/// Move assignment operator for thread. After this is called, the other
@ -79,8 +103,10 @@ class Thread {
// assert it for the time being.
state_ = other.state_;
impl_ = other.impl_;
options_ = other.options_;
other.state_ = MOVED;
other.impl_ = nullptr;
other.options_ = Options();
}
return *this;
}
@ -95,11 +121,16 @@ class Thread {
GPR_ASSERT(state_ == ALIVE);
state_ = STARTED;
impl_->Start();
if (!options_.joinable()) {
state_ = DONE;
impl_ = nullptr;
}
} else {
GPR_ASSERT(state_ == FAILED);
}
};
}
// It is only legal to call Join if the Thread is created as joinable.
void Join() {
if (impl_ != nullptr) {
impl_->Join();
@ -119,12 +150,13 @@ class Thread {
/// FAKE -- just a dummy placeholder Thread created by the default constructor
/// ALIVE -- an actual thread of control exists associated with this thread
/// STARTED -- the thread of control has been started
/// DONE -- the thread of control has completed and been joined
/// DONE -- the thread of control has completed and been joined/detached
/// FAILED -- the thread of control never came alive
/// MOVED -- contents were moved out and we're no longer tracking them
enum ThreadState { FAKE, ALIVE, STARTED, DONE, FAILED, MOVED };
ThreadState state_;
internal::ThreadInternalsInterface* impl_;
Options options_;
};
} // namespace grpc_core

@ -44,13 +44,14 @@ struct thd_arg {
void (*body)(void* arg); /* body of a thread */
void* arg; /* argument to a thread */
const char* name; /* name of thread. Can be nullptr. */
bool joinable;
bool tracked;
};
class ThreadInternalsPosix
: public grpc_core::internal::ThreadInternalsInterface {
class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
public:
ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
void* arg, bool* success)
void* arg, bool* success, const Thread::Options& options)
: started_(false) {
gpr_mu_init(&mu_);
gpr_cv_init(&ready_);
@ -63,11 +64,20 @@ class ThreadInternalsPosix
info->body = thd_body;
info->arg = arg;
info->name = thd_name;
grpc_core::Fork::IncThreadCount();
info->joinable = options.joinable();
info->tracked = options.tracked();
if (options.tracked()) {
Fork::IncThreadCount();
}
GPR_ASSERT(pthread_attr_init(&attr) == 0);
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
0);
if (options.joinable()) {
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
0);
} else {
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ==
0);
}
*success =
(pthread_create(&pthread_id_, &attr,
@ -97,8 +107,14 @@ class ThreadInternalsPosix
}
gpr_mu_unlock(&arg.thread->mu_);
if (!arg.joinable) {
Delete(arg.thread);
}
(*arg.body)(arg.arg);
grpc_core::Fork::DecThreadCount();
if (arg.tracked) {
Fork::DecThreadCount();
}
return nullptr;
},
info) == 0);
@ -108,9 +124,11 @@ class ThreadInternalsPosix
if (!(*success)) {
/* don't use gpr_free, as this was allocated using malloc (see above) */
free(info);
grpc_core::Fork::DecThreadCount();
if (options.tracked()) {
Fork::DecThreadCount();
}
}
};
}
~ThreadInternalsPosix() override {
gpr_mu_destroy(&mu_);
@ -136,15 +154,15 @@ class ThreadInternalsPosix
} // namespace
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success) {
bool* success, const Options& options)
: options_(options) {
bool outcome = false;
impl_ =
grpc_core::New<ThreadInternalsPosix>(thd_name, thd_body, arg, &outcome);
impl_ = New<ThreadInternalsPosix>(thd_name, thd_body, arg, &outcome, options);
if (outcome) {
state_ = ALIVE;
} else {
state_ = FAILED;
grpc_core::Delete(impl_);
Delete(impl_);
impl_ = nullptr;
}

@ -46,6 +46,7 @@ struct thd_info {
void (*body)(void* arg); /* body of a thread */
void* arg; /* argument to a thread */
HANDLE join_event; /* the join event */
bool joinable; /* whether it is joinable */
};
thread_local struct thd_info* g_thd_info;
@ -53,7 +54,8 @@ thread_local struct thd_info* g_thd_info;
class ThreadInternalsWindows
: public grpc_core::internal::ThreadInternalsInterface {
public:
ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg, bool* success)
ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg, bool* success,
const grpc_core::Thread::Options& options)
: started_(false) {
gpr_mu_init(&mu_);
gpr_cv_init(&ready_);
@ -63,21 +65,24 @@ class ThreadInternalsWindows
info_->thread = this;
info_->body = thd_body;
info_->arg = arg;
info_->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
if (info_->join_event == nullptr) {
gpr_free(info_);
*success = false;
} else {
handle = CreateThread(nullptr, 64 * 1024, thread_body, info_, 0, nullptr);
if (handle == nullptr) {
destroy_thread();
info_->join_event = nullptr;
info_->joinable = options.joinable();
if (info_->joinable) {
info_->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
if (info_->join_event == nullptr) {
gpr_free(info_);
*success = false;
} else {
CloseHandle(handle);
*success = true;
return;
}
}
handle = CreateThread(nullptr, 64 * 1024, thread_body, info_, 0, nullptr);
if (handle == nullptr) {
destroy_thread();
*success = false;
} else {
CloseHandle(handle);
*success = true;
}
}
~ThreadInternalsWindows() override {
@ -107,14 +112,24 @@ class ThreadInternalsWindows
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&g_thd_info->thread->mu_);
if (!g_thd_info->joinable) {
grpc_core::Delete(g_thd_info->thread);
g_thd_info->thread = nullptr;
}
g_thd_info->body(g_thd_info->arg);
BOOL ret = SetEvent(g_thd_info->join_event);
GPR_ASSERT(ret);
if (g_thd_info->joinable) {
BOOL ret = SetEvent(g_thd_info->join_event);
GPR_ASSERT(ret);
} else {
gpr_free(g_thd_info);
}
return 0;
}
void destroy_thread() {
CloseHandle(info_->join_event);
if (info_ != nullptr && info_->joinable) {
CloseHandle(info_->join_event);
}
gpr_free(info_);
}
@ -129,14 +144,15 @@ class ThreadInternalsWindows
namespace grpc_core {
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success) {
bool* success, const Options& options)
: options_(options) {
bool outcome = false;
impl_ = grpc_core::New<ThreadInternalsWindows>(thd_body, arg, &outcome);
impl_ = New<ThreadInternalsWindows>(thd_body, arg, &outcome, options);
if (outcome) {
state_ = ALIVE;
} else {
state_ = FAILED;
grpc_core::Delete(impl_);
Delete(impl_);
impl_ = nullptr;
}

@ -33,6 +33,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
@ -61,10 +62,15 @@ extern void grpc_register_built_in_plugins(void);
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
static int g_initializations;
static gpr_cv* g_shutting_down_cv;
static bool g_shutting_down;
static void do_basic_init(void) {
gpr_log_verbosity_init();
gpr_mu_init(&g_init_mu);
g_shutting_down_cv = static_cast<gpr_cv*>(malloc(sizeof(gpr_cv)));
gpr_cv_init(g_shutting_down_cv);
g_shutting_down = false;
grpc_register_built_in_plugins();
grpc_cq_global_init();
g_initializations = 0;
@ -118,8 +124,12 @@ void grpc_init(void) {
int i;
gpr_once_init(&g_basic_init, do_basic_init);
gpr_mu_lock(&g_init_mu);
grpc_core::MutexLock lock(&g_init_mu);
if (++g_initializations == 1) {
if (g_shutting_down) {
g_shutting_down = false;
gpr_cv_broadcast(g_shutting_down_cv);
}
grpc_core::Fork::GlobalInit();
grpc_fork_handlers_auto_register();
gpr_time_init();
@ -150,50 +160,88 @@ void grpc_init(void) {
grpc_channel_init_finalize();
grpc_iomgr_start();
}
gpr_mu_unlock(&g_init_mu);
GRPC_API_TRACE("grpc_init(void)", 0, ());
}
void grpc_shutdown(void) {
void grpc_shutdown_internal_locked(void) {
int i;
GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
{
grpc_core::ExecCtx exec_ctx(0);
grpc_iomgr_shutdown_background_closure();
{
grpc_core::ExecCtx exec_ctx(0);
grpc_iomgr_shutdown_background_closure();
{
grpc_timer_manager_set_threading(
false); // shutdown timer_manager thread
grpc_core::Executor::ShutdownAll();
for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != nullptr) {
g_all_of_the_plugins[i].destroy();
}
grpc_timer_manager_set_threading(false); // shutdown timer_manager thread
grpc_core::Executor::ShutdownAll();
for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != nullptr) {
g_all_of_the_plugins[i].destroy();
}
}
grpc_iomgr_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();
grpc_mdctx_global_shutdown();
grpc_core::HandshakerRegistry::Shutdown();
grpc_slice_intern_shutdown();
grpc_core::channelz::ChannelzRegistry::Shutdown();
grpc_stats_shutdown();
grpc_core::Fork::GlobalShutdown();
}
grpc_core::ExecCtx::GlobalShutdown();
grpc_core::ApplicationCallbackExecCtx::GlobalShutdown();
grpc_iomgr_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();
grpc_mdctx_global_shutdown();
grpc_core::HandshakerRegistry::Shutdown();
grpc_slice_intern_shutdown();
grpc_core::channelz::ChannelzRegistry::Shutdown();
grpc_stats_shutdown();
grpc_core::Fork::GlobalShutdown();
}
grpc_core::ExecCtx::GlobalShutdown();
grpc_core::ApplicationCallbackExecCtx::GlobalShutdown();
g_shutting_down = false;
gpr_cv_broadcast(g_shutting_down_cv);
}
void grpc_shutdown_internal(void* ignored) {
GRPC_API_TRACE("grpc_shutdown_internal", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
// We have released lock from the shutdown thread and it is possible that
// another grpc_init has been called, and do nothing if that is the case.
if (--g_initializations != 0) {
return;
}
grpc_shutdown_internal_locked();
}
void grpc_shutdown(void) {
GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
if (--g_initializations == 0) {
g_initializations++;
g_shutting_down = true;
// spawn a detached thread to do the actual clean up in case we are
// currently in an executor thread.
grpc_core::Thread cleanup_thread(
"grpc_shutdown", grpc_shutdown_internal, nullptr, nullptr,
grpc_core::Thread::Options().set_joinable(false).set_tracked(false));
cleanup_thread.Start();
}
}
void grpc_shutdown_blocking(void) {
GRPC_API_TRACE("grpc_shutdown_blocking(void)", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
if (--g_initializations == 0) {
g_shutting_down = true;
grpc_shutdown_internal_locked();
}
gpr_mu_unlock(&g_init_mu);
}
int grpc_is_initialized(void) {
int r;
gpr_once_init(&g_basic_init, do_basic_init);
gpr_mu_lock(&g_init_mu);
grpc_core::MutexLock lock(&g_init_mu);
r = g_initializations > 0;
gpr_mu_unlock(&g_init_mu);
return r;
}
void grpc_maybe_wait_for_async_shutdown(void) {
gpr_once_init(&g_basic_init, do_basic_init);
grpc_core::MutexLock lock(&g_init_mu);
while (g_shutting_down) {
gpr_cv_wait(g_shutting_down_cv, &g_init_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
}

@ -22,5 +22,6 @@
void grpc_register_security_filters(void);
void grpc_security_pre_init(void);
void grpc_security_init(void);
void grpc_maybe_wait_for_async_shutdown(void);
#endif /* GRPC_CORE_LIB_SURFACE_INIT_H */

@ -361,7 +361,7 @@ PHP_MSHUTDOWN_FUNCTION(grpc) {
zend_hash_destroy(&grpc_target_upper_bound_map);
grpc_shutdown_timeval(TSRMLS_C);
grpc_php_shutdown_completion_queue(TSRMLS_C);
grpc_shutdown();
grpc_shutdown_blocking();
GRPC_G(initialized) = 0;
}
return SUCCESS;

@ -87,7 +87,7 @@ cdef class Call:
def __dealloc__(self):
if self.c_call != NULL:
grpc_call_unref(self.c_call)
grpc_shutdown()
grpc_shutdown_blocking()
# The object *should* always be valid from Python. Used for debugging.
@property

@ -399,7 +399,7 @@ cdef _close(Channel channel, grpc_status_code code, object details,
_destroy_c_completion_queue(state.c_connectivity_completion_queue)
grpc_channel_destroy(state.c_channel)
state.c_channel = NULL
grpc_shutdown()
grpc_shutdown_blocking()
state.condition.notify_all()
else:
# Another call to close already completed in the past or is currently

@ -118,4 +118,4 @@ cdef class CompletionQueue:
self.c_completion_queue, c_deadline, NULL)
self._interpret_event(event)
grpc_completion_queue_destroy(self.c_completion_queue)
grpc_shutdown()
grpc_shutdown_blocking()

@ -61,7 +61,7 @@ cdef int _get_metadata(
cdef void _destroy(void *state) with gil:
cpython.Py_DECREF(<object>state)
grpc_shutdown()
grpc_shutdown_blocking()
cdef class MetadataPluginCallCredentials(CallCredentials):
@ -125,7 +125,7 @@ cdef class SSLSessionCacheLRU:
def __dealloc__(self):
if self._cache != NULL:
grpc_ssl_session_cache_destroy(self._cache)
grpc_shutdown()
grpc_shutdown_blocking()
cdef class SSLChannelCredentials(ChannelCredentials):
@ -191,7 +191,7 @@ cdef class ServerCertificateConfig:
def __dealloc__(self):
grpc_ssl_server_certificate_config_destroy(self.c_cert_config)
gpr_free(self.c_ssl_pem_key_cert_pairs)
grpc_shutdown()
grpc_shutdown_blocking()
cdef class ServerCredentials:
@ -207,7 +207,7 @@ cdef class ServerCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
grpc_server_credentials_release(self.c_credentials)
grpc_shutdown()
grpc_shutdown_blocking()
cdef const char* _get_c_pem_root_certs(pem_root_certs):
if pem_root_certs is None:

@ -319,7 +319,7 @@ cdef extern from "grpc/grpc.h":
grpc_op_data data
void grpc_init() nogil
void grpc_shutdown() nogil
void grpc_shutdown_blocking() nogil
int grpc_is_initialized() nogil
ctypedef struct grpc_completion_queue_factory:

@ -134,7 +134,7 @@ cdef class CallDetails:
def __dealloc__(self):
with nogil:
grpc_call_details_destroy(&self.c_details)
grpc_shutdown()
grpc_shutdown_blocking()
@property
def method(self):

@ -151,4 +151,4 @@ cdef class Server:
def __dealloc__(self):
if self.c_server == NULL:
grpc_shutdown()
grpc_shutdown_blocking()

@ -39,6 +39,7 @@ grpc_register_plugin_type grpc_register_plugin_import;
grpc_init_type grpc_init_import;
grpc_shutdown_type grpc_shutdown_import;
grpc_is_initialized_type grpc_is_initialized_import;
grpc_shutdown_blocking_type grpc_shutdown_blocking_import;
grpc_version_string_type grpc_version_string_import;
grpc_g_stands_for_type grpc_g_stands_for_import;
grpc_completion_queue_factory_lookup_type grpc_completion_queue_factory_lookup_import;
@ -306,6 +307,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_init_import = (grpc_init_type) GetProcAddress(library, "grpc_init");
grpc_shutdown_import = (grpc_shutdown_type) GetProcAddress(library, "grpc_shutdown");
grpc_is_initialized_import = (grpc_is_initialized_type) GetProcAddress(library, "grpc_is_initialized");
grpc_shutdown_blocking_import = (grpc_shutdown_blocking_type) GetProcAddress(library, "grpc_shutdown_blocking");
grpc_version_string_import = (grpc_version_string_type) GetProcAddress(library, "grpc_version_string");
grpc_g_stands_for_import = (grpc_g_stands_for_type) GetProcAddress(library, "grpc_g_stands_for");
grpc_completion_queue_factory_lookup_import = (grpc_completion_queue_factory_lookup_type) GetProcAddress(library, "grpc_completion_queue_factory_lookup");

@ -92,6 +92,9 @@ extern grpc_shutdown_type grpc_shutdown_import;
typedef int(*grpc_is_initialized_type)(void);
extern grpc_is_initialized_type grpc_is_initialized_import;
#define grpc_is_initialized grpc_is_initialized_import
typedef void(*grpc_shutdown_blocking_type)(void);
extern grpc_shutdown_blocking_type grpc_shutdown_blocking_import;
#define grpc_shutdown_blocking grpc_shutdown_blocking_import
typedef const char*(*grpc_version_string_type)(void);
extern grpc_version_string_type grpc_version_string_import;
#define grpc_version_string grpc_version_string_import

@ -18,6 +18,7 @@
#include <cstring>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
@ -281,7 +282,7 @@ int main(int argc, char** argv) {
grpc_core::ExecCtx exec_ctx;
GRPC_COMBINER_UNREF(g_combiner, "test");
}
grpc_shutdown();
grpc_shutdown_blocking();
GPR_ASSERT(g_all_callbacks_invoked);
return 0;
}

@ -1200,6 +1200,6 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_resource_quota_unref(g_resource_quota);
grpc_shutdown();
grpc_shutdown_blocking();
return 0;
}

@ -40,9 +40,8 @@ static void dont_log(gpr_log_func_args* args) {}
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_test_only_set_slice_hash_seed(0);
struct grpc_memory_counters counters;
if (squelch) gpr_set_log_function(dont_log);
if (leak_check) grpc_memory_counters_init();
grpc_core::testing::LeakDetector leak_detector(leak_check);
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
@ -159,11 +158,6 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_byte_buffer_destroy(response_payload_recv);
}
}
grpc_shutdown();
if (leak_check) {
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
GPR_ASSERT(counters.total_size_relative == 0);
}
grpc_shutdown_blocking();
return 0;
}

@ -37,9 +37,8 @@ static void dont_log(gpr_log_func_args* args) {}
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_test_only_set_slice_hash_seed(0);
struct grpc_memory_counters counters;
if (squelch) gpr_set_log_function(dont_log);
if (leak_check) grpc_memory_counters_init();
grpc_core::testing::LeakDetector leak_detector(leak_check);
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
@ -136,10 +135,5 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_completion_queue_destroy(cq);
}
grpc_shutdown();
if (leak_check) {
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
GPR_ASSERT(counters.total_size_relative == 0);
}
return 0;
}

@ -83,6 +83,6 @@ int main(int argc, char* argv[]) {
UniquePtr<HandshakerFactory>(New<ReadAheadHandshakerFactory>()));
const char* full_alpn_list[] = {"grpc-exp", "h2"};
GPR_ASSERT(server_ssl_test(full_alpn_list, 2, "grpc-exp"));
grpc_shutdown();
grpc_shutdown_blocking();
return 0;
}

@ -323,7 +323,11 @@ static bool mock_ipv6_disabled_source_addr_factory_get_source_addr(
}
void mock_ipv6_disabled_source_addr_factory_destroy(
address_sorting_source_addr_factory* factory) {}
address_sorting_source_addr_factory* factory) {
mock_ipv6_disabled_source_addr_factory* f =
reinterpret_cast<mock_ipv6_disabled_source_addr_factory*>(factory);
gpr_free(f);
}
const address_sorting_source_addr_factory_vtable
kMockIpv6DisabledSourceAddrFactoryVtable = {
@ -390,9 +394,11 @@ int main(int argc, char** argv) {
// Run a test case in which c-ares's address sorter
// thinks that IPv4 is available and IPv6 isn't.
grpc_init();
mock_ipv6_disabled_source_addr_factory factory;
factory.base.vtable = &kMockIpv6DisabledSourceAddrFactoryVtable;
address_sorting_override_source_addr_factory_for_testing(&factory.base);
mock_ipv6_disabled_source_addr_factory* factory =
static_cast<mock_ipv6_disabled_source_addr_factory*>(
gpr_malloc(sizeof(mock_ipv6_disabled_source_addr_factory)));
factory->base.vtable = &kMockIpv6DisabledSourceAddrFactoryVtable;
address_sorting_override_source_addr_factory_for_testing(&factory->base);
test_localhost_result_has_ipv4_first_when_ipv6_isnt_available();
grpc_shutdown();
}

@ -31,8 +31,7 @@ bool leak_check = true;
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
char* s;
struct grpc_memory_counters counters;
grpc_memory_counters_init();
grpc_core::testing::LeakDetector leak_detector(true);
s = static_cast<char*>(gpr_malloc(size));
memcpy(s, data, size);
grpc_json* x;
@ -40,8 +39,5 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_json_destroy(x);
}
gpr_free(s);
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
GPR_ASSERT(counters.total_size_relative == 0);
return 0;
}

@ -285,7 +285,7 @@ int main(int argc, char** argv) {
grpc_slice_unref(slice);
grpc_completion_queue_destroy(cq);
grpc_shutdown();
grpc_shutdown_blocking();
gpr_log(GPR_INFO, "---------client stats--------");
gpr_log(

@ -318,7 +318,7 @@ int main(int argc, char** argv) {
grpc_server_destroy(server);
grpc_completion_queue_destroy(cq);
grpc_shutdown();
grpc_shutdown_blocking();
grpc_memory_counters_destroy();
return 0;
}

@ -66,10 +66,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
gpr_set_log_function(dont_log);
}
gpr_free(grpc_trace_fuzzer);
struct grpc_memory_counters counters;
if (leak_check) {
grpc_memory_counters_init();
}
grpc_core::testing::LeakDetector leak_detector(leak_check);
input_stream inp = {data, data + size};
grpc_init();
bool is_on_gcp = grpc_alts_is_running_on_gcp();
@ -111,10 +108,5 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
gpr_free(handshaker_service_url);
}
grpc_shutdown();
if (leak_check) {
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
GPR_ASSERT(counters.total_size_relative == 0);
}
return 0;
}

@ -52,9 +52,8 @@ static void on_handshake_done(void* arg, grpc_error* error) {
}
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
struct grpc_memory_counters counters;
if (squelch) gpr_set_log_function(dont_log);
if (leak_check) grpc_memory_counters_init();
grpc_core::testing::LeakDetector leak_detector(leak_check);
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
@ -118,11 +117,6 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_core::ExecCtx::Get()->Flush();
}
grpc_shutdown();
if (leak_check) {
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
GPR_ASSERT(counters.total_size_relative == 0);
}
grpc_shutdown_blocking();
return 0;
}

@ -31,24 +31,23 @@ bool squelch = true;
bool leak_check = true;
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
struct grpc_memory_counters counters;
grpc_init();
grpc_memory_counters_init();
grpc_slice input = grpc_slice_from_copied_buffer((const char*)data, size);
grpc_slice output;
if (grpc_strict_percent_decode_slice(
input, grpc_url_percent_encoding_unreserved_bytes, &output)) {
grpc_slice_unref(output);
{
grpc_core::testing::LeakDetector leak_detector(true);
grpc_slice input = grpc_slice_from_copied_buffer((const char*)data, size);
grpc_slice output;
if (grpc_strict_percent_decode_slice(
input, grpc_url_percent_encoding_unreserved_bytes, &output)) {
grpc_slice_unref(output);
}
if (grpc_strict_percent_decode_slice(
input, grpc_compatible_percent_encoding_unreserved_bytes,
&output)) {
grpc_slice_unref(output);
}
grpc_slice_unref(grpc_permissive_percent_decode_slice(input));
grpc_slice_unref(input);
}
if (grpc_strict_percent_decode_slice(
input, grpc_compatible_percent_encoding_unreserved_bytes, &output)) {
grpc_slice_unref(output);
}
grpc_slice_unref(grpc_permissive_percent_decode_slice(input));
grpc_slice_unref(input);
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
grpc_shutdown();
GPR_ASSERT(counters.total_size_relative == 0);
grpc_shutdown_blocking();
return 0;
}

@ -31,28 +31,26 @@ bool squelch = true;
bool leak_check = true;
static void test(const uint8_t* data, size_t size, const uint8_t* dict) {
struct grpc_memory_counters counters;
grpc_init();
grpc_memory_counters_init();
grpc_slice input =
grpc_slice_from_copied_buffer(reinterpret_cast<const char*>(data), size);
grpc_slice output = grpc_percent_encode_slice(input, dict);
grpc_slice decoded_output;
// encoder must always produce decodable output
GPR_ASSERT(grpc_strict_percent_decode_slice(output, dict, &decoded_output));
grpc_slice permissive_decoded_output =
grpc_permissive_percent_decode_slice(output);
// and decoded output must always match the input
GPR_ASSERT(grpc_slice_eq(input, decoded_output));
GPR_ASSERT(grpc_slice_eq(input, permissive_decoded_output));
grpc_slice_unref(input);
grpc_slice_unref(output);
grpc_slice_unref(decoded_output);
grpc_slice_unref(permissive_decoded_output);
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
grpc_shutdown();
GPR_ASSERT(counters.total_size_relative == 0);
{
grpc_core::testing::LeakDetector leak_detector(true);
grpc_slice input = grpc_slice_from_copied_buffer(
reinterpret_cast<const char*>(data), size);
grpc_slice output = grpc_percent_encode_slice(input, dict);
grpc_slice decoded_output;
// encoder must always produce decodable output
GPR_ASSERT(grpc_strict_percent_decode_slice(output, dict, &decoded_output));
grpc_slice permissive_decoded_output =
grpc_permissive_percent_decode_slice(output);
// and decoded output must always match the input
GPR_ASSERT(grpc_slice_eq(input, decoded_output));
GPR_ASSERT(grpc_slice_eq(input, permissive_decoded_output));
grpc_slice_unref(input);
grpc_slice_unref(output);
grpc_slice_unref(decoded_output);
grpc_slice_unref(permissive_decoded_output);
}
grpc_shutdown_blocking();
}
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {

@ -18,6 +18,9 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/surface/init.h"
#include "test/core/util/test_config.h"
static int g_flag;
@ -30,6 +33,17 @@ static void test(int rounds) {
for (i = 0; i < rounds; i++) {
grpc_shutdown();
}
grpc_maybe_wait_for_async_shutdown();
}
static void test_blocking(int rounds) {
int i;
for (i = 0; i < rounds; i++) {
grpc_init();
}
for (i = 0; i < rounds; i++) {
grpc_shutdown_blocking();
}
}
static void test_mixed(void) {
@ -39,6 +53,7 @@ static void test_mixed(void) {
grpc_init();
grpc_shutdown();
grpc_shutdown();
grpc_maybe_wait_for_async_shutdown();
}
static void plugin_init(void) { g_flag = 1; }
@ -48,7 +63,7 @@ static void test_plugin() {
grpc_register_plugin(plugin_init, plugin_destroy);
grpc_init();
GPR_ASSERT(g_flag == 1);
grpc_shutdown();
grpc_shutdown_blocking();
GPR_ASSERT(g_flag == 2);
}
@ -57,6 +72,7 @@ static void test_repeatedly() {
grpc_init();
grpc_shutdown();
}
grpc_maybe_wait_for_async_shutdown();
}
int main(int argc, char** argv) {
@ -64,6 +80,9 @@ int main(int argc, char** argv) {
test(1);
test(2);
test(3);
test_blocking(1);
test_blocking(2);
test_blocking(3);
test_mixed();
test_plugin();
test_repeatedly();

@ -78,6 +78,7 @@ int main(int argc, char **argv) {
printf("%lx", (unsigned long) grpc_init);
printf("%lx", (unsigned long) grpc_shutdown);
printf("%lx", (unsigned long) grpc_is_initialized);
printf("%lx", (unsigned long) grpc_shutdown_blocking);
printf("%lx", (unsigned long) grpc_version_string);
printf("%lx", (unsigned long) grpc_g_stands_for);
printf("%lx", (unsigned long) grpc_completion_queue_factory_lookup);

@ -16,13 +16,18 @@
*
*/
#include <inttypes.h>
#include <stdint.h>
#include <string.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/surface/init.h"
#include "test/core/util/memory_counters.h"
static struct grpc_memory_counters g_memory_counters;
@ -110,3 +115,29 @@ struct grpc_memory_counters grpc_memory_counters_snapshot() {
NO_BARRIER_LOAD(&g_memory_counters.total_allocs_absolute);
return counters;
}
namespace grpc_core {
namespace testing {
LeakDetector::LeakDetector(bool enable) : enabled_(enable) {
if (enabled_) {
grpc_memory_counters_init();
}
}
LeakDetector::~LeakDetector() {
// Wait for grpc_shutdown() to finish its async work.
grpc_maybe_wait_for_async_shutdown();
if (enabled_) {
struct grpc_memory_counters counters = grpc_memory_counters_snapshot();
if (counters.total_size_relative != 0) {
gpr_log(GPR_ERROR, "Leaking %" PRIuPTR " bytes",
static_cast<uintptr_t>(counters.total_size_relative));
GPR_ASSERT(0);
}
grpc_memory_counters_destroy();
}
}
} // namespace testing
} // namespace grpc_core

@ -32,4 +32,22 @@ void grpc_memory_counters_init();
void grpc_memory_counters_destroy();
struct grpc_memory_counters grpc_memory_counters_snapshot();
namespace grpc_core {
namespace testing {
// At destruction time, it will check there is no memory leak.
// The object should be created before grpc_init() is called and destroyed after
// grpc_shutdown() is returned.
class LeakDetector {
public:
explicit LeakDetector(bool enable);
~LeakDetector();
private:
const bool enabled_;
};
} // namespace testing
} // namespace grpc_core
#endif

@ -66,7 +66,7 @@ static void free_chosen_ports(void) {
for (i = 0; i < num_chosen_ports; i++) {
grpc_free_port_using_server(chosen_ports[i]);
}
grpc_shutdown();
grpc_shutdown_blocking();
gpr_free(chosen_ports);
}

@ -31,6 +31,7 @@
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/surface/init.h"
int64_t g_fixture_slowdown_factor = 1;
int64_t g_poller_slowdown_factor = 1;
@ -405,7 +406,7 @@ TestEnvironment::TestEnvironment(int argc, char** argv) {
grpc_test_init(argc, argv);
}
TestEnvironment::~TestEnvironment() {}
TestEnvironment::~TestEnvironment() { grpc_maybe_wait_for_async_shutdown(); }
} // namespace testing
} // namespace grpc

@ -197,7 +197,7 @@ void VerifyLbAddrOutputs(const grpc_core::ServerAddressList addresses,
class AddressSortingTest : public ::testing::Test {
protected:
void SetUp() override { grpc_init(); }
void TearDown() override { grpc_shutdown(); }
void TearDown() override { grpc_shutdown_blocking(); }
};
/* Tests for rule 1 */

@ -258,14 +258,6 @@ class GrpcToolTest : public ::testing::Test {
void ShutdownServer() { server_->Shutdown(); }
void ExitWhenError(int argc, const char** argv, const CliCredentials& cred,
GrpcToolOutputCallback callback) {
int result = GrpcToolMainLib(argc, argv, cred, callback);
if (result) {
exit(result);
}
}
std::unique_ptr<Server> server_;
TestServiceImpl service_;
reflection::ProtoServerReflectionPlugin plugin_;
@ -418,11 +410,9 @@ TEST_F(GrpcToolTest, TypeNotFound) {
const char* argv[] = {"grpc_cli", "type", server_address.c_str(),
"grpc.testing.DummyRequest"};
EXPECT_DEATH(ExitWhenError(ArraySize(argv), argv, TestCliCredentials(),
std::bind(PrintStream, &output_stream,
std::placeholders::_1)),
".*Type grpc.testing.DummyRequest not found.*");
EXPECT_TRUE(1 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
std::bind(PrintStream, &output_stream,
std::placeholders::_1)));
ShutdownServer();
}

Loading…
Cancel
Save