Make the core callback interface API so that it can be used in generated code

reviewable/pr16646/r5
Vijay Pai 6 years ago
parent 830e5ad5df
commit 9e6511ae2e
  1. 1
      BUILD
  2. 3
      CMakeLists.txt
  3. 3
      Makefile
  4. 1
      build.yaml
  5. 1
      gRPC-C++.podspec
  6. 2
      grpc.gyp
  7. 3
      include/grpc/grpc.h
  8. 15
      include/grpc/impl/codegen/grpc_types.h
  9. 92
      include/grpcpp/impl/codegen/callback_common.h
  10. 2
      include/grpcpp/impl/codegen/client_callback.h
  11. 34
      src/core/lib/surface/completion_queue.cc
  12. 19
      src/core/lib/surface/completion_queue.h
  13. 6
      src/core/lib/surface/completion_queue_factory.cc
  14. 10
      src/cpp/client/channel_cc.cc
  15. 149
      src/cpp/common/callback_common.cc
  16. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  17. 31
      test/core/end2end/inproc_callback_test.cc
  18. 25
      test/core/surface/completion_queue_test.cc
  19. 1
      tools/doxygen/Doxyfile.c++.internal
  20. 1
      tools/run_tests/generated/sources_and_headers.json

@ -119,7 +119,6 @@ GRPCXX_SRCS = [
"src/cpp/client/credentials_cc.cc", "src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc", "src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc", "src/cpp/common/alarm.cc",
"src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc", "src/cpp/common/channel_filter.cc",
"src/cpp/common/completion_queue_cc.cc", "src/cpp/common/completion_queue_cc.cc",

@ -2773,7 +2773,6 @@ add_library(grpc++
src/cpp/client/credentials_cc.cc src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc src/cpp/common/alarm.cc
src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc src/cpp/common/completion_queue_cc.cc
@ -3135,7 +3134,6 @@ add_library(grpc++_cronet
src/cpp/client/credentials_cc.cc src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc src/cpp/common/alarm.cc
src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc src/cpp/common/completion_queue_cc.cc
@ -4261,7 +4259,6 @@ add_library(grpc++_unsecure
src/cpp/client/credentials_cc.cc src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc src/cpp/common/alarm.cc
src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc src/cpp/common/completion_queue_cc.cc

@ -5228,7 +5228,6 @@ LIBGRPC++_SRC = \
src/cpp/client/credentials_cc.cc \ src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \ src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \ src/cpp/common/alarm.cc \
src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \ src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \ src/cpp/common/completion_queue_cc.cc \
@ -5598,7 +5597,6 @@ LIBGRPC++_CRONET_SRC = \
src/cpp/client/credentials_cc.cc \ src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \ src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \ src/cpp/common/alarm.cc \
src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \ src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \ src/cpp/common/completion_queue_cc.cc \
@ -6682,7 +6680,6 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/credentials_cc.cc \ src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \ src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \ src/cpp/common/alarm.cc \
src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \ src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \ src/cpp/common/completion_queue_cc.cc \

@ -1327,7 +1327,6 @@ filegroups:
- src/cpp/client/credentials_cc.cc - src/cpp/client/credentials_cc.cc
- src/cpp/client/generic_stub.cc - src/cpp/client/generic_stub.cc
- src/cpp/common/alarm.cc - src/cpp/common/alarm.cc
- src/cpp/common/callback_common.cc
- src/cpp/common/channel_arguments.cc - src/cpp/common/channel_arguments.cc
- src/cpp/common/channel_filter.cc - src/cpp/common/channel_filter.cc
- src/cpp/common/completion_queue_cc.cc - src/cpp/common/completion_queue_cc.cc

@ -190,7 +190,6 @@ Pod::Spec.new do |s|
'src/cpp/client/credentials_cc.cc', 'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc', 'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc', 'src/cpp/common/alarm.cc',
'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc', 'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc', 'src/cpp/common/completion_queue_cc.cc',

@ -1381,7 +1381,6 @@
'src/cpp/client/credentials_cc.cc', 'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc', 'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc', 'src/cpp/common/alarm.cc',
'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc', 'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc', 'src/cpp/common/completion_queue_cc.cc',
@ -1529,7 +1528,6 @@
'src/cpp/client/credentials_cc.cc', 'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc', 'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc', 'src/cpp/common/alarm.cc',
'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc', 'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc', 'src/cpp/common/completion_queue_cc.cc',

@ -111,7 +111,8 @@ GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_pluck(
of GRPC_CQ_CALLBACK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING. of GRPC_CQ_CALLBACK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING.
This function is experimental. */ This function is experimental. */
GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_callback( GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_callback(
void* shutdown_callback, void* reserved); grpc_experimental_completion_queue_functor* shutdown_callback,
void* reserved);
/** Create a completion queue */ /** Create a completion queue */
GRPCAPI grpc_completion_queue* grpc_completion_queue_create( GRPCAPI grpc_completion_queue* grpc_completion_queue_create(

@ -660,6 +660,19 @@ typedef enum {
GRPC_CQ_CALLBACK GRPC_CQ_CALLBACK
} grpc_cq_completion_type; } grpc_cq_completion_type;
/** EXPERIMENTAL: Specifies an interface class to be used as a tag
for callback-based completion queues. This can be used directly,
as the first element of a struct in C, or as a base class in C++.
Its "run" value should be assigned to some non-member function, such as
a static method. */
typedef struct grpc_experimental_completion_queue_functor {
/** The run member specifies a function that will be called when this
tag is extracted from the completion queue. Its arguments will be a
pointer to this functor and a boolean that indicates whether the
success status of this operation */
void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int);
} grpc_experimental_completion_queue_functor;
/* The upgrade to version 2 is currently experimental. */ /* The upgrade to version 2 is currently experimental. */
#define GRPC_CQ_CURRENT_VERSION 2 #define GRPC_CQ_CURRENT_VERSION 2
@ -678,7 +691,7 @@ typedef struct grpc_completion_queue_attributes {
/* EXPERIMENTAL: START OF VERSION 2 CQ ATTRIBUTES */ /* EXPERIMENTAL: START OF VERSION 2 CQ ATTRIBUTES */
/** When creating a callbackable CQ, pass in a functor to get invoked when /** When creating a callbackable CQ, pass in a functor to get invoked when
* shutdown is complete */ * shutdown is complete */
void* cq_shutdown_cb; grpc_experimental_completion_queue_functor* cq_shutdown_cb;
/* END OF VERSION 2 CQ ATTRIBUTES */ /* END OF VERSION 2 CQ ATTRIBUTES */
} grpc_completion_queue_attributes; } grpc_completion_queue_attributes;

@ -21,25 +21,36 @@
#include <functional> #include <functional>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/channel_interface.h> #include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/status.h>
// Forward declarations
namespace grpc_core {
class CQCallbackInterface;
};
namespace grpc { namespace grpc {
namespace internal { namespace internal {
/// An exception-safe way of invoking a user-specified callback function
template <class Func, class Arg>
void CatchingCallback(Func&& func, Arg&& arg) {
#if GRPC_ALLOW_EXCEPTIONS
try {
func(arg);
} catch (...) {
// nothing to return or change here, just don't crash the library
}
#else // GRPC_ALLOW_EXCEPTIONS
func(arg);
#endif // GRPC_ALLOW_EXCEPTIONS
}
// The contract on these tags is that they are single-shot. They must be // The contract on these tags is that they are single-shot. They must be
// constructed and then fired at exactly one point. There is no expectation // constructed and then fired at exactly one point. There is no expectation
// that they can be reused without reconstruction. // that they can be reused without reconstruction.
class CallbackWithStatusTag { class CallbackWithStatusTag
: public grpc_experimental_completion_queue_functor {
public: public:
// always allocated against a call arena, no memory free required // always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) { static void operator delete(void* ptr, std::size_t size) {
@ -54,24 +65,48 @@ class CallbackWithStatusTag {
static void operator delete(void*, void*) { assert(0); } static void operator delete(void*, void*) { assert(0); }
CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f, CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f,
CompletionQueueTag* ops); CompletionQueueTag* ops)
: call_(call), func_(std::move(f)), ops_(ops), status_() {
g_core_codegen_interface->grpc_call_ref(call);
functor_run = &CallbackWithStatusTag::StaticRun;
}
~CallbackWithStatusTag() {} ~CallbackWithStatusTag() {}
void* tag() { return static_cast<void*>(impl_); } Status* status_ptr() { return &status_; }
Status* status_ptr() { return status_; }
CompletionQueueTag* ops() { return ops_; }
// force_run can not be performed on a tag if operations using this tag // force_run can not be performed on a tag if operations using this tag
// have been sent to PerformOpsOnCall. It is intended for error conditions // have been sent to PerformOpsOnCall. It is intended for error conditions
// that are detected before the operations are internally processed. // that are detected before the operations are internally processed.
void force_run(Status s); void force_run(Status s) {
status_ = std::move(s);
Run(true);
}
private: private:
grpc_core::CQCallbackInterface* impl_; grpc_call* call_;
Status* status_; std::function<void(Status)> func_;
CompletionQueueTag* ops_; CompletionQueueTag* ops_;
Status status_;
static void StaticRun(grpc_experimental_completion_queue_functor* cb,
int ok) {
static_cast<CallbackWithStatusTag*>(cb)->Run(static_cast<bool>(ok));
}
void Run(bool ok) {
void* ignored = ops_;
GPR_ASSERT(ops_->FinalizeResult(&ignored, &ok));
GPR_ASSERT(ignored == ops_);
// Last use of func_ or status_, so ok to move them out
CatchingCallback(std::move(func_), std::move(status_));
func_ = nullptr; // reset to clear this out for sure
g_core_codegen_interface->grpc_call_unref(call_);
}
}; };
class CallbackWithSuccessTag { class CallbackWithSuccessTag
: public grpc_experimental_completion_queue_functor {
public: public:
// always allocated against a call arena, no memory free required // always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) { static void operator delete(void* ptr, std::size_t size) {
@ -86,19 +121,40 @@ class CallbackWithSuccessTag {
static void operator delete(void*, void*) { assert(0); } static void operator delete(void*, void*) { assert(0); }
CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f, CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f,
CompletionQueueTag* ops); CompletionQueueTag* ops)
: call_(call), func_(std::move(f)), ops_(ops) {
g_core_codegen_interface->grpc_call_ref(call);
functor_run = &CallbackWithSuccessTag::StaticRun;
}
void* tag() { return static_cast<void*>(impl_); }
CompletionQueueTag* ops() { return ops_; } CompletionQueueTag* ops() { return ops_; }
// force_run can not be performed on a tag if operations using this tag // force_run can not be performed on a tag if operations using this tag
// have been sent to PerformOpsOnCall. It is intended for error conditions // have been sent to PerformOpsOnCall. It is intended for error conditions
// that are detected before the operations are internally processed. // that are detected before the operations are internally processed.
void force_run(bool ok); void force_run(bool ok) { Run(ok); }
private: private:
grpc_core::CQCallbackInterface* impl_; grpc_call* call_;
std::function<void(bool)> func_;
CompletionQueueTag* ops_; CompletionQueueTag* ops_;
static void StaticRun(grpc_experimental_completion_queue_functor* cb,
int ok) {
static_cast<CallbackWithSuccessTag*>(cb)->Run(static_cast<bool>(ok));
}
void Run(bool ok) {
void* ignored = ops_;
bool new_ok = ok;
GPR_ASSERT(ops_->FinalizeResult(&ignored, &new_ok));
GPR_ASSERT(ignored == ops_);
// Last use of func_, so ok to move it out for rvalue call above
CatchingCallback(std::move(func_), ok);
func_ = nullptr; // reset to clear this out for sure
g_core_codegen_interface->grpc_call_unref(call_);
}
}; };
} // namespace internal } // namespace internal

@ -84,7 +84,7 @@ class CallbackUnaryCallImpl {
ops->AllowNoMessage(); ops->AllowNoMessage();
ops->ClientSendClose(); ops->ClientSendClose();
ops->ClientRecvStatus(context, tag->status_ptr()); ops->ClientRecvStatus(context, tag->status_ptr());
ops->set_cq_tag(tag->tag()); ops->set_cq_tag(tag);
call.PerformOps(ops); call.PerformOps(ops);
} }
}; };

@ -184,7 +184,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable { typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type; grpc_cq_completion_type cq_completion_type;
size_t data_size; size_t data_size;
void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback); void (*init)(void* data,
grpc_experimental_completion_queue_functor* shutdown_callback);
void (*shutdown)(grpc_completion_queue* cq); void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data); void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag); bool (*begin_op)(grpc_completion_queue* cq, void* tag);
@ -267,7 +268,7 @@ typedef struct cq_callback_data {
bool shutdown_called; bool shutdown_called;
/** A callback that gets invoked when the CQ completes shutdown */ /** A callback that gets invoked when the CQ completes shutdown */
grpc_core::CQCallbackInterface* shutdown_callback; grpc_experimental_completion_queue_functor* shutdown_callback;
} cq_callback_data; } cq_callback_data;
/* Completion queue structure */ /* Completion queue structure */
@ -333,12 +334,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved); gpr_timespec deadline, void* reserved);
// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
static void cq_init_next(void* data, static void cq_init_next(
grpc_core::CQCallbackInterface* shutdown_callback); void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_init_pluck(void* data, static void cq_init_pluck(
grpc_core::CQCallbackInterface* shutdown_callback); void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_init_callback(void* data, static void cq_init_callback(
grpc_core::CQCallbackInterface* shutdown_callback); void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data); static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data); static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data); static void cq_destroy_callback(void* data);
@ -462,7 +463,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
grpc_completion_queue* grpc_completion_queue_create_internal( grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
grpc_core::CQCallbackInterface* shutdown_callback) { grpc_experimental_completion_queue_functor* shutdown_callback) {
GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0); GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
grpc_completion_queue* cq; grpc_completion_queue* cq;
@ -497,8 +498,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
return cq; return cq;
} }
static void cq_init_next(void* data, static void cq_init_next(
grpc_core::CQCallbackInterface* shutdown_callback) { void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_next_data* cqd = static_cast<cq_next_data*>(data); cq_next_data* cqd = static_cast<cq_next_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */ /* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1); gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@ -513,8 +514,8 @@ static void cq_destroy_next(void* data) {
cq_event_queue_destroy(&cqd->queue); cq_event_queue_destroy(&cqd->queue);
} }
static void cq_init_pluck(void* data, static void cq_init_pluck(
grpc_core::CQCallbackInterface* shutdown_callback) { void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data); cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */ /* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1); gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@ -532,7 +533,7 @@ static void cq_destroy_pluck(void* data) {
} }
static void cq_init_callback( static void cq_init_callback(
void* data, grpc_core::CQCallbackInterface* shutdown_callback) { void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_callback_data* cqd = static_cast<cq_callback_data*>(data); cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */ /* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1); gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@ -859,7 +860,8 @@ static void cq_end_op_for_callback(
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
(static_cast<grpc_core::CQCallbackInterface*>(tag))->Run(is_success); auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
(*functor->functor_run)(functor, is_success);
} }
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@ -1343,7 +1345,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
callback->Run(true); callback->functor_run(callback, true);
} }
static void cq_shutdown_callback(grpc_completion_queue* cq) { static void cq_shutdown_callback(grpc_completion_queue* cq) {

@ -48,23 +48,6 @@ typedef struct grpc_cq_completion {
uintptr_t next; uintptr_t next;
} grpc_cq_completion; } grpc_cq_completion;
/// For callback CQs, the tag that is passed in for an operation must
/// actually be a pointer to an implementation of the following class.
/// When the operation completes, the tag will be typecasted from void*
/// to grpc_core::CQCallbackInterface* and then the Run method will be
/// invoked on it. In practice, the language binding (e.g., C++ API
/// implementation) is responsible for providing and using an implementation
/// of this abstract base class.
namespace grpc_core {
class CQCallbackInterface {
public:
virtual ~CQCallbackInterface() {}
virtual void Run(bool) GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
} // namespace grpc_core
#ifndef NDEBUG #ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason, void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason,
const char* file, int line); const char* file, int line);
@ -106,6 +89,6 @@ int grpc_get_cq_poll_num(grpc_completion_queue* cc);
grpc_completion_queue* grpc_completion_queue_create_internal( grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
grpc_core::CQCallbackInterface* shutdown_callback); grpc_experimental_completion_queue_functor* shutdown_callback);
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */

@ -31,8 +31,7 @@ static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory, const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attr) { const grpc_completion_queue_attributes* attr) {
return grpc_completion_queue_create_internal( return grpc_completion_queue_create_internal(
attr->cq_completion_type, attr->cq_polling_type, attr->cq_completion_type, attr->cq_polling_type, attr->cq_shutdown_cb);
static_cast<grpc_core::CQCallbackInterface*>(attr->cq_shutdown_cb));
} }
static grpc_completion_queue_factory_vtable default_vtable = {default_create}; static grpc_completion_queue_factory_vtable default_vtable = {default_create};
@ -73,7 +72,8 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
} }
grpc_completion_queue* grpc_completion_queue_create_for_callback( grpc_completion_queue* grpc_completion_queue_create_for_callback(
void* shutdown_callback, void* reserved) { grpc_experimental_completion_queue_functor* shutdown_callback,
void* reserved) {
GPR_ASSERT(!reserved); GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = { grpc_completion_queue_attributes attr = {
2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}; 2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback};

@ -193,17 +193,19 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
} }
namespace { namespace {
class ShutdownCallback : public grpc_core::CQCallbackInterface { class ShutdownCallback : public grpc_experimental_completion_queue_functor {
public: public:
ShutdownCallback() { functor_run = &ShutdownCallback::Run; }
// TakeCQ takes ownership of the cq into the shutdown callback // TakeCQ takes ownership of the cq into the shutdown callback
// so that the shutdown callback will be responsible for destroying it // so that the shutdown callback will be responsible for destroying it
void TakeCQ(CompletionQueue* cq) { cq_ = cq; } void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
// The Run function will get invoked by the completion queue library // The Run function will get invoked by the completion queue library
// when the shutdown is actually complete // when the shutdown is actually complete
void Run(bool) override { static void Run(grpc_experimental_completion_queue_functor* cb, int) {
delete cq_; auto* callback = static_cast<ShutdownCallback*>(cb);
grpc_core::Delete(this); delete callback->cq_;
grpc_core::Delete(callback);
} }
private: private:

@ -1,149 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <functional>
#include <grpcpp/impl/codegen/callback_common.h>
#include <grpcpp/impl/codegen/status.h>
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
namespace internal {
namespace {
template <class Func, class Arg>
void CatchingCallback(Func&& func, Arg&& arg) {
#if GRPC_ALLOW_EXCEPTIONS
try {
func(arg);
} catch (...) {
// nothing to return or change here, just don't crash the library
}
#else // GRPC_ALLOW_EXCEPTIONS
func(arg);
#endif // GRPC_ALLOW_EXCEPTIONS
}
class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
public:
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(CallbackWithSuccessImpl));
}
// This operator should never be called as the memory should be freed as part
// of the arena destruction. It only exists to provide a matching operator
// delete to the operator new so that some compilers will not complain (see
// https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { assert(0); }
CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent,
std::function<void(bool)> f)
: call_(call), parent_(parent), func_(std::move(f)) {
grpc_call_ref(call);
}
void Run(bool ok) override {
void* ignored = parent_->ops();
bool new_ok = ok;
GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok));
GPR_ASSERT(ignored == parent_->ops());
// Last use of func_, so ok to move it out for rvalue call above
CatchingCallback(std::move(func_), ok);
func_ = nullptr; // reset to clear this out for sure
grpc_call_unref(call_);
}
private:
grpc_call* call_;
CallbackWithSuccessTag* parent_;
std::function<void(bool)> func_;
};
class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface {
public:
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(CallbackWithStatusImpl));
}
// This operator should never be called as the memory should be freed as part
// of the arena destruction. It only exists to provide a matching operator
// delete to the operator new so that some compilers will not complain (see
// https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
// there are no tests catching the compiler warning.
static void operator delete(void*, void*) { assert(0); }
CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent,
std::function<void(Status)> f)
: call_(call), parent_(parent), func_(std::move(f)), status_() {
grpc_call_ref(call);
}
void Run(bool ok) override {
void* ignored = parent_->ops();
GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok));
GPR_ASSERT(ignored == parent_->ops());
// Last use of func_ or status_, so ok to move them out
CatchingCallback(std::move(func_), std::move(status_));
func_ = nullptr; // reset to clear this out for sure
grpc_call_unref(call_);
}
Status* status_ptr() { return &status_; }
private:
grpc_call* call_;
CallbackWithStatusTag* parent_;
std::function<void(Status)> func_;
Status status_;
};
} // namespace
CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call,
std::function<void(bool)> f,
CompletionQueueTag* ops)
: impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl)))
CallbackWithSuccessImpl(call, this, std::move(f))),
ops_(ops) {}
void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); }
CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call,
std::function<void(Status)> f,
CompletionQueueTag* ops)
: ops_(ops) {
auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl)))
CallbackWithStatusImpl(call, this, std::move(f));
impl_ = impl;
status_ = impl->status_ptr();
}
void CallbackWithStatusTag::force_run(Status s) {
*status_ = std::move(s);
impl_->Run(true);
}
} // namespace internal
} // namespace grpc

