Remove functions in `class CoreCodegen` (#31767)

* Clean up `grpc_byte_buffer_copy()`

* Clean up `grpc_byte_buffer_destroy()`

* Clean up `grpc_byte_buffer_length()`

* Clean up `grpc_byte_buffer_reader_init()`

* Clean up `grpc_byte_buffer_reader_destroy()`

* Clean up `grpc_byte_buffer_reader_next()`

* Clean up `grpc_byte_buffer_reader_peek()`

* Clean up `grpc_raw_byte_buffer_create()`

* Clean up `grpc_slice_new_with_user_data()`

* Clean up `grpc_slice_new_with_len()`

* Clean up `grpc_call_start_batch()`

* Clean up `grpc_call_cancel_with_status()`

* Clean up `grpc_call_failed_before_recv_message()`

* Clean up `grpc_call_ref()`

* Clean up `grpc_call_unref()`

* Clean up `grpc_call_error_to_string()`

* Fix typos

* Automated change: Fix sanity tests
pull/31794/head
Cheng-Yu Chung 2 years ago committed by GitHub
parent 5933b52e11
commit 1d968a36aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      include/grpcpp/impl/call_op_set.h
  2. 33
      include/grpcpp/impl/codegen/core_codegen.h
  3. 35
      include/grpcpp/impl/codegen/core_codegen_interface.h
  4. 52
      include/grpcpp/impl/server_callback_handlers.h
  5. 4
      include/grpcpp/server_context.h
  6. 7
      include/grpcpp/server_interface.h
  7. 26
      include/grpcpp/support/async_stream.h
  8. 18
      include/grpcpp/support/async_unary_call.h
  9. 18
      include/grpcpp/support/byte_buffer.h
  10. 9
      include/grpcpp/support/callback_common.h
  11. 35
      include/grpcpp/support/client_callback.h
  12. 11
      include/grpcpp/support/method_handler.h
  13. 9
      include/grpcpp/support/proto_buffer_reader.h
  14. 4
      include/grpcpp/support/proto_buffer_writer.h
  15. 6
      include/grpcpp/support/slice.h
  16. 74
      src/cpp/common/core_codegen.cc
  17. 37
      test/cpp/microbenchmarks/bm_byte_buffer.cc

@ -23,6 +23,7 @@
#include <map>
#include <memory>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/impl/compression_types.h>
#include <grpc/support/alloc.h>
@ -892,7 +893,7 @@ class CallOpSet : public CallOpSetInterface,
void FillOps(Call* call) override {
done_intercepting_ = false;
g_core_codegen_interface->grpc_call_ref(call->call());
grpc_call_ref(call->call());
call_ =
*call; // It's fine to create a copy of call since it's just pointers
@ -913,7 +914,7 @@ class CallOpSet : public CallOpSetInterface,
// run
*tag = return_tag_;
*status = saved_status_;
g_core_codegen_interface->grpc_call_unref(call_.call());
grpc_call_unref(call_.call());
return true;
}
@ -926,7 +927,7 @@ class CallOpSet : public CallOpSetInterface,
saved_status_ = *status;
if (RunInterceptorsPostRecv()) {
*tag = return_tag_;
g_core_codegen_interface->grpc_call_unref(call_.call());
grpc_call_unref(call_.call());
return true;
}
// Interceptors are going to be run, so we can't return the tag just yet.
@ -967,15 +968,15 @@ class CallOpSet : public CallOpSetInterface,
this->Op5::AddOp(ops, &nops);
this->Op6::AddOp(ops, &nops);
grpc_call_error err = g_core_codegen_interface->grpc_call_start_batch(
call_.call(), ops, nops, core_cq_tag(), nullptr);
grpc_call_error err =
grpc_call_start_batch(call_.call(), ops, nops, core_cq_tag(), nullptr);
if (err != GRPC_CALL_OK) {
// A failure here indicates an API misuse; for example, doing a Write
// while another Write is already pending on the same RPC or invoking
// WritesDone multiple times
gpr_log(GPR_ERROR, "API misuse of type %s observed",
g_core_codegen_interface->grpc_call_error_to_string(err));
grpc_call_error_to_string(err));
GPR_CODEGEN_ASSERT(false);
}
}
@ -986,9 +987,9 @@ class CallOpSet : public CallOpSetInterface,
done_intercepting_ = true;
// The following call_start_batch is internally-generated so no need for an
// explanatory log on failure.
GPR_CODEGEN_ASSERT(g_core_codegen_interface->grpc_call_start_batch(
call_.call(), nullptr, 0, core_cq_tag(), nullptr) ==
GRPC_CALL_OK);
GPR_CODEGEN_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0,
core_cq_tag(),
nullptr) == GRPC_CALL_OK);
}
private:

