Remove `include/grpcpp/impl/codegen/method_handler.h` (#31249)

pull/31268/head
Cheng-Yu Chung 2 years ago committed by GitHub
parent e00c942931
commit d76c6f61f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 385
      include/grpcpp/impl/codegen/method_handler.h
  2. 2
      include/grpcpp/impl/method_handler_impl.h
  3. 382
      include/grpcpp/support/method_handler.h
  4. 4
      src/compiler/cpp_generator.cc
  5. 2
      test/cpp/codegen/compiler_test_golden

@ -19,388 +19,9 @@
#ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
#define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
// IWYU pragma: private, include <grpcpp/support/method_handler.h>
// IWYU pragma: private
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/rpc_service_method.h>
#include <grpcpp/impl/codegen/sync_stream.h>
#include <grpcpp/support/byte_buffer.h>
namespace grpc {
namespace internal {
// Invoke the method handler, fill in the status, and
// return whether or not we finished safely (without an exception).
// Note that exception handling is 0-cost in most compiler/library
// implementations (except when an exception is actually thrown),
// so this process doesn't require additional overhead in the common case.
// Additionally, we don't need to return if we caught an exception or not;
// the handling is the same in either case.
template <class Callable>
::grpc::Status CatchingFunctionHandler(Callable&& handler) {
#if GRPC_ALLOW_EXCEPTIONS
try {
return handler();
} catch (...) {
return grpc::Status(grpc::StatusCode::UNKNOWN,
"Unexpected error in RPC handling");
}
#else // GRPC_ALLOW_EXCEPTIONS
return handler();
#endif // GRPC_ALLOW_EXCEPTIONS
}
/// A helper function with reduced templating to do the common work needed to
/// actually send the server response. Uses non-const parameter for Status since
/// this should only ever be called from the end of the RunHandler method.
template <class ResponseType>
void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param,
ResponseType* rsp, grpc::Status& status) {
GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpSendMessage,
grpc::internal::CallOpServerSendStatus>
ops;
ops.SendInitialMetadata(&param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
if (status.ok()) {
status = ops.SendMessagePtr(rsp);
}
ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
/// A helper function with reduced templating to do deserializing.
template <class RequestType>
void* UnaryDeserializeHelper(grpc_byte_buffer* req, grpc::Status* status,
RequestType* request) {
grpc::ByteBuffer buf;
buf.set_buffer(req);
*status = grpc::SerializationTraits<RequestType>::Deserialize(
&buf, static_cast<RequestType*>(request));
buf.Release();
if (status->ok()) {
return request;
}
request->~RequestType();
return nullptr;
}
/// A wrapper class of an application provided rpc method handler.
template <class ServiceType, class RequestType, class ResponseType,
class BaseRequestType = RequestType,
class BaseResponseType = ResponseType>
class RpcMethodHandler : public grpc::internal::MethodHandler {
public:
RpcMethodHandler(
std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
const RequestType*, ResponseType*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
ResponseType rsp;
grpc::Status status = param.status;
if (status.ok()) {
status = CatchingFunctionHandler([this, &param, &rsp] {
return func_(service_,
static_cast<grpc::ServerContext*>(param.server_context),
static_cast<RequestType*>(param.request), &rsp);
});
static_cast<RequestType*>(param.request)->~RequestType();
}
UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
}
void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
grpc::Status* status, void** /*handler_data*/) final {
auto* request = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(RequestType))) RequestType;
return UnaryDeserializeHelper(req, status,
static_cast<BaseRequestType*>(request));
}
private:
/// Application provided rpc handler function.
std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
const RequestType*, ResponseType*)>
func_;
// The class the above handler function lives in.
ServiceType* service_;
};
/// A wrapper class of an application provided client streaming handler.
template <class ServiceType, class RequestType, class ResponseType>
class ClientStreamingHandler : public grpc::internal::MethodHandler {
public:
ClientStreamingHandler(
std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
ServerReader<RequestType>*, ResponseType*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
ServerReader<RequestType> reader(
param.call, static_cast<grpc::ServerContext*>(param.server_context));
ResponseType rsp;
grpc::Status status =
CatchingFunctionHandler([this, &param, &reader, &rsp] {
return func_(service_,
static_cast<grpc::ServerContext*>(param.server_context),
&reader, &rsp);
});
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpSendMessage,
grpc::internal::CallOpServerSendStatus>
ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(&param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
}
if (status.ok()) {
status = ops.SendMessagePtr(&rsp);
}
ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
ServerReader<RequestType>*, ResponseType*)>
func_;
ServiceType* service_;
};
/// A wrapper class of an application provided server streaming handler.
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler : public grpc::internal::MethodHandler {
public:
ServerStreamingHandler(std::function<grpc::Status(
ServiceType*, grpc::ServerContext*,
const RequestType*, ServerWriter<ResponseType>*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
grpc::Status status = param.status;
if (status.ok()) {
ServerWriter<ResponseType> writer(
param.call, static_cast<grpc::ServerContext*>(param.server_context));
status = CatchingFunctionHandler([this, &param, &writer] {
return func_(service_,
static_cast<grpc::ServerContext*>(param.server_context),
static_cast<RequestType*>(param.request), &writer);
});
static_cast<RequestType*>(param.request)->~RequestType();
}
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus>
ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(&param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
}
ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
if (param.server_context->has_pending_ops_) {
param.call->cq()->Pluck(&param.server_context->pending_ops_);
}
param.call->cq()->Pluck(&ops);
}
void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
grpc::Status* status, void** /*handler_data*/) final {
grpc::ByteBuffer buf;
buf.set_buffer(req);
auto* request = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(RequestType))) RequestType();
*status =
grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
buf.Release();
if (status->ok()) {
return request;
}
request->~RequestType();
return nullptr;
}
private:
std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
const RequestType*, ServerWriter<ResponseType>*)>
func_;
ServiceType* service_;
};
/// A wrapper class of an application provided bidi-streaming handler.
/// This also applies to server-streamed implementation of a unary method
/// with the additional requirement that such methods must have done a
/// write for status to be ok
/// Since this is used by more than 1 class, the service is not passed in.
/// Instead, it is expected to be an implicitly-captured argument of func
/// (through bind or something along those lines)
template <class Streamer, bool WriteNeeded>
class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler {
public:
explicit TemplatedBidiStreamingHandler(
std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func)
: func_(func), write_needed_(WriteNeeded) {}
void RunHandler(const HandlerParameter& param) final {
Streamer stream(param.call,
static_cast<grpc::ServerContext*>(param.server_context));
grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
return func_(static_cast<grpc::ServerContext*>(param.server_context),
&stream);
});
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus>
ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(&param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
if (write_needed_ && status.ok()) {
// If we needed a write but never did one, we need to mark the
// status as a fail
status = grpc::Status(grpc::StatusCode::INTERNAL,
"Service did not provide response message");
}
}
ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
if (param.server_context->has_pending_ops_) {
param.call->cq()->Pluck(&param.server_context->pending_ops_);
}
param.call->cq()->Pluck(&ops);
}
private:
std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
const bool write_needed_;
};
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler
: public TemplatedBidiStreamingHandler<
ServerReaderWriter<ResponseType, RequestType>, false> {
public:
BidiStreamingHandler(std::function<grpc::Status(
ServiceType*, grpc::ServerContext*,
ServerReaderWriter<ResponseType, RequestType>*)>
func,
ServiceType* service)
// TODO(vjpai): When gRPC supports C++14, move-capture func in the below
: TemplatedBidiStreamingHandler<
ServerReaderWriter<ResponseType, RequestType>, false>(
[func, service](
grpc::ServerContext* ctx,
ServerReaderWriter<ResponseType, RequestType>* streamer) {
return func(service, ctx, streamer);
}) {}
};
template <class RequestType, class ResponseType>
class StreamedUnaryHandler
: public TemplatedBidiStreamingHandler<
ServerUnaryStreamer<RequestType, ResponseType>, true> {
public:
explicit StreamedUnaryHandler(
std::function<
grpc::Status(grpc::ServerContext*,
ServerUnaryStreamer<RequestType, ResponseType>*)>
func)
: TemplatedBidiStreamingHandler<
ServerUnaryStreamer<RequestType, ResponseType>, true>(
std::move(func)) {}
};
template <class RequestType, class ResponseType>
class SplitServerStreamingHandler
: public TemplatedBidiStreamingHandler<
ServerSplitStreamer<RequestType, ResponseType>, false> {
public:
explicit SplitServerStreamingHandler(
std::function<
grpc::Status(grpc::ServerContext*,
ServerSplitStreamer<RequestType, ResponseType>*)>
func)
: TemplatedBidiStreamingHandler<
ServerSplitStreamer<RequestType, ResponseType>, false>(
std::move(func)) {}
};
/// General method handler class for errors that prevent real method use
/// e.g., handle unknown method by returning UNIMPLEMENTED error.
template <grpc::StatusCode code>
class ErrorMethodHandler : public grpc::internal::MethodHandler {
public:
explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
template <class T>
static void FillOps(grpc::ServerContextBase* context,
const std::string& message, T* ops) {
grpc::Status status(code, message);
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(&context->initial_metadata_,
context->initial_metadata_flags());
if (context->compression_level_set()) {
ops->set_compression_level(context->compression_level());
}
context->sent_initial_metadata_ = true;
}
ops->ServerSendStatus(&context->trailing_metadata_, status);
}
void RunHandler(const HandlerParameter& param) final {
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus>
ops;
FillOps(param.server_context, message_, &ops);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
grpc::Status* /*status*/, void** /*handler_data*/) final {
// We have to destroy any request payload
if (req != nullptr) {
grpc::g_core_codegen_interface->grpc_byte_buffer_destroy(req);
}
return nullptr;
}
private:
const std::string message_;
};
typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
UnknownMethodHandler;
typedef ErrorMethodHandler<grpc::StatusCode::RESOURCE_EXHAUSTED>
ResourceExhaustedHandler;
} // namespace internal
} // namespace grpc
/// TODO(chengyuc): Remove this file after solving compatibility.
#include <grpcpp/support/method_handler.h>
#endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H

