Merge pull request #19823 from arjunroy/server_ref

s/gpr_ref/grpc_core::RefCount/ for frequent users.
pull/19824/head
Arjun Roy 5 years ago committed by GitHub
commit 9915319488
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 79
      src/core/lib/iomgr/ev_epollex_linux.cc
  2. 36
      src/core/lib/iomgr/tcp_posix.cc
  3. 26
      src/core/lib/surface/completion_queue.cc
  4. 10
      src/core/lib/surface/server.cc

@ -47,6 +47,7 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
@ -89,7 +90,7 @@ typedef struct pollable pollable;
/// - PO_MULTI - a pollable containing many fds
struct pollable {
pollable_type type; // immutable
gpr_refcount refs;
grpc_core::RefCount refs;
int epfd;
grpc_wakeup_fd wakeup;
@ -135,17 +136,26 @@ static char* pollable_desc(pollable* p) {
static pollable* g_empty_pollable;
static grpc_error* pollable_create(pollable_type type, pollable** p);
#ifdef NDEBUG
static pollable* pollable_ref(pollable* p);
static void pollable_unref(pollable* p);
#define POLLABLE_REF(p, r) pollable_ref(p)
#define POLLABLE_UNREF(p, r) pollable_unref(p)
#else
static pollable* pollable_ref(pollable* p, int line, const char* reason);
static void pollable_unref(pollable* p, int line, const char* reason);
#define POLLABLE_REF(p, r) pollable_ref((p), __LINE__, (r))
#define POLLABLE_UNREF(p, r) pollable_unref((p), __LINE__, (r))
#endif
static pollable* pollable_ref(pollable* p,
const grpc_core::DebugLocation& dbg_loc,
const char* reason) {
p->refs.Ref(dbg_loc, reason);
return p;
}
static void pollable_unref(pollable* p, const grpc_core::DebugLocation& dbg_loc,
const char* reason) {
if (p == nullptr) return;
if (GPR_UNLIKELY(p != nullptr && p->refs.Unref(dbg_loc, reason))) {
GRPC_FD_TRACE("pollable_unref: Closing epfd: %d", p->epfd);
close(p->epfd);
grpc_wakeup_fd_destroy(&p->wakeup);
gpr_mu_destroy(&p->owner_orphan_mu);
gpr_mu_destroy(&p->mu);
gpr_free(p);
}
}
#define POLLABLE_REF(p, r) pollable_ref((p), DEBUG_LOCATION, (r))
#define POLLABLE_UNREF(p, r) pollable_unref((p), DEBUG_LOCATION, (r))
/*******************************************************************************
* Fd Declarations
@ -283,7 +293,7 @@ struct grpc_pollset {
*/
struct grpc_pollset_set {
gpr_refcount refs;
grpc_core::RefCount refs;
gpr_mu mu;
grpc_pollset_set* parent;
@ -568,7 +578,7 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
}
(*p)->type = type;
gpr_ref_init(&(*p)->refs, 1);
new (&(*p)->refs) grpc_core::RefCount(1, &grpc_trace_pollable_refcount);
gpr_mu_init(&(*p)->mu);
(*p)->epfd = epfd;
(*p)->owner_fd = nullptr;
@ -582,41 +592,6 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
return GRPC_ERROR_NONE;
}
#ifdef NDEBUG
static pollable* pollable_ref(pollable* p) {
#else
static pollable* pollable_ref(pollable* p, int line, const char* reason) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_pollable_refcount)) {
int r = static_cast<int> gpr_atm_no_barrier_load(&p->refs.count);
gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
"POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason);
}
#endif
gpr_ref(&p->refs);
return p;
}
#ifdef NDEBUG
static void pollable_unref(pollable* p) {
#else
static void pollable_unref(pollable* p, int line, const char* reason) {
if (p == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_pollable_refcount)) {
int r = static_cast<int> gpr_atm_no_barrier_load(&p->refs.count);
gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
"POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason);
}
#endif
if (p != nullptr && gpr_unref(&p->refs)) {
GRPC_FD_TRACE("pollable_unref: Closing epfd: %d", p->epfd);
close(p->epfd);
grpc_wakeup_fd_destroy(&p->wakeup);
gpr_mu_destroy(&p->owner_orphan_mu);
gpr_mu_destroy(&p->mu);
gpr_free(p);
}
}
static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
grpc_error* error = GRPC_ERROR_NONE;
static const char* err_desc = "pollable_add_fd";
@ -1331,13 +1306,13 @@ static grpc_pollset_set* pollset_set_create(void) {
grpc_pollset_set* pss =
static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pss)));
gpr_mu_init(&pss->mu);
gpr_ref_init(&pss->refs, 1);
new (&pss->refs) grpc_core::RefCount();
return pss;
}
static void pollset_set_unref(grpc_pollset_set* pss) {
if (pss == nullptr) return;
if (!gpr_unref(&pss->refs)) return;
if (GPR_LIKELY(!pss->refs.Unref())) return;
pollset_set_unref(pss->parent);
gpr_mu_destroy(&pss->mu);
for (size_t i = 0; i < pss->pollset_count; i++) {
@ -1528,7 +1503,7 @@ static void pollset_set_add_pollset_set(grpc_pollset_set* a,
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_INFO, "PSS: parent %p to %p", b, a);
}
gpr_ref(&a->refs);
a->refs.Ref();
b->parent = a;
if (a->fd_capacity < a->fd_count + b->fd_count) {
a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count);

@ -89,7 +89,7 @@ struct grpc_tcp {
bool is_first_read;
double target_length;
double bytes_read_this_round;
gpr_refcount refcount;
grpc_core::RefCount refcount;
gpr_atm shutdown_count;
int min_read_chunk_size;
@ -359,41 +359,29 @@ static void tcp_free(grpc_tcp* tcp) {
}
#ifndef NDEBUG
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
int line) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
val - 1);
}
if (gpr_unref(&tcp->refcount)) {
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), DEBUG_LOCATION)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), DEBUG_LOCATION)
static void tcp_unref(grpc_tcp* tcp, const char* reason,
const grpc_core::DebugLocation& debug_location) {
if (GPR_UNLIKELY(tcp->refcount.Unref(debug_location, reason))) {
tcp_free(tcp);
}
}
static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
int line) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
val + 1);
}
gpr_ref(&tcp->refcount);
static void tcp_ref(grpc_tcp* tcp, const char* reason,
const grpc_core::DebugLocation& debug_location) {
tcp->refcount.Ref(debug_location, reason);
}
#else
#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp* tcp) {
if (gpr_unref(&tcp->refcount)) {
if (GPR_UNLIKELY(tcp->refcount.Unref())) {
tcp_free(tcp);
}
}
static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
static void tcp_ref(grpc_tcp* tcp) { tcp->refcount.Ref(); }
#endif
static void tcp_destroy(grpc_endpoint* ep) {
@ -1230,7 +1218,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->ts_capable = true;
tcp->outgoing_buffer_arg = nullptr;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
new (&tcp->refcount) grpc_core::RefCount(1, &grpc_tcp_trace);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
grpc_slice_buffer_init(&tcp->last_read_buffer);

@ -320,7 +320,7 @@ struct cq_callback_data {
/* Completion queue structure */
struct grpc_completion_queue {
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
grpc_core::RefCount owning_refs;
gpr_mu* mu;
@ -518,7 +518,7 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
cq->poller_vtable = poller_vtable;
/* One for destroy(), one for pollset_shutdown */
gpr_ref_init(&cq->owning_refs, 2);
new (&cq->owning_refs) grpc_core::RefCount(2);
poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
@ -573,16 +573,13 @@ int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
const char* file, int line) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cq_refcount)) {
gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
reason);
}
grpc_core::DebugLocation debug_location(file, line);
#else
void grpc_cq_internal_ref(grpc_completion_queue* cq) {
grpc_core::DebugLocation debug_location;
const char* reason = nullptr;
#endif
gpr_ref(&cq->owning_refs);
cq->owning_refs.Ref(debug_location, reason);
}
static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
@ -593,16 +590,13 @@ static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
#ifndef NDEBUG
void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
const char* file, int line) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cq_refcount)) {
gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
reason);
}
grpc_core::DebugLocation debug_location(file, line);
#else
void grpc_cq_internal_unref(grpc_completion_queue* cq) {
grpc_core::DebugLocation debug_location;
const char* reason = nullptr;
#endif
if (gpr_unref(&cq->owning_refs)) {
if (GPR_UNLIKELY(cq->owning_refs.Unref(debug_location, reason))) {
cq->vtable->destroy(DATA_FROM_CQ(cq));
cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
#ifndef NDEBUG

@ -256,7 +256,7 @@ struct grpc_server {
listener* listeners;
int listeners_destroyed;
gpr_refcount internal_refcount;
grpc_core::RefCount internal_refcount;
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time;
@ -400,9 +400,7 @@ static void request_matcher_kill_requests(grpc_server* server,
* server proper
*/
static void server_ref(grpc_server* server) {
gpr_ref(&server->internal_refcount);
}
static void server_ref(grpc_server* server) { server->internal_refcount.Ref(); }
static void server_delete(grpc_server* server) {
registered_method* rm;
@ -434,7 +432,7 @@ static void server_delete(grpc_server* server) {
}
static void server_unref(grpc_server* server) {
if (gpr_unref(&server->internal_refcount)) {
if (GPR_UNLIKELY(server->internal_refcount.Unref())) {
server_delete(server);
}
}
@ -1031,7 +1029,7 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
gpr_cv_init(&server->starting_cv);
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
new (&server->internal_refcount) grpc_core::RefCount();
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;

Loading…
Cancel
Save