@ -107,7 +107,7 @@ extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_f
typedef grpc_completion_queue*(*grpc_completion_queue_create_for_pluck_type)(void* reserved); typedef grpc_completion_queue*(*grpc_completion_queue_create_for_pluck_type)(void* reserved);
extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import; extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import;
#define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import #define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import
typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(void* shutdown_callback, void* reserved); typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(grpc_experimental_completion_queue_functor* shutdown_callback, void* reserved);
extern grpc_completion_queue_create_for_callback_type grpc_completion_queue_create_for_callback_import; extern grpc_completion_queue_create_for_callback_type grpc_completion_queue_create_for_callback_import;
#define grpc_completion_queue_create_for_callback grpc_completion_queue_create_for_callback_import #define grpc_completion_queue_create_for_callback grpc_completion_queue_create_for_callback_import
typedef grpc_completion_queue*(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attributes, void* reserved); typedef grpc_completion_queue*(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attributes, void* reserved);

@ -37,13 +37,16 @@ typedef struct inproc_fixture_data {
namespace { namespace {
template <typename F> template <typename F>
class CQDeletingCallback : public grpc_core::CQCallbackInterface { class CQDeletingCallback : public grpc_experimental_completion_queue_functor {
public: public:
explicit CQDeletingCallback(F f) : func_(f) {} explicit CQDeletingCallback(F f) : func_(f) {
~CQDeletingCallback() override {} functor_run = &CQDeletingCallback::Run;
void Run(bool ok) override { }
func_(ok); ~CQDeletingCallback() {}
grpc_core::Delete(this); static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
auto* callback = static_cast<CQDeletingCallback*>(cb);
callback->func_(static_cast<bool>(ok));
grpc_core::Delete(callback);
} }
private: private:
@ -51,18 +54,24 @@ class CQDeletingCallback : public grpc_core::CQCallbackInterface {
}; };
template <typename F> template <typename F>
grpc_core::CQCallbackInterface* NewDeletingCallback(F f) { grpc_experimental_completion_queue_functor* NewDeletingCallback(F f) {
return grpc_core::New<CQDeletingCallback<F>>(f); return grpc_core::New<CQDeletingCallback<F>>(f);
} }
class ShutdownCallback : public grpc_core::CQCallbackInterface { class ShutdownCallback : public grpc_experimental_completion_queue_functor {
public: public:
ShutdownCallback() : done_(false) { ShutdownCallback() : done_(false) {
functor_run = &ShutdownCallback::StaticRun;
gpr_mu_init(&mu_); gpr_mu_init(&mu_);
gpr_cv_init(&cv_); gpr_cv_init(&cv_);
} }
~ShutdownCallback() override {} ~ShutdownCallback() {}
void Run(bool ok) override { static void StaticRun(grpc_experimental_completion_queue_functor* cb,
int ok) {
auto* callback = static_cast<ShutdownCallback*>(cb);
callback->Run(static_cast<bool>(ok));
}
void Run(bool ok) {
gpr_log(GPR_DEBUG, "CQ shutdown notification invoked"); gpr_log(GPR_DEBUG, "CQ shutdown notification invoked");
gpr_mu_lock(&mu_); gpr_mu_lock(&mu_);
done_ = true; done_ = true;
@ -170,7 +179,7 @@ static void verify_tags(gpr_timespec deadline) {
// This function creates a callback functor that emits the // This function creates a callback functor that emits the
// desired tag into the global tag set // desired tag into the global tag set
static grpc_core::CQCallbackInterface* tag(intptr_t t) { static grpc_experimental_completion_queue_functor* tag(intptr_t t) {
auto func = [t](bool ok) { auto func = [t](bool ok) {
gpr_mu_lock(&tags_mu); gpr_mu_lock(&tags_mu);
gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t); gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t);

@ -369,11 +369,15 @@ static void test_callback(void) {
LOG_TEST("test_callback"); LOG_TEST("test_callback");
bool got_shutdown = false; bool got_shutdown = false;
class ShutdownCallback : public grpc_core::CQCallbackInterface { class ShutdownCallback : public grpc_experimental_completion_queue_functor {
public: public:
ShutdownCallback(bool* done) : done_(done) {} ShutdownCallback(bool* done) : done_(done) {
functor_run = &ShutdownCallback::Run;
}
~ShutdownCallback() {} ~ShutdownCallback() {}
void Run(bool ok) override { *done_ = ok; } static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
*static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
}
private: private:
bool* done_; bool* done_;
@ -391,14 +395,17 @@ static void test_callback(void) {
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
int counter = 0; int counter = 0;
class TagCallback : public grpc_core::CQCallbackInterface { class TagCallback : public grpc_experimental_completion_queue_functor {
public: public:
TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {} TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
functor_run = &TagCallback::Run;
}
~TagCallback() {} ~TagCallback() {}
void Run(bool ok) override { static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
GPR_ASSERT(ok); GPR_ASSERT(static_cast<bool>(ok));
*counter_ += tag_; auto* callback = static_cast<TagCallback*>(cb);
grpc_core::Delete(this); *callback->counter_ += callback->tag_;
grpc_core::Delete(callback);
}; };
private: private:

@ -1190,7 +1190,6 @@ src/cpp/client/secure_credentials.h \
src/cpp/codegen/codegen_init.cc \ src/cpp/codegen/codegen_init.cc \
src/cpp/common/alarm.cc \ src/cpp/common/alarm.cc \
src/cpp/common/auth_property_iterator.cc \ src/cpp/common/auth_property_iterator.cc \
src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \ src/cpp/common/channel_filter.cc \
src/cpp/common/channel_filter.h \ src/cpp/common/channel_filter.h \

@ -11470,7 +11470,6 @@
"src/cpp/client/credentials_cc.cc", "src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc", "src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc", "src/cpp/common/alarm.cc",
"src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc", "src/cpp/common/channel_filter.cc",
"src/cpp/common/channel_filter.h", "src/cpp/common/channel_filter.h",

Loading…
Cancel
Save