@ -19,6 +19,6 @@
#ifndef GRPCPP_IMPL_METHOD_HANDLER_IMPL_H
#define GRPCPP_IMPL_METHOD_HANDLER_IMPL_H
#include <grpcpp/impl/codegen/method_handler.h>
#include <grpcpp/support/method_handler.h>
#endif // GRPCPP_IMPL_METHOD_HANDLER_IMPL_H

@ -19,6 +19,386 @@
#ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H
#define GRPCPP_SUPPORT_METHOD_HANDLER_H
#include <grpcpp/impl/codegen/method_handler.h> // IWYU pragma: export
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/sync_stream.h>
namespace grpc {
namespace internal {
// Invoke the method handler, fill in the status, and
// return whether or not we finished safely (without an exception).
// Note that exception handling is 0-cost in most compiler/library
// implementations (except when an exception is actually thrown),
// so this process doesn't require additional overhead in the common case.
// Additionally, we don't need to return if we caught an exception or not;
// the handling is the same in either case.
template <class Callable>
::grpc::Status CatchingFunctionHandler(Callable&& handler) {
#if GRPC_ALLOW_EXCEPTIONS
try {
return handler();
} catch (...) {
return grpc::Status(grpc::StatusCode::UNKNOWN,
"Unexpected error in RPC handling");
}
#else // GRPC_ALLOW_EXCEPTIONS
return handler();
#endif // GRPC_ALLOW_EXCEPTIONS
}
/// A helper function with reduced templating to do the common work needed to
/// actually send the server response. Uses non-const parameter for Status since
/// this should only ever be called from the end of the RunHandler method.
template <class ResponseType>
void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param,
ResponseType* rsp, grpc::Status& status) {
GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpSendMessage,
grpc::internal::CallOpServerSendStatus>
ops;
ops.SendInitialMetadata(&param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
if (status.ok()) {
status = ops.SendMessagePtr(rsp);
}
ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
/// A helper function with reduced templating to do deserializing.
template <class RequestType>
void* UnaryDeserializeHelper(grpc_byte_buffer* req, grpc::Status* status,
RequestType* request) {
grpc::ByteBuffer buf;
buf.set_buffer(req);
*status = grpc::SerializationTraits<RequestType>::Deserialize(
&buf, static_cast<RequestType*>(request));
buf.Release();
if (status->ok()) {
return request;
}
request->~RequestType();
return nullptr;
}
/// A wrapper class of an application provided rpc method handler.
template <class ServiceType, class RequestType, class ResponseType,
class BaseRequestType = RequestType,
class BaseResponseType = ResponseType>
class RpcMethodHandler : public grpc::internal::MethodHandler {
public:
RpcMethodHandler(
std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
const RequestType*, ResponseType*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
ResponseType rsp;
grpc::Status status = param.status;
if (status.ok()) {
status = CatchingFunctionHandler([this, &param, &rsp] {
return func_(service_,
static_cast<grpc::ServerContext*>(param.server_context),
static_cast<RequestType*>(param.request), &rsp);
});
static_cast<RequestType*>(param.request)->~RequestType();
}
UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
}
void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
grpc::Status* status, void** /*handler_data*/) final {
auto* request = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(RequestType))) RequestType;
return UnaryDeserializeHelper(req, status,
static_cast<BaseRequestType*>(request));
}
private:
/// Application provided rpc handler function.
std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
const RequestType*, ResponseType*)>
func_;
// The class the above handler function lives in.
ServiceType* service_;
};
/// A wrapper class of an application provided client streaming handler.
template <class ServiceType, class RequestType, class ResponseType>
class ClientStreamingHandler : public grpc::internal::MethodHandler {
public:
ClientStreamingHandler(
std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
ServerReader<RequestType>*, ResponseType*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
ServerReader<RequestType> reader(
param.call, static_cast<grpc::ServerContext*>(param.server_context));
ResponseType rsp;
grpc::Status status =
CatchingFunctionHandler([this, &param, &reader, &rsp] {
return func_(service_,
static_cast<grpc::ServerContext*>(param.server_context),
&reader, &rsp);
});
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpSendMessage,
grpc::internal::CallOpServerSendStatus>
ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(&param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
}
if (status.ok()) {
status = ops.SendMessagePtr(&rsp);
}
ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
private:
std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
ServerReader<RequestType>*, ResponseType*)>
func_;
ServiceType* service_;
};
/// A wrapper class of an application provided server streaming handler.
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler : public grpc::internal::MethodHandler {
public:
ServerStreamingHandler(std::function<grpc::Status(
ServiceType*, grpc::ServerContext*,
const RequestType*, ServerWriter<ResponseType>*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
grpc::Status status = param.status;
if (status.ok()) {
ServerWriter<ResponseType> writer(
param.call, static_cast<grpc::ServerContext*>(param.server_context));
status = CatchingFunctionHandler([this, &param, &writer] {
return func_(service_,
static_cast<grpc::ServerContext*>(param.server_context),
static_cast<RequestType*>(param.request), &writer);
});
static_cast<RequestType*>(param.request)->~RequestType();
}
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus>
ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(&param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
}
ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
if (param.server_context->has_pending_ops_) {
param.call->cq()->Pluck(&param.server_context->pending_ops_);
}
param.call->cq()->Pluck(&ops);
}
void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
grpc::Status* status, void** /*handler_data*/) final {
grpc::ByteBuffer buf;
buf.set_buffer(req);
auto* request = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(RequestType))) RequestType();
*status =
grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
buf.Release();
if (status->ok()) {
return request;
}
request->~RequestType();
return nullptr;
}
private:
std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
const RequestType*, ServerWriter<ResponseType>*)>
func_;
ServiceType* service_;
};
/// A wrapper class of an application provided bidi-streaming handler.
/// This also applies to server-streamed implementation of a unary method
/// with the additional requirement that such methods must have done a
/// write for status to be ok
/// Since this is used by more than 1 class, the service is not passed in.
/// Instead, it is expected to be an implicitly-captured argument of func
/// (through bind or something along those lines)
template <class Streamer, bool WriteNeeded>
class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler {
public:
explicit TemplatedBidiStreamingHandler(
std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func)
: func_(func), write_needed_(WriteNeeded) {}
void RunHandler(const HandlerParameter& param) final {
Streamer stream(param.call,
static_cast<grpc::ServerContext*>(param.server_context));
grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
return func_(static_cast<grpc::ServerContext*>(param.server_context),
&stream);
});
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus>
ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(&param.server_context->initial_metadata_,
param.server_context->initial_metadata_flags());
if (param.server_context->compression_level_set()) {
ops.set_compression_level(param.server_context->compression_level());
}
if (write_needed_ && status.ok()) {
// If we needed a write but never did one, we need to mark the
// status as a fail
status = grpc::Status(grpc::StatusCode::INTERNAL,
"Service did not provide response message");
}
}
ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
if (param.server_context->has_pending_ops_) {
param.call->cq()->Pluck(&param.server_context->pending_ops_);
}
param.call->cq()->Pluck(&ops);
}
private:
std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
const bool write_needed_;
};
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler
: public TemplatedBidiStreamingHandler<
ServerReaderWriter<ResponseType, RequestType>, false> {
public:
BidiStreamingHandler(std::function<grpc::Status(
ServiceType*, grpc::ServerContext*,
ServerReaderWriter<ResponseType, RequestType>*)>
func,
ServiceType* service)
// TODO(vjpai): When gRPC supports C++14, move-capture func in the below
: TemplatedBidiStreamingHandler<
ServerReaderWriter<ResponseType, RequestType>, false>(
[func, service](
grpc::ServerContext* ctx,
ServerReaderWriter<ResponseType, RequestType>* streamer) {
return func(service, ctx, streamer);
}) {}
};
template <class RequestType, class ResponseType>
class StreamedUnaryHandler
: public TemplatedBidiStreamingHandler<
ServerUnaryStreamer<RequestType, ResponseType>, true> {
public:
explicit StreamedUnaryHandler(
std::function<
grpc::Status(grpc::ServerContext*,
ServerUnaryStreamer<RequestType, ResponseType>*)>
func)
: TemplatedBidiStreamingHandler<
ServerUnaryStreamer<RequestType, ResponseType>, true>(
std::move(func)) {}
};
template <class RequestType, class ResponseType>
class SplitServerStreamingHandler
: public TemplatedBidiStreamingHandler<
ServerSplitStreamer<RequestType, ResponseType>, false> {
public:
explicit SplitServerStreamingHandler(
std::function<
grpc::Status(grpc::ServerContext*,
ServerSplitStreamer<RequestType, ResponseType>*)>
func)
: TemplatedBidiStreamingHandler<
ServerSplitStreamer<RequestType, ResponseType>, false>(
std::move(func)) {}
};
/// General method handler class for errors that prevent real method use
/// e.g., handle unknown method by returning UNIMPLEMENTED error.
template <grpc::StatusCode code>
class ErrorMethodHandler : public grpc::internal::MethodHandler {
public:
explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
template <class T>
static void FillOps(grpc::ServerContextBase* context,
const std::string& message, T* ops) {
grpc::Status status(code, message);
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(&context->initial_metadata_,
context->initial_metadata_flags());
if (context->compression_level_set()) {
ops->set_compression_level(context->compression_level());
}
context->sent_initial_metadata_ = true;
}
ops->ServerSendStatus(&context->trailing_metadata_, status);
}
void RunHandler(const HandlerParameter& param) final {
grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
grpc::internal::CallOpServerSendStatus>
ops;
FillOps(param.server_context, message_, &ops);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
grpc::Status* /*status*/, void** /*handler_data*/) final {
// We have to destroy any request payload
if (req != nullptr) {
grpc::g_core_codegen_interface->grpc_byte_buffer_destroy(req);
}
return nullptr;
}
private:
const std::string message_;
};
typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
UnknownMethodHandler;
typedef ErrorMethodHandler<grpc::StatusCode::RESOURCE_EXHAUSTED>
ResourceExhaustedHandler;
} // namespace internal
} // namespace grpc
#endif // GRPCPP_SUPPORT_METHOD_HANDLER_H

