Clarify serialization traits interface

pull/1969/head
Craig Tiller 10 years ago
parent 6ef7f31b98
commit bd277cb3cb
  1. 7
      include/grpc++/async_unary_call.h
  2. 4
      include/grpc++/byte_buffer.h
  3. 4
      include/grpc++/impl/call.h
  4. 8
      include/grpc++/impl/client_unary_call.h
  5. 8
      include/grpc++/impl/proto_utils.h
  6. 8
      include/grpc++/impl/rpc_service_method.h
  7. 23
      include/grpc++/impl/serialization_traits.h
  8. 24
      include/grpc++/stream.h
  9. 4
      src/cpp/proto/proto_utils.cc

@ -64,7 +64,7 @@ class ClientAsyncResponseReader GRPC_FINAL
: context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(init_buf_.SendMessage(request));
GPR_ASSERT(init_buf_.SendMessage(request).IsOk());
init_buf_.ClientSendClose();
call_.PerformOps(&init_buf_);
}
@ -120,10 +120,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk() && !finish_buf_.SendMessage(msg)) {
if (status.IsOk()) {
finish_buf_.ServerSendStatus(
ctx_->trailing_metadata_,
Status(INVALID_ARGUMENT, "Failed to serialize message"));
ctx_->trailing_metadata_, finish_buf_.SendMessage(msg));
} else {
finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
}

@ -90,11 +90,11 @@ class SerializationTraits<ByteBuffer, void> {
dest->set_buffer(byte_buffer);
return Status::OK;
}
static bool Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer,
static Status Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer,
bool* own_buffer) {
*buffer = source.buffer();
*own_buffer = false;
return true;
return Status::OK;
}
};

@ -98,7 +98,7 @@ class CallOpSendMessage {
CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {}
template <class M>
bool SendMessage(const M& message) GRPC_MUST_USE_RESULT;
Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
protected:
void AddOp(grpc_op* ops, size_t* nops) {
@ -118,7 +118,7 @@ class CallOpSendMessage {
};
template <class M>
bool CallOpSendMessage::SendMessage(const M& message) {
Status CallOpSendMessage::SendMessage(const M& message) {
return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
}

@ -56,11 +56,11 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
CallOpClientSendClose, CallOpClientRecvStatus> ops;
Status status;
ops.SendInitialMetadata(context->send_initial_metadata_);
if (!ops.SendMessage(request)) {
return Status(INVALID_ARGUMENT, "Failed to serialize message");
Status status = ops.SendMessage(request);
if (!status.IsOk()) {
return status;
}
ops.SendInitialMetadata(context->send_initial_metadata_);
ops.RecvInitialMetadata(context);
ops.RecvMessage(result);
ops.ClientSendClose();

@ -47,8 +47,8 @@ namespace grpc {
// Serialize the msg into a buffer created inside the function. The caller
// should destroy the returned buffer when done with it. If serialization fails,
// false is returned and buffer is left unchanged.
bool SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer);
Status SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer);
// The caller keeps ownership of buffer and msg.
Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
@ -58,8 +58,8 @@ template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of<
grpc::protobuf::Message, T>::value>::type> {
public:
static bool Serialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer, bool* own_buffer) {
static Status Serialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer, bool* own_buffer) {
*own_buffer = true;
return SerializeProto(msg, buffer);
}

@ -94,9 +94,7 @@ class RpcMethodHandler : public MethodHandler {
CallOpServerSendStatus> ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (status.IsOk()) {
if (!ops.SendMessage(rsp)) {
status = Status(INTERNAL, "Failed to serialize response");
}
status = ops.SendMessage(rsp);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
@ -131,9 +129,7 @@ class ClientStreamingHandler : public MethodHandler {
CallOpServerSendStatus> ops;
ops.SendInitialMetadata(param.server_context->initial_metadata_);
if (status.IsOk()) {
if (!ops.SendMessage(rsp)) {
status = Status(INTERNAL, "Failed to serialize response");
}
status = ops.SendMessage(rsp);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);

@ -38,6 +38,29 @@ struct grpc_byte_buffer;
namespace grpc {
/// Defines how to serialize and deserialize some type.
///
/// Used for hooking different message serialization API's into GRPC.
/// Each SerializationTraits implementation must provide the following
/// functions:
/// static Status Serialize(const Message& msg,
/// grpc_byte_buffer** buffer,
// bool* own_buffer);
/// static Status Deserialize(grpc_byte_buffer* buffer,
/// Message* msg,
/// int max_message_size);
///
/// Serialize is required to convert message to a grpc_byte_buffer, and
/// to store a pointer to that byte buffer at *buffer. *own_buffer should
/// be set to true if the caller owns said byte buffer, or false if
/// ownership is retained elsewhere.
///
/// Deserialize is required to convert buffer into the message stored at
/// msg. max_message_size is passed in as a bound on the maximum number of
/// message bytes Deserialize should accept.
///
/// Both functions return a Status, allowing them to explain what went
/// wrong if required.
template <class Message,
class UnusedButHereForPartialTemplateSpecialization = void>
class SerializationTraits;

@ -101,7 +101,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
CallOpClientSendClose> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(ops.SendMessage(request));
GPR_ASSERT(ops.SendMessage(request).IsOk());
ops.ClientSendClose();
call_.PerformOps(&ops);
cq_.Pluck(&ops);
@ -170,7 +170,7 @@ class ClientWriter : public ClientWriterInterface<W> {
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg)) {
if (!ops.SendMessage(msg).IsOk()) {
return false;
}
call_.PerformOps(&ops);
@ -248,7 +248,7 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg)) return false;
if (!ops.SendMessage(msg).IsOk()) return false;
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
@ -319,7 +319,7 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg)) {
if (!ops.SendMessage(msg).IsOk()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
@ -361,7 +361,7 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
bool Write(const W& msg) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg)) {
if (!ops.SendMessage(msg).IsOk()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
@ -422,7 +422,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(init_ops_.SendMessage(request));
GPR_ASSERT(init_ops_.SendMessage(request).IsOk());
init_ops_.ClientSendClose();
call_.PerformOps(&init_ops_);
}
@ -496,7 +496,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg));
GPR_ASSERT(write_ops_.SendMessage(msg).IsOk());
call_.PerformOps(&write_ops_);
}
@ -568,7 +568,7 @@ class ClientAsyncReaderWriter GRPC_FINAL
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg));
GPR_ASSERT(write_ops_.SendMessage(msg).IsOk());
call_.PerformOps(&write_ops_);
}
@ -627,10 +627,10 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk() && !finish_ops_.SendMessage(msg)) {
if (status.IsOk()) {
finish_ops_.ServerSendStatus(
ctx_->trailing_metadata_,
Status(INTERNAL, "Failed to serialize response"));
finish_ops_.SendMessage(msg));
} else {
finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
}
@ -682,7 +682,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg));
GPR_ASSERT(write_ops_.SendMessage(msg).IsOk());
call_.PerformOps(&write_ops_);
}
@ -737,7 +737,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg));
GPR_ASSERT(write_ops_.SendMessage(msg).IsOk());
call_.PerformOps(&write_ops_);
}

@ -152,9 +152,9 @@ class GrpcBufferReader GRPC_FINAL
namespace grpc {
bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
GrpcBufferWriter writer(bp);
return msg.SerializeToZeroCopyStream(&writer);
return msg.SerializeToZeroCopyStream(&writer) ? Status::OK : Status(INVALID_ARGUMENT, "Failed to serialize message");
}
Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,

Loading…
Cancel
Save