Drop experimental tags from core callback API (#26535)

pull/26558/head
Vijay Pai 4 years ago committed by GitHub
parent 4aa9591662
commit ea4b68e7a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      include/grpc/grpc.h
  2. 26
      include/grpc/impl/codegen/grpc_types.h
  3. 12
      include/grpcpp/impl/codegen/callback_common.h
  4. 2
      include/grpcpp/impl/codegen/completion_queue.h
  5. 7
      src/core/lib/iomgr/exec_ctx.h
  6. 5
      src/core/lib/iomgr/executor/threadpool.cc
  7. 4
      src/core/lib/iomgr/executor/threadpool.h
  8. 38
      src/core/lib/surface/completion_queue.cc
  9. 2
      src/core/lib/surface/completion_queue.h
  10. 3
      src/core/lib/surface/completion_queue_factory.cc
  11. 4
      src/cpp/client/channel_cc.cc
  12. 3
      src/cpp/common/completion_queue_cc.cc
  13. 9
      src/cpp/server/server_cc.cc
  14. 8
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
  15. 4
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
  16. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
  17. 6
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  18. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  19. 13
      test/core/end2end/inproc_callback_test.cc
  20. 12
      test/core/end2end/tests/connectivity.cc
  21. 11
      test/core/iomgr/threadpool_test.cc
  22. 9
      test/core/surface/completion_queue_test.cc
  23. 8
      test/cpp/microbenchmarks/bm_cq.cc
  24. 17
      test/cpp/microbenchmarks/bm_threadpool.cc

@ -115,8 +115,7 @@ GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_pluck(
of GRPC_CQ_CALLBACK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING.
This function is experimental. */
GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_callback(
grpc_experimental_completion_queue_functor* shutdown_callback,
void* reserved);
grpc_completion_queue_functor* shutdown_callback, void* reserved);
/** Create a completion queue */
GRPCAPI grpc_completion_queue* grpc_completion_queue_create(

@ -746,21 +746,20 @@ typedef enum {
/** Events are popped out by calling grpc_completion_queue_pluck() API ONLY*/
GRPC_CQ_PLUCK,
/** EXPERIMENTAL: Events trigger a callback specified as the tag */
/** Events trigger a callback specified as the tag */
GRPC_CQ_CALLBACK
} grpc_cq_completion_type;
/** EXPERIMENTAL: Specifies an interface class to be used as a tag
for callback-based completion queues. This can be used directly,
as the first element of a struct in C, or as a base class in C++.
Its "run" value should be assigned to some non-member function, such as
a static method. */
typedef struct grpc_experimental_completion_queue_functor {
/** Specifies an interface class to be used as a tag for callback-based
* completion queues. This can be used directly, as the first element of a
* struct in C, or as a base class in C++. Its "run" value should be assigned to
* some non-member function, such as a static method. */
typedef struct grpc_completion_queue_functor {
/** The run member specifies a function that will be called when this
tag is extracted from the completion queue. Its arguments will be a
pointer to this functor and a boolean that indicates whether the
operation succeeded (non-zero) or failed (zero) */
void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int);
void (*functor_run)(struct grpc_completion_queue_functor*, int);
/** The inlineable member specifies whether this functor can be run inline.
This should only be used for trivial internally-defined functors. */
@ -768,10 +767,11 @@ typedef struct grpc_experimental_completion_queue_functor {
/** The following fields are not API. They are meant for internal use. */
int internal_success;
struct grpc_experimental_completion_queue_functor* internal_next;
} grpc_experimental_completion_queue_functor;
struct grpc_completion_queue_functor* internal_next;
} grpc_completion_queue_functor;
/* The upgrade to version 2 is currently experimental. */
typedef grpc_completion_queue_functor
grpc_experimental_completion_queue_functor;
#define GRPC_CQ_CURRENT_VERSION 2
#define GRPC_CQ_VERSION_MINIMUM_FOR_CALLBACKABLE 2
@ -786,10 +786,10 @@ typedef struct grpc_completion_queue_attributes {
/* END OF VERSION 1 CQ ATTRIBUTES */
/* EXPERIMENTAL: START OF VERSION 2 CQ ATTRIBUTES */
/* START OF VERSION 2 CQ ATTRIBUTES */
/** When creating a callbackable CQ, pass in a functor to get invoked when
* shutdown is complete */
grpc_experimental_completion_queue_functor* cq_shutdown_cb;
grpc_completion_queue_functor* cq_shutdown_cb;
/* END OF VERSION 2 CQ ATTRIBUTES */
} grpc_completion_queue_attributes;

@ -66,8 +66,7 @@ Reactor* CatchingReactorGetter(Func&& func, Args&&... args) {
// constructed and then fired at exactly one point. There is no expectation
// that they can be reused without reconstruction.
class CallbackWithStatusTag
: public grpc_experimental_completion_queue_functor {
class CallbackWithStatusTag : public grpc_completion_queue_functor {
public:
// always allocated against a call arena, no memory free required
static void operator delete(void* /*ptr*/, std::size_t size) {
@ -108,8 +107,7 @@ class CallbackWithStatusTag
CompletionQueueTag* ops_;
Status status_;
static void StaticRun(grpc_experimental_completion_queue_functor* cb,
int ok) {
static void StaticRun(grpc_completion_queue_functor* cb, int ok) {
static_cast<CallbackWithStatusTag*>(cb)->Run(static_cast<bool>(ok));
}
void Run(bool ok) {
@ -134,8 +132,7 @@ class CallbackWithStatusTag
/// CallbackWithSuccessTag can be reused multiple times, and will be used in
/// this fashion for streaming operations. As a result, it shouldn't clear
/// anything up until its destructor
class CallbackWithSuccessTag
: public grpc_experimental_completion_queue_functor {
class CallbackWithSuccessTag : public grpc_completion_queue_functor {
public:
// always allocated against a call arena, no memory free required
static void operator delete(void* /*ptr*/, std::size_t size) {
@ -198,8 +195,7 @@ class CallbackWithSuccessTag
std::function<void(bool)> func_;
CompletionQueueTag* ops_;
static void StaticRun(grpc_experimental_completion_queue_functor* cb,
int ok) {
static void StaticRun(grpc_completion_queue_functor* cb, int ok) {
static_cast<CallbackWithSuccessTag*>(cb)->Run(static_cast<bool>(ok));
}
void Run(bool ok) {

@ -443,7 +443,7 @@ class ServerCompletionQueue : public CompletionQueue {
/// \param shutdown_cb is the shutdown callback used for CALLBACK api queues
ServerCompletionQueue(grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type,
grpc_experimental_completion_queue_functor* shutdown_cb)
grpc_completion_queue_functor* shutdown_cb)
: CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, completion_type, polling_type,
shutdown_cb}),

@ -349,8 +349,7 @@ class ApplicationCallbackExecCtx {
}
}
static void Enqueue(grpc_experimental_completion_queue_functor* functor,
int is_success) {
static void Enqueue(grpc_completion_queue_functor* functor, int is_success) {
functor->internal_success = is_success;
functor->internal_next = nullptr;
@ -375,8 +374,8 @@ class ApplicationCallbackExecCtx {
private:
uintptr_t flags_{0u};
grpc_experimental_completion_queue_functor* head_{nullptr};
grpc_experimental_completion_queue_functor* tail_{nullptr};
grpc_completion_queue_functor* head_{nullptr};
grpc_completion_queue_functor* tail_{nullptr};
GPR_TLS_CLASS_DECL(callback_exec_ctx_);
};
} // namespace grpc_core

@ -41,8 +41,7 @@ void ThreadPoolWorker::Run() {
break;
}
// Runs closure
auto* closure =
static_cast<grpc_experimental_completion_queue_functor*>(elem);
auto* closure = static_cast<grpc_completion_queue_functor*>(elem);
closure->functor_run(closure, closure->internal_success);
}
}
@ -120,7 +119,7 @@ ThreadPool::~ThreadPool() {
delete queue_;
}
void ThreadPool::Add(grpc_experimental_completion_queue_functor* closure) {
void ThreadPool::Add(grpc_completion_queue_functor* closure) {
AssertHasNotBeenShutDown();
queue_->Put(static_cast<void*>(closure));
}

@ -43,7 +43,7 @@ class ThreadPoolInterface {
// current thread to be blocked (in case of unable to schedule).
// Closure should contain a function pointer and arguments it will take, more
// details for closure struct at /grpc/include/grpc/impl/codegen/grpc_types.h
virtual void Add(grpc_experimental_completion_queue_functor* closure) = 0;
virtual void Add(grpc_completion_queue_functor* closure) = 0;
// Returns the current number of pending closures
virtual int num_pending_closures() const = 0;
@ -120,7 +120,7 @@ class ThreadPool : public ThreadPoolInterface {
// Adds given closure into pending queue immediately. Since closure queue has
// infinite length, this routine will not block.
void Add(grpc_experimental_completion_queue_functor* closure) override;
void Add(grpc_completion_queue_functor* closure) override;
int num_pending_closures() const override;
int pool_capacity() const override;

@ -197,8 +197,7 @@ const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
void (*init)(void* data,
grpc_experimental_completion_queue_functor* shutdown_callback);
void (*init)(void* data, grpc_completion_queue_functor* shutdown_callback);
void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
@ -310,8 +309,7 @@ struct cq_pluck_data {
};
struct cq_callback_data {
explicit cq_callback_data(
grpc_experimental_completion_queue_functor* shutdown_callback)
explicit cq_callback_data(grpc_completion_queue_functor* shutdown_callback)
: shutdown_callback(shutdown_callback) {}
~cq_callback_data() {
@ -332,7 +330,7 @@ struct cq_callback_data {
bool shutdown_called = false;
/** A callback that gets invoked when the CQ completes shutdown */
grpc_experimental_completion_queue_functor* shutdown_callback;
grpc_completion_queue_functor* shutdown_callback;
};
} // namespace
@ -397,12 +395,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
static void cq_init_next(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_init_pluck(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_init_callback(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_init_next(void* data,
grpc_completion_queue_functor* shutdown_callback);
static void cq_init_pluck(void* data,
grpc_completion_queue_functor* shutdown_callback);
static void cq_init_callback(void* data,
grpc_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);
@ -513,7 +511,7 @@ grpc_cq_completion* CqEventQueue::Pop() {
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
grpc_experimental_completion_queue_functor* shutdown_callback) {
grpc_completion_queue_functor* shutdown_callback) {
GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
grpc_completion_queue* cq;
@ -548,9 +546,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
return cq;
}
static void cq_init_next(
void* data,
grpc_experimental_completion_queue_functor* /*shutdown_callback*/) {
static void cq_init_next(void* data,
grpc_completion_queue_functor* /*shutdown_callback*/) {
new (data) cq_next_data();
}
@ -560,8 +557,7 @@ static void cq_destroy_next(void* data) {
}
static void cq_init_pluck(
void* data,
grpc_experimental_completion_queue_functor* /*shutdown_callback*/) {
void* data, grpc_completion_queue_functor* /*shutdown_callback*/) {
new (data) cq_pluck_data();
}
@ -570,8 +566,8 @@ static void cq_destroy_pluck(void* data) {
cqd->~cq_pluck_data();
}
static void cq_init_callback(
void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
static void cq_init_callback(void* data,
grpc_completion_queue_functor* shutdown_callback) {
new (data) cq_callback_data(shutdown_callback);
}
@ -836,7 +832,7 @@ static void cq_end_op_for_pluck(
}
static void functor_callback(void* arg, grpc_error_handle error) {
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
auto* functor = static_cast<grpc_completion_queue_functor*>(arg);
functor->functor_run(functor, error == GRPC_ERROR_NONE);
}
@ -880,7 +876,7 @@ static void cq_end_op_for_callback(
// 2. The callback is marked inlineable and there is an ACEC available
// 3. We are already running in a background poller thread (which always has
// an ACEC available at the base of the stack).
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
auto* functor = static_cast<grpc_completion_queue_functor*>(tag);
if (((internal || functor->inlineable) &&
grpc_core::ApplicationCallbackExecCtx::Available()) ||
grpc_iomgr_is_any_background_poller_thread()) {

@ -93,6 +93,6 @@ int grpc_get_cq_poll_num(grpc_completion_queue* cq);
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
grpc_experimental_completion_queue_functor* shutdown_callback);
grpc_completion_queue_functor* shutdown_callback);
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */

@ -72,8 +72,7 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
}
grpc_completion_queue* grpc_completion_queue_create_for_callback(
grpc_experimental_completion_queue_functor* shutdown_callback,
void* reserved) {
grpc_completion_queue_functor* shutdown_callback, void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {
2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback};

@ -214,7 +214,7 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
}
namespace {
class ShutdownCallback : public grpc_experimental_completion_queue_functor {
class ShutdownCallback : public grpc_completion_queue_functor {
public:
ShutdownCallback() {
functor_run = &ShutdownCallback::Run;
@ -230,7 +230,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
// The Run function will get invoked by the completion queue library
// when the shutdown is actually complete
static void Run(grpc_experimental_completion_queue_functor* cb, int) {
static void Run(grpc_completion_queue_functor* cb, int) {
auto* callback = static_cast<ShutdownCallback*>(cb);
delete callback->cq_;
delete callback;

@ -89,8 +89,7 @@ struct CallbackAlternativeCQ {
// hold any application locks before executing the callback,
// and cannot be entered recursively.
auto* functor =
static_cast<grpc_experimental_completion_queue_functor*>(
ev.tag);
static_cast<grpc_completion_queue_functor*>(ev.tag);
functor->functor_run(functor, ev.success);
}
},

@ -264,7 +264,7 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
}
namespace {
class ShutdownCallback : public grpc_experimental_completion_queue_functor {
class ShutdownCallback : public grpc_completion_queue_functor {
public:
ShutdownCallback() {
functor_run = &ShutdownCallback::Run;
@ -280,7 +280,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
// The Run function will get invoked by the completion queue library
// when the shutdown is actually complete
static void Run(grpc_experimental_completion_queue_functor* cb, int) {
static void Run(grpc_completion_queue_functor* cb, int) {
auto* callback = static_cast<ShutdownCallback*>(cb);
delete callback->cq_;
delete callback;
@ -576,7 +576,7 @@ class Server::CallbackRequest final
// method_name needs to be specialized between named method and generic
const char* method_name() const;
class CallbackCallTag : public grpc_experimental_completion_queue_functor {
class CallbackCallTag : public grpc_completion_queue_functor {
public:
explicit CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
: req_(req) {
@ -599,8 +599,7 @@ class Server::CallbackRequest final
Server::CallbackRequest<ServerContextType>* req_;
grpc::internal::Call* call_;
static void StaticRun(grpc_experimental_completion_queue_functor* cb,
int ok) {
static void StaticRun(grpc_completion_queue_functor* cb, int ok) {
static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
}
void Run(bool ok) {

@ -25,7 +25,7 @@ cdef struct CallbackContext:
# C struct to store callback context in the form of pointers.
#
# Attributes:
# functor: A grpc_experimental_completion_queue_functor represents the
# functor: A grpc_completion_queue_functor represents the
# callback function in the only way Core understands.
# waiter: An asyncio.Future object that fulfills when the callback is
# invoked by Core.
@ -33,7 +33,7 @@ cdef struct CallbackContext:
# returns 'success == 0' state.
# wrapper: A self-reference to the CallbackWrapper to help life cycle
# management.
grpc_experimental_completion_queue_functor functor
grpc_completion_queue_functor functor
cpython.PyObject *waiter
cpython.PyObject *loop
cpython.PyObject *failure_handler
@ -47,10 +47,10 @@ cdef class CallbackWrapper:
@staticmethod
cdef void functor_run(
grpc_experimental_completion_queue_functor* functor,
grpc_completion_queue_functor* functor,
int succeed)
cdef grpc_experimental_completion_queue_functor *c_functor(self)
cdef grpc_completion_queue_functor *c_functor(self)
cdef class GrpcCallWrapper:

@ -49,7 +49,7 @@ cdef class CallbackWrapper:
@staticmethod
cdef void functor_run(
grpc_experimental_completion_queue_functor* functor,
grpc_completion_queue_functor* functor,
int success):
cdef CallbackContext *context = <CallbackContext *>functor
cdef object waiter = <object>context.waiter
@ -60,7 +60,7 @@ cdef class CallbackWrapper:
waiter.set_result(None)
cpython.Py_DECREF(<object>context.callback_wrapper)
cdef grpc_experimental_completion_queue_functor *c_functor(self):
cdef grpc_completion_queue_functor *c_functor(self):
return &self.context.functor

@ -163,7 +163,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
if loop is context_loop:
# Executes callbacks: complete the future
CallbackWrapper.functor_run(
<grpc_experimental_completion_queue_functor *>event.tag,
<grpc_completion_queue_functor *>event.tag,
event.success
)
else:

@ -42,8 +42,8 @@ cdef extern from "grpc/byte_buffer_reader.h":
cdef extern from "grpc/impl/codegen/grpc_types.h":
ctypedef struct grpc_experimental_completion_queue_functor:
void (*functor_run)(grpc_experimental_completion_queue_functor*, int);
ctypedef struct grpc_completion_queue_functor:
void (*functor_run)(grpc_completion_queue_functor*, int);
cdef extern from "grpc/grpc.h":
@ -358,7 +358,7 @@ cdef extern from "grpc/grpc.h":
void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil
grpc_completion_queue *grpc_completion_queue_create_for_callback(
grpc_experimental_completion_queue_functor* shutdown_callback,
grpc_completion_queue_functor* shutdown_callback,
void *reserved) nogil
grpc_call_error grpc_call_start_batch(

@ -110,7 +110,7 @@ extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_f
typedef grpc_completion_queue*(*grpc_completion_queue_create_for_pluck_type)(void* reserved);
extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import;
#define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import
typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(grpc_experimental_completion_queue_functor* shutdown_callback, void* reserved);
typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(grpc_completion_queue_functor* shutdown_callback, void* reserved);
extern grpc_completion_queue_create_for_callback_type grpc_completion_queue_create_for_callback_import;
#define grpc_completion_queue_create_for_callback grpc_completion_queue_create_for_callback_import
typedef grpc_completion_queue*(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attributes, void* reserved);

@ -37,14 +37,14 @@ typedef struct inproc_fixture_data {
namespace {
template <typename F>
class CQDeletingCallback : public grpc_experimental_completion_queue_functor {
class CQDeletingCallback : public grpc_completion_queue_functor {
public:
explicit CQDeletingCallback(F f) : func_(f) {
functor_run = &CQDeletingCallback::Run;
inlineable = false;
}
~CQDeletingCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
static void Run(grpc_completion_queue_functor* cb, int ok) {
auto* callback = static_cast<CQDeletingCallback*>(cb);
callback->func_(static_cast<bool>(ok));
delete callback;
@ -55,11 +55,11 @@ class CQDeletingCallback : public grpc_experimental_completion_queue_functor {
};
template <typename F>
grpc_experimental_completion_queue_functor* NewDeletingCallback(F f) {
grpc_completion_queue_functor* NewDeletingCallback(F f) {
return new CQDeletingCallback<F>(f);
}
class ShutdownCallback : public grpc_experimental_completion_queue_functor {
class ShutdownCallback : public grpc_completion_queue_functor {
public:
ShutdownCallback() : done_(false) {
functor_run = &ShutdownCallback::StaticRun;
@ -71,8 +71,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
gpr_mu_destroy(&mu_);
gpr_cv_destroy(&cv_);
}
static void StaticRun(grpc_experimental_completion_queue_functor* cb,
int ok) {
static void StaticRun(grpc_completion_queue_functor* cb, int ok) {
auto* callback = static_cast<ShutdownCallback*>(cb);
callback->Run(static_cast<bool>(ok));
}
@ -184,7 +183,7 @@ static void verify_tags(gpr_timespec deadline) {
// This function creates a callback functor that emits the
// desired tag into the global tag set
static grpc_experimental_completion_queue_functor* tag(intptr_t t) {
static grpc_completion_queue_functor* tag(intptr_t t) {
auto func = [t](bool ok) {
gpr_mu_lock(&tags_mu);
gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t);

@ -34,10 +34,10 @@ typedef struct {
} child_events;
struct CallbackContext {
grpc_experimental_completion_queue_functor functor;
grpc_completion_queue_functor functor;
gpr_event finished;
explicit CallbackContext(void (*cb)(
grpc_experimental_completion_queue_functor* functor, int success)) {
explicit CallbackContext(void (*cb)(grpc_completion_queue_functor* functor,
int success)) {
functor.functor_run = cb;
functor.inlineable = false;
gpr_event_init(&finished);
@ -174,8 +174,8 @@ static void test_connectivity(grpc_end2end_test_config config) {
cq_verifier_destroy(cqv);
}
static void cb_watch_connectivity(
grpc_experimental_completion_queue_functor* functor, int success) {
static void cb_watch_connectivity(grpc_completion_queue_functor* functor,
int success) {
CallbackContext* cb_ctx = reinterpret_cast<CallbackContext*>(functor);
gpr_log(GPR_DEBUG, "cb_watch_connectivity called, verifying");
@ -186,7 +186,7 @@ static void cb_watch_connectivity(
gpr_event_set(&cb_ctx->finished, reinterpret_cast<void*>(1));
}
static void cb_shutdown(grpc_experimental_completion_queue_functor* functor,
static void cb_shutdown(grpc_completion_queue_functor* functor,
int /*success*/) {
CallbackContext* cb_ctx = reinterpret_cast<CallbackContext*>(functor);

@ -44,7 +44,7 @@ static void test_constructor_option(void) {
}
// Simple functor for testing. It will count how many times being called.
class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor {
class SimpleFunctorForAdd : public grpc_completion_queue_functor {
public:
friend class SimpleFunctorCheckForAdd;
SimpleFunctorForAdd() {
@ -54,8 +54,7 @@ class SimpleFunctorForAdd : public grpc_experimental_completion_queue_functor {
internal_success = 0;
}
~SimpleFunctorForAdd() {}
static void Run(struct grpc_experimental_completion_queue_functor* cb,
int /*ok*/) {
static void Run(struct grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<SimpleFunctorForAdd*>(cb);
callback->count_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
}
@ -138,8 +137,7 @@ static void test_multi_add(void) {
}
// Checks the current count with a given number.
class SimpleFunctorCheckForAdd
: public grpc_experimental_completion_queue_functor {
class SimpleFunctorCheckForAdd : public grpc_completion_queue_functor {
public:
SimpleFunctorCheckForAdd(int ok, int* count) : count_(count) {
functor_run = &SimpleFunctorCheckForAdd::Run;
@ -147,8 +145,7 @@ class SimpleFunctorCheckForAdd
internal_success = ok;
}
~SimpleFunctorCheckForAdd() {}
static void Run(struct grpc_experimental_completion_queue_functor* cb,
int /*ok*/) {
static void Run(struct grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<SimpleFunctorCheckForAdd*>(cb);
(*callback->count_)++;
GPR_ASSERT(*callback->count_ == callback->internal_success);

@ -378,14 +378,14 @@ static void test_callback(void) {
LOG_TEST("test_callback");
bool got_shutdown = false;
class ShutdownCallback : public grpc_experimental_completion_queue_functor {
class ShutdownCallback : public grpc_completion_queue_functor {
public:
explicit ShutdownCallback(bool* done) : done_(done) {
functor_run = &ShutdownCallback::Run;
inlineable = false;
}
~ShutdownCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
static void Run(grpc_completion_queue_functor* cb, int ok) {
gpr_mu_lock(&shutdown_mu);
*static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
// Signal when the shutdown callback is completed.
@ -413,7 +413,7 @@ static void test_callback(void) {
cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
class TagCallback : public grpc_experimental_completion_queue_functor {
class TagCallback : public grpc_completion_queue_functor {
public:
TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
functor_run = &TagCallback::Run;
@ -421,8 +421,7 @@ static void test_callback(void) {
inlineable = false;
}
~TagCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb,
int ok) {
static void Run(grpc_completion_queue_functor* cb, int ok) {
GPR_ASSERT(static_cast<bool>(ok));
auto* callback = static_cast<TagCallback*>(cb);
gpr_mu_lock(&mu);

@ -162,14 +162,14 @@ static gpr_mu shutdown_mu, mu;
static gpr_cv shutdown_cv, cv;
// Tag completion queue iterate times
class TagCallback : public grpc_experimental_completion_queue_functor {
class TagCallback : public grpc_completion_queue_functor {
public:
explicit TagCallback(int* iter) : iter_(iter) {
functor_run = &TagCallback::Run;
inlineable = false;
}
~TagCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
static void Run(grpc_completion_queue_functor* cb, int ok) {
gpr_mu_lock(&mu);
GPR_ASSERT(static_cast<bool>(ok));
*static_cast<TagCallback*>(cb)->iter_ += 1;
@ -182,14 +182,14 @@ class TagCallback : public grpc_experimental_completion_queue_functor {
};
// Check if completion queue is shut down
class ShutdownCallback : public grpc_experimental_completion_queue_functor {
class ShutdownCallback : public grpc_completion_queue_functor {
public:
explicit ShutdownCallback(bool* done) : done_(done) {
functor_run = &ShutdownCallback::Run;
inlineable = false;
}
~ShutdownCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
static void Run(grpc_completion_queue_functor* cb, int ok) {
gpr_mu_lock(&shutdown_mu);
*static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
gpr_cv_signal(&shutdown_cv);

@ -62,7 +62,7 @@ class BlockingCounter {
// number passed in (num_add) is greater than 0. Otherwise, it will decrement
// the counter to indicate that task is finished. This functor will suicide at
// the end, therefore, no need for caller to do clean-ups.
class AddAnotherFunctor : public grpc_experimental_completion_queue_functor {
class AddAnotherFunctor : public grpc_completion_queue_functor {
public:
AddAnotherFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter,
int num_add)
@ -74,7 +74,7 @@ class AddAnotherFunctor : public grpc_experimental_completion_queue_functor {
}
// When the functor gets to run in thread pool, it will take itself as first
// argument and internal_success as second one.
static void Run(grpc_experimental_completion_queue_functor* cb, int /*ok*/) {
static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<AddAnotherFunctor*>(cb);
if (--callback->num_add_ > 0) {
callback->pool_->Add(new AddAnotherFunctor(
@ -128,7 +128,7 @@ BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 2048)
->RangePair(524288, 524288, 1, 1024);
// A functor class that will delete self on end of running.
class SuicideFunctorForAdd : public grpc_experimental_completion_queue_functor {
class SuicideFunctorForAdd : public grpc_completion_queue_functor {
public:
explicit SuicideFunctorForAdd(BlockingCounter* counter) : counter_(counter) {
functor_run = &SuicideFunctorForAdd::Run;
@ -137,7 +137,7 @@ class SuicideFunctorForAdd : public grpc_experimental_completion_queue_functor {
internal_success = 0;
}
static void Run(grpc_experimental_completion_queue_functor* cb, int /*ok*/) {
static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
// On running, the first argument would be itself.
auto* callback = static_cast<SuicideFunctorForAdd*>(cb);
callback->counter_->DecrementCount();
@ -179,7 +179,7 @@ BENCHMARK(BM_ThreadPoolExternalAdd)
// Functor (closure) that adds itself into pool repeatedly. By adding self, the
// overhead would be low and can measure the time of add more accurately.
class AddSelfFunctor : public grpc_experimental_completion_queue_functor {
class AddSelfFunctor : public grpc_completion_queue_functor {
public:
AddSelfFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter,
int num_add)
@ -191,7 +191,7 @@ class AddSelfFunctor : public grpc_experimental_completion_queue_functor {
}
// When the functor gets to run in thread pool, it will take itself as first
// argument and internal_success as second one.
static void Run(grpc_experimental_completion_queue_functor* cb, int /*ok*/) {
static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<AddSelfFunctor*>(cb);
if (--callback->num_add_ > 0) {
callback->pool_->Add(cb);
@ -258,8 +258,7 @@ BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 2048)->RangePair(524288, 524288, 1, 1024);
// A functor (closure) that simulates closures with small but non-trivial amount
// of work.
class ShortWorkFunctorForAdd
: public grpc_experimental_completion_queue_functor {
class ShortWorkFunctorForAdd : public grpc_completion_queue_functor {
public:
BlockingCounter* counter_;
@ -270,7 +269,7 @@ class ShortWorkFunctorForAdd
internal_success = 0;
val_ = 0;
}
static void Run(grpc_experimental_completion_queue_functor* cb, int /*ok*/) {
static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<ShortWorkFunctorForAdd*>(cb);
// Uses pad to avoid compiler complaining unused variable error.
callback->pad[0] = 0;

Loading…
Cancel
Save