@ -144,7 +144,7 @@ std::string GetHeaderIncludes(grpc_generator::File* file,
"grpcpp/client_context.h",
"grpcpp/impl/codegen/completion_queue.h",
"grpcpp/support/message_allocator.h",
"grpcpp/impl/codegen/method_handler.h",
"grpcpp/support/method_handler.h",
"grpcpp/impl/codegen/proto_utils.h",
"grpcpp/impl/codegen/rpc_method.h",
"grpcpp/impl/codegen/server_callback.h",
@ -1654,7 +1654,7 @@ std::string GetSourceIncludes(grpc_generator::File* file,
"grpcpp/impl/codegen/client_unary_call.h",
"grpcpp/support/client_callback.h",
"grpcpp/support/message_allocator.h",
"grpcpp/impl/codegen/method_handler.h",
"grpcpp/support/method_handler.h",
"grpcpp/impl/codegen/rpc_service_method.h",
"grpcpp/impl/codegen/server_callback.h",
"grpcpp/impl/codegen/server_callback_handlers.h",

@ -34,7 +34,7 @@
#include <grpcpp/client_context.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/support/message_allocator.h>
#include <grpcpp/impl/codegen/method_handler.h>
#include <grpcpp/support/method_handler.h>
#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/impl/codegen/rpc_method.h>
#include <grpcpp/impl/codegen/server_callback.h>

Loading…
Cancel
Save