@ -33,39 +33,6 @@ namespace grpc {
/// Implementation of the core codegen interface.
class CoreCodegen final : public CoreCodegenInterface {
private:
grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* tag,
void* reserved) override;
grpc_call_error grpc_call_cancel_with_status(grpc_call* call,
grpc_status_code status,
const char* description,
void* reserved) override;
int grpc_call_failed_before_recv_message(const grpc_call* c) override;
void grpc_call_ref(grpc_call* call) override;
void grpc_call_unref(grpc_call* call) override;
void* grpc_call_arena_alloc(grpc_call* call, size_t length) override;
const char* grpc_call_error_to_string(grpc_call_error error) override;
grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) override;
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;
size_t grpc_byte_buffer_length(grpc_byte_buffer* bb) override;
int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) override;
void grpc_byte_buffer_reader_destroy(
grpc_byte_buffer_reader* reader) override;
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
grpc_slice* slice) override;
int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
grpc_slice** slice) override;
grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) override;
grpc_slice grpc_slice_new_with_user_data(void* p, size_t len,
void (*destroy)(void*),
void* user_data) override;
grpc_slice grpc_slice_new_with_len(void* p, size_t len,
void (*destroy)(void*, size_t)) override;
grpc_slice grpc_empty_slice() override;
grpc_slice grpc_slice_malloc(size_t length) override;
void grpc_slice_unref(grpc_slice slice) override;

@ -45,41 +45,6 @@ class CoreCodegenInterface {
virtual void assert_fail(const char* failed_assertion, const char* file,
int line) = 0;
virtual grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) = 0;
virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0;
virtual size_t grpc_byte_buffer_length(grpc_byte_buffer* bb)
GRPC_MUST_USE_RESULT = 0;
virtual int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer)
GRPC_MUST_USE_RESULT = 0;
virtual void grpc_byte_buffer_reader_destroy(
grpc_byte_buffer_reader* reader) = 0;
virtual int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
grpc_slice* slice) = 0;
virtual int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
grpc_slice** slice) = 0;
virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) = 0;
virtual grpc_slice grpc_slice_new_with_user_data(void* p, size_t len,
void (*destroy)(void*),
void* user_data) = 0;
virtual grpc_slice grpc_slice_new_with_len(void* p, size_t len,
void (*destroy)(void*,
size_t)) = 0;
virtual grpc_call_error grpc_call_start_batch(grpc_call* call,
const grpc_op* ops, size_t nops,
void* tag, void* reserved) = 0;
virtual grpc_call_error grpc_call_cancel_with_status(grpc_call* call,
grpc_status_code status,
const char* description,
void* reserved) = 0;
virtual int grpc_call_failed_before_recv_message(const grpc_call* c) = 0;
virtual void grpc_call_ref(grpc_call* call) = 0;
virtual void grpc_call_unref(grpc_call* call) = 0;
virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) = 0;
virtual const char* grpc_call_error_to_string(grpc_call_error error) = 0;
virtual grpc_slice grpc_empty_slice() = 0;
virtual grpc_slice grpc_slice_malloc(size_t length) = 0;
virtual void grpc_slice_unref(grpc_slice slice) = 0;

