Remove functions in `class CoreCodegen` (#31751)

* Clean up `grpc_completion_queue_factory_lookup()`

* Clean up `grpc_completion_queue_create()`

* Clean up `grpc_completion_queue_create_for_next()`

* Clean up `grpc_completion_queue_create_for_pluck()`

* Clean up `grpc_completion_queue_shutdown()`

* Clean up `grpc_completion_queue_destroy()`

* Clean up `grpc_completion_queue_pluck()`

* Clean up `gpr_malloc()`

* Clean up `gpr_free()`

* Clean up `grpc_init()`

* Clean up `grpc_shutdown()`

* Clean up `gpr_mu_init()`

* Clean up `gpr_mu_destroy()`

* Clean up `gpr_mu_lock()`

* Clean up `gpr_mu_unlock()`

* Clean up `gpr_cv_init()`

* Clean up `gpr_cv_destroy()`

* Clean up `gpr_cv_wait()`

* Clean up `gpr_cv_signal()`

* Remove `gpr_cv_broadcast()`
pull/31768/head
Cheng-Yu Chung 2 years ago committed by GitHub
parent 5498481265
commit 024a0f7388
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      include/grpcpp/completion_queue.h
  2. 13
      include/grpcpp/impl/call_op_set.h
  3. 32
      include/grpcpp/impl/codegen/core_codegen.h
  4. 38
      include/grpcpp/impl/codegen/core_codegen_interface.h
  5. 23
      include/grpcpp/impl/sync.h
  6. 57
      src/cpp/common/core_codegen.cc

@ -34,6 +34,7 @@
#include <list> #include <list>
#include <grpc/grpc.h>
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/rpc_service_method.h> #include <grpcpp/impl/codegen/rpc_service_method.h>
@ -115,9 +116,7 @@ class CompletionQueue : private grpc::internal::GrpcLibrary {
explicit CompletionQueue(grpc_completion_queue* take); explicit CompletionQueue(grpc_completion_queue* take);
/// Destructor. Destroys the owned wrapped completion queue / instance. /// Destructor. Destroys the owned wrapped completion queue / instance.
~CompletionQueue() override { ~CompletionQueue() override { grpc_completion_queue_destroy(cq_); }
grpc::g_core_codegen_interface->grpc_completion_queue_destroy(cq_);
}
/// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT. /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
enum NextStatus { enum NextStatus {
@ -250,10 +249,9 @@ class CompletionQueue : private grpc::internal::GrpcLibrary {
protected: protected:
/// Private constructor of CompletionQueue only visible to friend classes /// Private constructor of CompletionQueue only visible to friend classes
explicit CompletionQueue(const grpc_completion_queue_attributes& attributes) { explicit CompletionQueue(const grpc_completion_queue_attributes& attributes) {
cq_ = grpc::g_core_codegen_interface->grpc_completion_queue_create( cq_ = grpc_completion_queue_create(
grpc::g_core_codegen_interface->grpc_completion_queue_factory_lookup( grpc_completion_queue_factory_lookup(&attributes), &attributes,
&attributes), nullptr);
&attributes, nullptr);
InitialAvalanching(); // reserve this for the future shutdown InitialAvalanching(); // reserve this for the future shutdown
} }
@ -324,8 +322,7 @@ class CompletionQueue : private grpc::internal::GrpcLibrary {
auto deadline = auto deadline =
grpc::g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); grpc::g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME);
while (true) { while (true) {
auto ev = grpc::g_core_codegen_interface->grpc_completion_queue_pluck( auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
cq_, tag, deadline, nullptr);
bool ok = ev.success != 0; bool ok = ev.success != 0;
void* ignored = tag; void* ignored = tag;
if (tag->FinalizeResult(&ignored, &ok)) { if (tag->FinalizeResult(&ignored, &ok)) {
@ -346,8 +343,7 @@ class CompletionQueue : private grpc::internal::GrpcLibrary {
void TryPluck(grpc::internal::CompletionQueueTag* tag) { void TryPluck(grpc::internal::CompletionQueueTag* tag) {
auto deadline = auto deadline =
grpc::g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); grpc::g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME);
auto ev = grpc::g_core_codegen_interface->grpc_completion_queue_pluck( auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
cq_, tag, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT) return; if (ev.type == GRPC_QUEUE_TIMEOUT) return;
bool ok = ev.success != 0; bool ok = ev.success != 0;
void* ignored = tag; void* ignored = tag;
@ -362,8 +358,7 @@ class CompletionQueue : private grpc::internal::GrpcLibrary {
/// that the tag is internal not something that is returned to the user. /// that the tag is internal not something that is returned to the user.
void TryPluck(grpc::internal::CompletionQueueTag* tag, void TryPluck(grpc::internal::CompletionQueueTag* tag,
gpr_timespec deadline) { gpr_timespec deadline) {
auto ev = grpc::g_core_codegen_interface->grpc_completion_queue_pluck( auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
cq_, tag, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) {
return; return;
} }
@ -388,7 +383,7 @@ class CompletionQueue : private grpc::internal::GrpcLibrary {
void CompleteAvalanching() { void CompleteAvalanching() {
if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, gpr_atm{-1}) == if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, gpr_atm{-1}) ==
1) { 1) {
grpc::g_core_codegen_interface->grpc_completion_queue_shutdown(cq_); grpc_completion_queue_shutdown(cq_);
} }
} }

@ -25,6 +25,7 @@
#include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/grpc_types.h>
#include <grpc/impl/compression_types.h> #include <grpc/impl/compression_types.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpcpp/client_context.h> #include <grpcpp/client_context.h>
#include <grpcpp/completion_queue.h> #include <grpcpp/completion_queue.h>
@ -58,9 +59,8 @@ inline grpc_metadata* FillMetadataArray(
if (*metadata_count == 0) { if (*metadata_count == 0) {
return nullptr; return nullptr;
} }
grpc_metadata* metadata_array = grpc_metadata* metadata_array = static_cast<grpc_metadata*>(
static_cast<grpc_metadata*>(g_core_codegen_interface->gpr_malloc( gpr_malloc((*metadata_count) * sizeof(grpc_metadata)));
(*metadata_count) * sizeof(grpc_metadata)));
size_t i = 0; size_t i = 0;
for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) { for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
metadata_array[i].key = SliceReferencingString(iter->first); metadata_array[i].key = SliceReferencingString(iter->first);
@ -254,7 +254,7 @@ class CallOpSendInitialMetadata {
} }
void FinishOp(bool* /*status*/) { void FinishOp(bool* /*status*/) {
if (!send_ || hijacked_) return; if (!send_ || hijacked_) return;
g_core_codegen_interface->gpr_free(initial_metadata_); gpr_free(initial_metadata_);
send_ = false; send_ = false;
} }
@ -687,7 +687,7 @@ class CallOpServerSendStatus {
void FinishOp(bool* /*status*/) { void FinishOp(bool* /*status*/) {
if (!send_status_available_ || hijacked_) return; if (!send_status_available_ || hijacked_) return;
g_core_codegen_interface->gpr_free(trailing_metadata_); gpr_free(trailing_metadata_);
send_status_available_ = false; send_status_available_ = false;
} }
@ -808,8 +808,7 @@ class CallOpClientRecvStatus {
metadata_map_->GetBinaryErrorDetails()); metadata_map_->GetBinaryErrorDetails());
if (debug_error_string_ != nullptr) { if (debug_error_string_ != nullptr) {
client_context_->set_debug_error_string(debug_error_string_); client_context_->set_debug_error_string(debug_error_string_);
g_core_codegen_interface->gpr_free( gpr_free(const_cast<char*>(debug_error_string_));
const_cast<char*>(debug_error_string_));
} }
} }
// TODO(soheil): Find callers that set debug string even for status OK, // TODO(soheil): Find callers that set debug string even for status OK,

@ -33,38 +33,6 @@ namespace grpc {
/// Implementation of the core codegen interface. /// Implementation of the core codegen interface.
class CoreCodegen final : public CoreCodegenInterface { class CoreCodegen final : public CoreCodegenInterface {
private: private:
const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) override;
grpc_completion_queue* grpc_completion_queue_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attributes,
void* reserved) override;
grpc_completion_queue* grpc_completion_queue_create_for_next(
void* reserved) override;
grpc_completion_queue* grpc_completion_queue_create_for_pluck(
void* reserved) override;
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) override;
void grpc_completion_queue_destroy(grpc_completion_queue* cq) override;
grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline,
void* reserved) override;
void* gpr_malloc(size_t size) override;
void gpr_free(void* p) override;
void grpc_init() override;
void grpc_shutdown() override;
void gpr_mu_init(gpr_mu* mu) override;
void gpr_mu_destroy(gpr_mu* mu) override;
void gpr_mu_lock(gpr_mu* mu) override;
void gpr_mu_unlock(gpr_mu* mu) override;
void gpr_cv_init(gpr_cv* cv) override;
void gpr_cv_destroy(gpr_cv* cv) override;
int gpr_cv_wait(gpr_cv* cv, gpr_mu* mu, gpr_timespec abs_deadline) override;
void gpr_cv_signal(gpr_cv* cv) override;
void gpr_cv_broadcast(gpr_cv* cv) override;
grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops, grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* tag, size_t nops, void* tag,
void* reserved) override; void* reserved) override;

@ -45,44 +45,6 @@ class CoreCodegenInterface {
virtual void assert_fail(const char* failed_assertion, const char* file, virtual void assert_fail(const char* failed_assertion, const char* file,
int line) = 0; int line) = 0;
virtual const grpc_completion_queue_factory*
grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attributes, void* reserved) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create_for_next(
void* reserved) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create_for_pluck(
void* reserved) = 0;
virtual void grpc_completion_queue_shutdown(grpc_completion_queue* cq) = 0;
virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0;
virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq,
void* tag,
gpr_timespec deadline,
void* reserved) = 0;
virtual void* gpr_malloc(size_t size) = 0;
virtual void gpr_free(void* p) = 0;
// These are only to be used to fix edge cases involving grpc_init and
// grpc_shutdown. Calling grpc_init from the codegen interface before
// the real grpc_init is called will cause a crash, so if you use this
// function, ensure that it is not the first call to grpc_init.
virtual void grpc_init() = 0;
virtual void grpc_shutdown() = 0;
virtual void gpr_mu_init(gpr_mu* mu) = 0;
virtual void gpr_mu_destroy(gpr_mu* mu) = 0;
virtual void gpr_mu_lock(gpr_mu* mu) = 0;
virtual void gpr_mu_unlock(gpr_mu* mu) = 0;
virtual void gpr_cv_init(gpr_cv* cv) = 0;
virtual void gpr_cv_destroy(gpr_cv* cv) = 0;
virtual int gpr_cv_wait(gpr_cv* cv, gpr_mu* mu,
gpr_timespec abs_deadline) = 0;
virtual void gpr_cv_signal(gpr_cv* cv) = 0;
virtual void gpr_cv_broadcast(gpr_cv* cv) = 0;
virtual grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) = 0; virtual grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) = 0;
virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0; virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0;
virtual size_t grpc_byte_buffer_length(grpc_byte_buffer* bb) virtual size_t grpc_byte_buffer_length(grpc_byte_buffer* bb)

@ -56,18 +56,14 @@ using CondVar = absl::CondVar;
class ABSL_LOCKABLE Mutex { class ABSL_LOCKABLE Mutex {
public: public:
Mutex() { g_core_codegen_interface->gpr_mu_init(&mu_); } Mutex() { gpr_mu_init(&mu_); }
~Mutex() { g_core_codegen_interface->gpr_mu_destroy(&mu_); } ~Mutex() { gpr_mu_destroy(&mu_); }
Mutex(const Mutex&) = delete; Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete; Mutex& operator=(const Mutex&) = delete;
void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION() { void Lock() ABSL_EXCLUSIVE_LOCK_FUNCTION() { gpr_mu_lock(&mu_); }
g_core_codegen_interface->gpr_mu_lock(&mu_); void Unlock() ABSL_UNLOCK_FUNCTION() { gpr_mu_unlock(&mu_); }
}
void Unlock() ABSL_UNLOCK_FUNCTION() {
g_core_codegen_interface->gpr_mu_unlock(&mu_);
}
private: private:
union { union {
@ -121,18 +117,17 @@ class ABSL_SCOPED_LOCKABLE ReleasableMutexLock {
class CondVar { class CondVar {
public: public:
CondVar() { g_core_codegen_interface->gpr_cv_init(&cv_); } CondVar() { gpr_cv_init(&cv_); }
~CondVar() { g_core_codegen_interface->gpr_cv_destroy(&cv_); } ~CondVar() { gpr_cv_destroy(&cv_); }
CondVar(const CondVar&) = delete; CondVar(const CondVar&) = delete;
CondVar& operator=(const CondVar&) = delete; CondVar& operator=(const CondVar&) = delete;
void Signal() { g_core_codegen_interface->gpr_cv_signal(&cv_); } void Signal() { gpr_cv_signal(&cv_); }
void SignalAll() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); } void SignalAll() { gpr_cv_broadcast(&cv_); }
void Wait(Mutex* mu) { void Wait(Mutex* mu) {
g_core_codegen_interface->gpr_cv_wait( gpr_cv_wait(&cv_, &mu->mu_,
&cv_, &mu->mu_,
g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME)); g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
} }

@ -36,63 +36,6 @@
namespace grpc { namespace grpc {
const grpc_completion_queue_factory*
CoreCodegen::grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) {
return ::grpc_completion_queue_factory_lookup(attributes);
}
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attributes, void* reserved) {
return ::grpc_completion_queue_create(factory, attributes, reserved);
}
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_next(
void* reserved) {
return ::grpc_completion_queue_create_for_next(reserved);
}
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_pluck(
void* reserved) {
return ::grpc_completion_queue_create_for_pluck(reserved);
}
void CoreCodegen::grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
::grpc_completion_queue_shutdown(cq);
}
void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) {
::grpc_completion_queue_destroy(cq);
}
grpc_event CoreCodegen::grpc_completion_queue_pluck(grpc_completion_queue* cq,
void* tag,
gpr_timespec deadline,
void* reserved) {
return ::grpc_completion_queue_pluck(cq, tag, deadline, reserved);
}
void* CoreCodegen::gpr_malloc(size_t size) { return ::gpr_malloc(size); }
void CoreCodegen::gpr_free(void* p) { return ::gpr_free(p); }
void CoreCodegen::grpc_init() { ::grpc_init(); }
void CoreCodegen::grpc_shutdown() { ::grpc_shutdown(); }
void CoreCodegen::gpr_mu_init(gpr_mu* mu) { ::gpr_mu_init(mu); }
void CoreCodegen::gpr_mu_destroy(gpr_mu* mu) { ::gpr_mu_destroy(mu); }
void CoreCodegen::gpr_mu_lock(gpr_mu* mu) { ::gpr_mu_lock(mu); }
void CoreCodegen::gpr_mu_unlock(gpr_mu* mu) { ::gpr_mu_unlock(mu); }
void CoreCodegen::gpr_cv_init(gpr_cv* cv) { ::gpr_cv_init(cv); }
void CoreCodegen::gpr_cv_destroy(gpr_cv* cv) { ::gpr_cv_destroy(cv); }
int CoreCodegen::gpr_cv_wait(gpr_cv* cv, gpr_mu* mu,
gpr_timespec abs_deadline) {
return ::gpr_cv_wait(cv, mu, abs_deadline);
}
void CoreCodegen::gpr_cv_signal(gpr_cv* cv) { ::gpr_cv_signal(cv); }
void CoreCodegen::gpr_cv_broadcast(gpr_cv* cv) { ::gpr_cv_broadcast(cv); }
grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer* bb) { grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer* bb) {
return ::grpc_byte_buffer_copy(bb); return ::grpc_byte_buffer_copy(bb);
} }

Loading…
Cancel
Save