@ -18,6 +18,7 @@
#ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
#define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
#include <grpc/grpc.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/message_allocator.h>
@ -43,13 +44,13 @@ class CallbackUnaryHandler : public grpc::internal::MethodHandler {
void RunHandler(const HandlerParameter& param) final {
// Arena allocate a controller structure (that includes request/response)
grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
grpc_call_ref(param.call->call());
auto* allocator_state =
static_cast<MessageHolder<RequestType, ResponseType>*>(
param.internal_data);
auto* call = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
param.call->call(), sizeof(ServerCallbackUnaryImpl)))
auto* call = new (grpc_call_arena_alloc(param.call->call(),
sizeof(ServerCallbackUnaryImpl)))
ServerCallbackUnaryImpl(
static_cast<grpc::CallbackServerContext*>(param.server_context),
param.call, allocator_state, param.call_requester);
@ -66,8 +67,8 @@ class CallbackUnaryHandler : public grpc::internal::MethodHandler {
if (reactor == nullptr) {
// if deserialization or reactor creator failed, we need to fail the call
reactor = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
param.call->call(), sizeof(UnimplementedUnaryReactor)))
reactor = new (grpc_call_arena_alloc(param.call->call(),
sizeof(UnimplementedUnaryReactor)))
UnimplementedUnaryReactor(
grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
}
@ -85,10 +86,9 @@ class CallbackUnaryHandler : public grpc::internal::MethodHandler {
if (allocator_ != nullptr) {
allocator_state = allocator_->AllocateMessages();
} else {
allocator_state =
new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
DefaultMessageHolder<RequestType, ResponseType>();
allocator_state = new (grpc_call_arena_alloc(
call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
DefaultMessageHolder<RequestType, ResponseType>();
}
*handler_data = allocator_state;
request = allocator_state->request();
@ -208,7 +208,7 @@ class CallbackUnaryHandler : public grpc::internal::MethodHandler {
ctx_->context_allocator()->Release(ctx_);
}
this->~ServerCallbackUnaryImpl(); // explicitly call destructor
grpc::g_core_codegen_interface->grpc_call_unref(call);
grpc_call_unref(call);
call_requester();
}
@ -256,10 +256,10 @@ class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
: get_reactor_(std::move(get_reactor)) {}
void RunHandler(const HandlerParameter& param) final {
// Arena allocate a reader structure (that includes response)
grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
grpc_call_ref(param.call->call());
auto* reader = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
param.call->call(), sizeof(ServerCallbackReaderImpl)))
auto* reader = new (grpc_call_arena_alloc(param.call->call(),
sizeof(ServerCallbackReaderImpl)))
ServerCallbackReaderImpl(
static_cast<grpc::CallbackServerContext*>(param.server_context),
param.call, param.call_requester);
@ -282,7 +282,7 @@ class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
if (reactor == nullptr) {
// if deserialization or reactor creator failed, we need to fail the call
reactor = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
reactor = new (grpc_call_arena_alloc(
param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
UnimplementedReadReactor<RequestType>(
grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
@ -405,7 +405,7 @@ class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
ctx_->context_allocator()->Release(ctx_);
}
this->~ServerCallbackReaderImpl(); // explicitly call destructor
grpc::g_core_codegen_interface->grpc_call_unref(call);
grpc_call_unref(call);
call_requester();
}
@ -447,10 +447,10 @@ class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
: get_reactor_(std::move(get_reactor)) {}
void RunHandler(const HandlerParameter& param) final {
// Arena allocate a writer structure
grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
grpc_call_ref(param.call->call());
auto* writer = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
param.call->call(), sizeof(ServerCallbackWriterImpl)))
auto* writer = new (grpc_call_arena_alloc(param.call->call(),
sizeof(ServerCallbackWriterImpl)))
ServerCallbackWriterImpl(
static_cast<grpc::CallbackServerContext*>(param.server_context),
param.call, static_cast<RequestType*>(param.request),
@ -473,7 +473,7 @@ class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
}
if (reactor == nullptr) {
// if deserialization or reactor creator failed, we need to fail the call
reactor = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
reactor = new (grpc_call_arena_alloc(
param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
UnimplementedWriteReactor<ResponseType>(
grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
@ -486,8 +486,8 @@ class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
grpc::Status* status, void** /*handler_data*/) final {
grpc::ByteBuffer buf;
buf.set_buffer(req);
auto* request = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(RequestType))) RequestType();
auto* request =
new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
*status =
grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
buf.Release();
@ -630,7 +630,7 @@ class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
ctx_->context_allocator()->Release(ctx_);
}
this->~ServerCallbackWriterImpl(); // explicitly call destructor
grpc::g_core_codegen_interface->grpc_call_unref(call);
grpc_call_unref(call);
call_requester();
}
@ -672,9 +672,9 @@ class CallbackBidiHandler : public grpc::internal::MethodHandler {
get_reactor)
: get_reactor_(std::move(get_reactor)) {}
void RunHandler(const HandlerParameter& param) final {
grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
grpc_call_ref(param.call->call());
auto* stream = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
auto* stream = new (grpc_call_arena_alloc(
param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
ServerCallbackReaderWriterImpl(
static_cast<grpc::CallbackServerContext*>(param.server_context),
@ -697,7 +697,7 @@ class CallbackBidiHandler : public grpc::internal::MethodHandler {
if (reactor == nullptr) {
// if deserialization or reactor creator failed, we need to fail the call
reactor = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
reactor = new (grpc_call_arena_alloc(
param.call->call(),
sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
UnimplementedBidiReactor<RequestType, ResponseType>(
@ -846,7 +846,7 @@ class CallbackBidiHandler : public grpc::internal::MethodHandler {
ctx_->context_allocator()->Release(ctx_);
}
this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
grpc::g_core_codegen_interface->grpc_call_unref(call);
grpc_call_unref(call);
call_requester();
}

@ -28,6 +28,7 @@
#include <type_traits>
#include <vector>
#include <grpc/grpc.h>
#include <grpc/impl/compression_types.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
@ -441,8 +442,7 @@ class ServerContextBase {
}
void MaybeMarkCancelledOnRead() {
if (g_core_codegen_interface->grpc_call_failed_before_recv_message(
call_.call)) {
if (grpc_call_failed_before_recv_message(call_.call)) {
marked_cancelled_.store(true, std::memory_order_release);
}
}

@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_hook.h>
@ -272,9 +273,9 @@ class ServerInterface : public internal::CallHook {
// a new instance of ourselves to request another call. We then
// return false, which prevents the call from being returned to
// the application.
g_core_codegen_interface->grpc_call_cancel_with_status(
call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr);
g_core_codegen_interface->grpc_call_unref(call_);
grpc_call_cancel_with_status(call_, GRPC_STATUS_INTERNAL,
"Unable to parse request", nullptr);
grpc_call_unref(call_);
new PayloadAsyncRequest(registered_method_, server_, context_,
stream_, call_cq_, notification_cq_, tag_,
request_);

@ -185,8 +185,8 @@ class ClientAsyncReaderFactory {
grpc::ClientContext* context,
const W& request, bool start, void* tag) {
grpc::internal::Call call = channel->CreateCall(method, context, cq);
return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientAsyncReader<R>)))
return new (
grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncReader<R>)))
ClientAsyncReader<R>(call, context, request, start, tag);
}
};
@ -318,7 +318,7 @@ class ClientAsyncWriterFactory {
/// Create a stream object.
/// Start the RPC if \a start is set
/// \a tag will be notified on \a cq when the call has been started (i.e.
/// intitial metadata sent) and \a request has been written out.
/// initial metadata sent) and \a request has been written out.
/// If \a start is not set, \a tag must be nullptr and the actual call
/// must be initiated by StartCall
/// Note that \a context will be used to fill in custom initial metadata
@ -333,8 +333,8 @@ class ClientAsyncWriterFactory {
grpc::ClientContext* context, R* response,
bool start, void* tag) {
grpc::internal::Call call = channel->CreateCall(method, context, cq);
return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientAsyncWriter<W>)))
return new (
grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncWriter<W>)))
ClientAsyncWriter<W>(call, context, response, start, tag);
}
};
@ -488,7 +488,7 @@ class ClientAsyncReaderWriterFactory {
/// Create a stream object.
/// Start the RPC request if \a start is set.
/// \a tag will be notified on \a cq when the call has been started (i.e.
/// intitial metadata sent). If \a start is not set, \a tag must be
/// initial metadata sent). If \a start is not set, \a tag must be
/// nullptr and the actual call must be initiated by StartCall
/// Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call.
@ -498,8 +498,8 @@ class ClientAsyncReaderWriterFactory {
bool start, void* tag) {
grpc::internal::Call call = channel->CreateCall(method, context, cq);
return new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientAsyncReaderWriter<W, R>)))
return new (grpc_call_arena_alloc(call.call(),
sizeof(ClientAsyncReaderWriter<W, R>)))
ClientAsyncReaderWriter<W, R>(call, context, start, tag);
}
};
@ -651,7 +651,7 @@ class ServerAsyncReaderInterface
///
/// It is appropriate to call this method when:
/// * all messages from the client have been received (either known
/// implictly, or explicitly because a previous
/// implicitly, or explicitly because a previous
/// \a AsyncReaderInterface::Read operation with a non-ok result,
/// e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
///
@ -727,7 +727,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> {
/// See the \a ServerAsyncReaderInterface.Read method for semantics
///
/// Side effect:
/// - also sends initial metadata if not alreay sent.
/// - also sends initial metadata if not already sent.
/// - uses the \a ServerContext associated with this call to send possible
/// initial and trailing metadata.
///
@ -758,7 +758,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> {
/// See the \a ServerAsyncReaderInterface.Read method for semantics
///
/// Side effect:
/// - also sends initial metadata if not alreay sent.
/// - also sends initial metadata if not already sent.
/// - uses the \a ServerContext associated with this call to send possible
/// initial and trailing metadata.
///
@ -805,7 +805,7 @@ class ServerAsyncWriterInterface
///
/// It is appropriate to call this method when either:
/// * all messages from the client have been received (either known
/// implictly, or explicitly because a previous \a
/// implicitly, or explicitly because a previous \a
/// AsyncReaderInterface::Read operation with a non-ok
/// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'.
/// * it is desired to end the call early with some non-OK status code.
@ -967,7 +967,7 @@ class ServerAsyncReaderWriterInterface
///
/// It is appropriate to call this method when either:
/// * all messages from the client have been received (either known
/// implictly, or explicitly because a previous \a
/// implicitly, or explicitly because a previous \a
/// AsyncReaderInterface::Read operation
/// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok'
/// with 'false'.

@ -79,7 +79,7 @@ class ClientAsyncResponseReaderHelper {
public:
/// Start a call and write the request out if \a start is set.
/// \a tag will be notified on \a cq when the call has been started (i.e.
/// intitial metadata sent) and \a request has been written out.
/// initial metadata sent) and \a request has been written out.
/// If \a start is not set, the actual call must be initiated by StartCall
/// Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call.
@ -95,10 +95,9 @@ class ClientAsyncResponseReaderHelper {
const grpc::internal::RpcMethod& method, grpc::ClientContext* context,
const W& request) /* __attribute__((noinline)) */ {
grpc::internal::Call call = channel->CreateCall(method, context, cq);
ClientAsyncResponseReader<R>* result =
new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientAsyncResponseReader<R>)))
ClientAsyncResponseReader<R>(call, context);
ClientAsyncResponseReader<R>* result = new (grpc_call_arena_alloc(
call.call(), sizeof(ClientAsyncResponseReader<R>)))
ClientAsyncResponseReader<R>(call, context);
SetupRequest<BaseR, BaseW>(
call.call(), &result->single_buf_, &result->read_initial_metadata_,
&result->finish_, static_cast<const BaseW&>(request));
@ -128,8 +127,7 @@ class ClientAsyncResponseReaderHelper {
grpc::internal::CallOpRecvMessage<R>,
grpc::internal::CallOpClientRecvStatus>;
SingleBufType* single_buf =
new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(SingleBufType))) SingleBufType;
new (grpc_call_arena_alloc(call, sizeof(SingleBufType))) SingleBufType;
*single_buf_ptr = single_buf;
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(single_buf->SendMessage(request).ok());
@ -154,7 +152,7 @@ class ClientAsyncResponseReaderHelper {
// will be static-cast'ed back to the class specified here by hiding that
// class information inside the function definition. Note that this feature
// expects the class being specified here for R to be a base-class of the
// "real" R without any multiple-inheritance (as applies in protbuf wrt
// "real" R without any multiple-inheritance (as applies in protobuf wrt
// MessageLite)
*finish = [](ClientContext* context, internal::Call* call,
bool initial_metadata_read,
@ -166,8 +164,8 @@ class ClientAsyncResponseReaderHelper {
grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>,
grpc::internal::CallOpClientRecvStatus>;
FinishBufType* finish_buf =
new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call->call(), sizeof(FinishBufType))) FinishBufType;
new (grpc_call_arena_alloc(call->call(), sizeof(FinishBufType)))
FinishBufType;
*finish_buf_ptr = finish_buf;
finish_buf->set_output_tag(tag);
finish_buf->RecvMessage(static_cast<R*>(msg));

@ -85,11 +85,11 @@ class ByteBuffer final {
// than its advertised side effect of increasing the reference count of the
// slices it processes, and such an increase does not affect the semantics
// seen by the caller of this constructor.
buffer_ = g_core_codegen_interface->grpc_raw_byte_buffer_create(
buffer_ = grpc_raw_byte_buffer_create(
reinterpret_cast<grpc_slice*>(const_cast<Slice*>(slices)), nslices);
}
/// Constuct a byte buffer by referencing elements of existing buffer
/// Construct a byte buffer by referencing elements of existing buffer
/// \a buf. Wrapper of core function grpc_byte_buffer_copy . This is not
/// a deep copy; it is just a referencing. As a result, its performance is
/// size-independent.
@ -97,7 +97,7 @@ class ByteBuffer final {
~ByteBuffer() {
if (buffer_) {
g_core_codegen_interface->grpc_byte_buffer_destroy(buffer_);
grpc_byte_buffer_destroy(buffer_);
}
}
@ -110,7 +110,7 @@ class ByteBuffer final {
}
if (buf.buffer_) {
// then copy
buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buf.buffer_);
buffer_ = grpc_byte_buffer_copy(buf.buffer_);
}
return *this;
}
@ -128,7 +128,7 @@ class ByteBuffer final {
/// Remove all data.
void Clear() {
if (buffer_) {
g_core_codegen_interface->grpc_byte_buffer_destroy(buffer_);
grpc_byte_buffer_destroy(buffer_);
buffer_ = nullptr;
}
}
@ -138,9 +138,7 @@ class ByteBuffer final {
/// bbuf.Duplicate(); is equivalent to bbuf=bbuf; but is actually readable.
/// This is not a deep copy; it is a referencing and its performance
/// is size-independent.
void Duplicate() {
buffer_ = g_core_codegen_interface->grpc_byte_buffer_copy(buffer_);
}
void Duplicate() { buffer_ = grpc_byte_buffer_copy(buffer_); }
/// Forget underlying byte buffer without destroying
/// Use this only for un-owned byte buffers
@ -148,9 +146,7 @@ class ByteBuffer final {
/// Buffer size in bytes.
size_t Length() const {
return buffer_ == nullptr
? 0
: g_core_codegen_interface->grpc_byte_buffer_length(buffer_);
return buffer_ == nullptr ? 0 : grpc_byte_buffer_length(buffer_);
}
/// Swap the state of *this and *other.

@ -21,6 +21,7 @@
#include <functional>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/codegen/channel_interface.h>
@ -83,7 +84,7 @@ class CallbackWithStatusTag : public grpc_completion_queue_functor {
CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f,
CompletionQueueTag* ops)
: call_(call), func_(std::move(f)), ops_(ops) {
g_core_codegen_interface->grpc_call_ref(call);
grpc_call_ref(call);
functor_run = &CallbackWithStatusTag::StaticRun;
// A client-side callback should never be run inline since they will always
// have work to do from the user application. So, set the parent's
@ -125,7 +126,7 @@ class CallbackWithStatusTag : public grpc_completion_queue_functor {
func_ = nullptr; // reset to clear this out for sure
status_ = Status(); // reset to clear this out for sure
CatchingCallback(std::move(func), std::move(status));
g_core_codegen_interface->grpc_call_unref(call_);
grpc_call_unref(call_);
}
};
@ -162,7 +163,7 @@ class CallbackWithSuccessTag : public grpc_completion_queue_functor {
void Set(grpc_call* call, std::function<void(bool)> f,
CompletionQueueTag* ops, bool can_inline) {
GPR_CODEGEN_ASSERT(call_ == nullptr);
g_core_codegen_interface->grpc_call_ref(call);
grpc_call_ref(call);
call_ = call;
func_ = std::move(f);
ops_ = ops;
@ -175,7 +176,7 @@ class CallbackWithSuccessTag : public grpc_completion_queue_functor {
grpc_call* call = call_;
call_ = nullptr;
func_ = nullptr;
g_core_codegen_interface->grpc_call_unref(call);
grpc_call_unref(call);
}
}

@ -22,6 +22,7 @@
#include <atomic>
#include <functional>
#include <grpc/grpc.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
@ -84,9 +85,8 @@ class CallbackUnaryCallImpl {
grpc::internal::CallbackWithStatusTag tag;
};
const size_t alloc_sz = sizeof(OpSetAndTag);
auto* const alloced = static_cast<OpSetAndTag*>(
grpc::g_core_codegen_interface->grpc_call_arena_alloc(call.call(),
alloc_sz));
auto* const alloced =
static_cast<OpSetAndTag*>(grpc_call_arena_alloc(call.call(), alloc_sz));
auto* ops = new (&alloced->opset) FullCallOpSet;
auto* tag = new (&alloced->tag)
grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
@ -649,7 +649,7 @@ class ClientCallbackReaderWriterImpl
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackReaderWriterImpl();
grpc::g_core_codegen_interface->grpc_call_unref(call);
grpc_call_unref(call);
if (GPR_LIKELY(from_reaction)) {
reactor->OnDone(s);
} else {
@ -712,8 +712,8 @@ class ClientCallbackReaderWriterFactory {
grpc::internal::Call call =
channel->CreateCall(method, context, channel->CallbackCQ());
grpc::g_core_codegen_interface->grpc_call_ref(call.call());
new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
grpc_call_ref(call.call());
new (grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
ClientCallbackReaderWriterImpl<Request, Response>(call, context,
reactor);
@ -822,7 +822,7 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackReaderImpl();
grpc::g_core_codegen_interface->grpc_call_unref(call);
grpc_call_unref(call);
if (GPR_LIKELY(from_reaction)) {
reactor->OnDone(s);
} else {
@ -872,9 +872,9 @@ class ClientCallbackReaderFactory {
grpc::internal::Call call =
channel->CreateCall(method, context, channel->CallbackCQ());
grpc::g_core_codegen_interface->grpc_call_ref(call.call());
new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
grpc_call_ref(call.call());
new (grpc_call_arena_alloc(call.call(),
sizeof(ClientCallbackReaderImpl<Response>)))
ClientCallbackReaderImpl<Response>(call, context, request, reactor);
}
};
@ -1040,7 +1040,7 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackWriterImpl();
grpc::g_core_codegen_interface->grpc_call_unref(call);
grpc_call_unref(call);
if (GPR_LIKELY(from_reaction)) {
reactor->OnDone(s);
} else {
@ -1101,9 +1101,9 @@ class ClientCallbackWriterFactory {
grpc::internal::Call call =
channel->CreateCall(method, context, channel->CallbackCQ());
grpc::g_core_codegen_interface->grpc_call_ref(call.call());
new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
grpc_call_ref(call.call());
new (grpc_call_arena_alloc(call.call(),
sizeof(ClientCallbackWriterImpl<Request>)))
ClientCallbackWriterImpl<Request>(call, context, response, reactor);
}
};
@ -1175,7 +1175,7 @@ class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackUnaryImpl();
grpc::g_core_codegen_interface->grpc_call_unref(call);
grpc_call_unref(call);
reactor->OnDone(s);
}
}
@ -1212,10 +1212,9 @@ class ClientCallbackUnaryFactory {
grpc::internal::Call call =
channel->CreateCall(method, context, channel->CallbackCQ());
grpc::g_core_codegen_interface->grpc_call_ref(call.call());
grpc_call_ref(call.call());
new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call.call(), sizeof(ClientCallbackUnaryImpl)))
new (grpc_call_arena_alloc(call.call(), sizeof(ClientCallbackUnaryImpl)))
ClientCallbackUnaryImpl(call, context,
static_cast<const BaseRequest*>(request),
static_cast<BaseResponse*>(response), reactor);

@ -19,6 +19,7 @@
#ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H
#define GRPCPP_SUPPORT_METHOD_HANDLER_H
#include <grpc/byte_buffer.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/support/byte_buffer.h>
@ -120,8 +121,8 @@ class RpcMethodHandler : public grpc::internal::MethodHandler {
void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
grpc::Status* status, void** /*handler_data*/) final {
auto* request = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(RequestType))) RequestType;
auto* request =
new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType;
return UnaryDeserializeHelper(req, status,
static_cast<BaseRequestType*>(request));
}
@ -229,8 +230,8 @@ class ServerStreamingHandler : public grpc::internal::MethodHandler {
grpc::Status* status, void** /*handler_data*/) final {
grpc::ByteBuffer buf;
buf.set_buffer(req);
auto* request = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(RequestType))) RequestType();
auto* request =
new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
*status =
grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
buf.Release();
@ -384,7 +385,7 @@ class ErrorMethodHandler : public grpc::internal::MethodHandler {
grpc::Status* /*status*/, void** /*handler_data*/) final {
// We have to destroy any request payload
if (req != nullptr) {
grpc::g_core_codegen_interface->grpc_byte_buffer_destroy(req);
grpc_byte_buffer_destroy(req);
}
return nullptr;
}

@ -21,6 +21,7 @@
#include <type_traits>
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/slice.h>
@ -52,8 +53,7 @@ class ProtoBufferReader : public grpc::protobuf::io::ZeroCopyInputStream {
/// Implemented through a grpc_byte_buffer_reader which iterates
/// over the slices that make up a byte buffer
if (!buffer->Valid() ||
!g_core_codegen_interface->grpc_byte_buffer_reader_init(
&reader_, buffer->c_buffer())) {
!grpc_byte_buffer_reader_init(&reader_, buffer->c_buffer())) {
status_ = Status(StatusCode::INTERNAL,
"Couldn't initialize byte buffer reader");
}
@ -61,7 +61,7 @@ class ProtoBufferReader : public grpc::protobuf::io::ZeroCopyInputStream {
~ProtoBufferReader() override {
if (status_.ok()) {
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_);
grpc_byte_buffer_reader_destroy(&reader_);
}
}
@ -81,8 +81,7 @@ class ProtoBufferReader : public grpc::protobuf::io::ZeroCopyInputStream {
return true;
}
/// Otherwise get the next slice from the byte buffer reader
if (!g_core_codegen_interface->grpc_byte_buffer_reader_peek(&reader_,
&slice_)) {
if (!grpc_byte_buffer_reader_peek(&reader_, &slice_)) {
return false;
}
*data = GRPC_SLICE_START_PTR(*slice_);

@ -21,6 +21,7 @@
#include <type_traits>
#include <grpc/byte_buffer.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/slice.h>
#include <grpcpp/impl/codegen/config_protobuf.h>
@ -64,8 +65,7 @@ class ProtoBufferWriter : public grpc::protobuf::io::ZeroCopyOutputStream {
have_backup_(false) {
GPR_CODEGEN_ASSERT(!byte_buffer->Valid());
/// Create an empty raw byte buffer and look at its underlying slice buffer
grpc_byte_buffer* bp =
g_core_codegen_interface->grpc_raw_byte_buffer_create(nullptr, 0);
grpc_byte_buffer* bp = grpc_raw_byte_buffer_create(nullptr, 0);
byte_buffer->set_buffer(bp);
slice_buffer_ = &bp->data.raw.slice_buffer;
}

@ -90,8 +90,7 @@ class Slice final {
/// different (e.g., if data is part of a larger structure that must be
/// destroyed when the data is no longer needed)
Slice(void* buf, size_t len, void (*destroy)(void*), void* user_data)
: slice_(g_core_codegen_interface->grpc_slice_new_with_user_data(
buf, len, destroy, user_data)) {}
: slice_(grpc_slice_new_with_user_data(buf, len, destroy, user_data)) {}
/// Specialization of above for common case where buf == user_data
Slice(void* buf, size_t len, void (*destroy)(void*))
@ -99,8 +98,7 @@ class Slice final {
/// Similar to the above but has a destroy that also takes slice length
Slice(void* buf, size_t len, void (*destroy)(void*, size_t))
: slice_(g_core_codegen_interface->grpc_slice_new_with_len(buf, len,
destroy)) {}
: slice_(grpc_slice_new_with_len(buf, len, destroy)) {}
/// Byte size.
size_t size() const { return GRPC_SLICE_LENGTH(slice_); }

@ -36,80 +36,6 @@
namespace grpc {
grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer* bb) {
return ::grpc_byte_buffer_copy(bb);
}
void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
::grpc_byte_buffer_destroy(bb);
}
size_t CoreCodegen::grpc_byte_buffer_length(grpc_byte_buffer* bb) {
return ::grpc_byte_buffer_length(bb);
}
grpc_call_error CoreCodegen::grpc_call_start_batch(grpc_call* call,
const grpc_op* ops,
size_t nops, void* tag,
void* reserved) {
return ::grpc_call_start_batch(call, ops, nops, tag, reserved);
}
grpc_call_error CoreCodegen::grpc_call_cancel_with_status(
grpc_call* call, grpc_status_code status, const char* description,
void* reserved) {
return ::grpc_call_cancel_with_status(call, status, description, reserved);
}
int CoreCodegen::grpc_call_failed_before_recv_message(const grpc_call* c) {
return ::grpc_call_failed_before_recv_message(c);
}
void CoreCodegen::grpc_call_ref(grpc_call* call) { ::grpc_call_ref(call); }
void CoreCodegen::grpc_call_unref(grpc_call* call) { ::grpc_call_unref(call); }
void* CoreCodegen::grpc_call_arena_alloc(grpc_call* call, size_t length) {
return ::grpc_call_arena_alloc(call, length);
}
const char* CoreCodegen::grpc_call_error_to_string(grpc_call_error error) {
return ::grpc_call_error_to_string(error);
}
int CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) {
return ::grpc_byte_buffer_reader_init(reader, buffer);
}
void CoreCodegen::grpc_byte_buffer_reader_destroy(
grpc_byte_buffer_reader* reader) {
::grpc_byte_buffer_reader_destroy(reader);
}
int CoreCodegen::grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
grpc_slice* slice) {
return ::grpc_byte_buffer_reader_next(reader, slice);
}
int CoreCodegen::grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,
grpc_slice** slice) {
return ::grpc_byte_buffer_reader_peek(reader, slice);
}
grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) {
return ::grpc_raw_byte_buffer_create(slice, nslices);
}
grpc_slice CoreCodegen::grpc_slice_new_with_user_data(void* p, size_t len,
void (*destroy)(void*),
void* user_data) {
return ::grpc_slice_new_with_user_data(p, len, destroy, user_data);
}
grpc_slice CoreCodegen::grpc_slice_new_with_len(void* p, size_t len,
void (*destroy)(void*,
size_t)) {
return ::grpc_slice_new_with_len(p, len, destroy);
}
grpc_slice CoreCodegen::grpc_empty_slice() { return ::grpc_empty_slice(); }
grpc_slice CoreCodegen::grpc_slice_malloc(size_t length) {

@ -22,6 +22,7 @@
#include <benchmark/benchmark.h>
#include <grpc/byte_buffer.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/support/byte_buffer.h>
@ -58,24 +59,20 @@ static void BM_ByteBufferReader_Next(benchmark::State& state) {
slices.emplace_back(g_core_codegen_interface->grpc_slice_from_copied_buffer(
buf.get(), kSliceSize));
}
grpc_byte_buffer* bb = g_core_codegen_interface->grpc_raw_byte_buffer_create(
slices.data(), num_slices);
grpc_byte_buffer* bb = grpc_raw_byte_buffer_create(slices.data(), num_slices);
grpc_byte_buffer_reader reader;
GPR_ASSERT(
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb));
for (auto _ : state) {
grpc_slice* slice;
if (GPR_UNLIKELY(!g_core_codegen_interface->grpc_byte_buffer_reader_peek(
&reader, &slice))) {
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
GPR_ASSERT(
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
if (GPR_UNLIKELY(!grpc_byte_buffer_reader_peek(&reader, &slice))) {
grpc_byte_buffer_reader_destroy(&reader);
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb));
continue;
}
}
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
g_core_codegen_interface->grpc_byte_buffer_destroy(bb);
grpc_byte_buffer_reader_destroy(&reader);
grpc_byte_buffer_destroy(bb);
for (auto& slice : slices) {
g_core_codegen_interface->grpc_slice_unref(slice);
}
@ -91,24 +88,20 @@ static void BM_ByteBufferReader_Peek(benchmark::State& state) {
slices.emplace_back(g_core_codegen_interface->grpc_slice_from_copied_buffer(
buf.get(), kSliceSize));
}
grpc_byte_buffer* bb = g_core_codegen_interface->grpc_raw_byte_buffer_create(
slices.data(), num_slices);
grpc_byte_buffer* bb = grpc_raw_byte_buffer_create(slices.data(), num_slices);
grpc_byte_buffer_reader reader;
GPR_ASSERT(
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb));
for (auto _ : state) {
grpc_slice* slice;
if (GPR_UNLIKELY(!g_core_codegen_interface->grpc_byte_buffer_reader_peek(
&reader, &slice))) {
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
GPR_ASSERT(
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader, bb));
if (GPR_UNLIKELY(!grpc_byte_buffer_reader_peek(&reader, &slice))) {
grpc_byte_buffer_reader_destroy(&reader);
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb));
continue;
}
}
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader);
g_core_codegen_interface->grpc_byte_buffer_destroy(bb);
grpc_byte_buffer_reader_destroy(&reader);
grpc_byte_buffer_destroy(bb);
for (auto& slice : slices) {
g_core_codegen_interface->grpc_slice_unref(slice);
}

Loading…
Cancel
Save