From 789471cfc64cd0200a53914939a0485846b5a80f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 4 Jun 2015 16:19:22 -0700 Subject: [PATCH] Client side compiles/links --- include/grpc++/async_unary_call.h | 10 +- include/grpc++/byte_buffer.h | 13 +- include/grpc++/client_context.h | 4 +- include/grpc++/impl/call.h | 178 ++++++++++++++++++++--- include/grpc++/impl/client_unary_call.h | 4 +- include/grpc++/impl/rpc_service_method.h | 8 +- include/grpc++/stream.h | 39 +++-- src/cpp/common/call.cc | 41 ++++-- test/cpp/end2end/generic_end2end_test.cc | 2 +- 9 files changed, 243 insertions(+), 56 deletions(-) diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h index d009ac6bbfe..3d475d06687 100644 --- a/include/grpc++/async_unary_call.h +++ b/include/grpc++/async_unary_call.h @@ -63,7 +63,8 @@ class ClientAsyncResponseReader GRPC_FINAL const W& request) : context_(context), call_(channel->CreateCall(method, context, cq)) { init_buf_.SendInitialMetadata(context->send_initial_metadata_); - init_buf_.SendMessage(request); + // TODO(ctiller): don't assert + GPR_ASSERT(init_buf_.SendMessage(request)); init_buf_.ClientSendClose(); call_.PerformOps(&init_buf_); } @@ -117,10 +118,11 @@ class ServerAsyncResponseWriter GRPC_FINAL ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. - if (status.IsOk()) { - finish_buf_.SendMessage(msg); + if (status.IsOk() && !finish_buf_.SendMessage(msg)) { + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, Status(INVALID_ARGUMENT, "Failed to serialize message")); + } else { + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); } - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } diff --git a/include/grpc++/byte_buffer.h b/include/grpc++/byte_buffer.h index 80dedda94c6..618ec764dfe 100644 --- a/include/grpc++/byte_buffer.h +++ b/include/grpc++/byte_buffer.h @@ -62,6 +62,12 @@ class ByteBuffer GRPC_FINAL { void Clear(); size_t Length(); + private: + friend class SerializationTraits; + + ByteBuffer(const ByteBuffer&); + ByteBuffer& operator=(const ByteBuffer&); + // takes ownership void set_buffer(grpc_byte_buffer* buf) { if (buffer_) { @@ -71,9 +77,6 @@ class ByteBuffer GRPC_FINAL { buffer_ = buf; } - private: - friend class CallOpBuffer; - grpc_byte_buffer* buffer() const { return buffer_; } grpc_byte_buffer* buffer_; @@ -86,6 +89,10 @@ class SerializationTraits { dest->set_buffer(byte_buffer); return Status::OK; } + static bool Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer) { + *buffer = source.buffer(); + return true; + } }; } // namespace grpc diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index d444c9035f4..0af797322f9 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -48,7 +48,6 @@ struct grpc_completion_queue; namespace grpc { -class CallOpBuffer; class ChannelInterface; class CompletionQueue; class Credentials; @@ -115,7 +114,8 @@ class ClientContext { ClientContext(const ClientContext&); ClientContext& operator=(const ClientContext&); - friend class CallOpBuffer; + friend class CallOpClientRecvStatus; + friend class CallOpRecvInitialMetadata; friend class Channel; template friend class ::grpc::ClientReader; diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 3701e403de0..40dbf9e6417 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -35,6 +35,7 @@ #define GRPCXX_IMPL_CALL_H #include +#include #include #include #include @@ -51,6 +52,10 @@ namespace grpc { class ByteBuffer; class Call; +void FillMetadataMap(grpc_metadata_array* arr, + std::multimap* metadata); +grpc_metadata* FillMetadataArray(const std::multimap& metadata); + class CallNoOp { protected: void AddOp(grpc_op* ops, size_t* nops) {} @@ -59,11 +64,29 @@ class CallNoOp { class CallOpSendInitialMetadata { public: - void SendInitialMetadata(const std::multimap& metadata); + CallOpSendInitialMetadata() : send_(false) {} + + void SendInitialMetadata(const std::multimap& metadata) { + send_ = true; + initial_metadata_count_ = metadata.size(); + initial_metadata_ = FillMetadataArray(metadata); + } protected: - void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status, int max_message_size); + void AddOp(grpc_op* ops, size_t* nops) { + if (!send_) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = initial_metadata_count_; + op->data.send_initial_metadata.metadata = initial_metadata_; + } + void FinishOp(void* tag, bool* status, int max_message_size) { + // nothing to do + } + + bool send_; + size_t initial_metadata_count_; + grpc_metadata* initial_metadata_; }; class CallOpSendMessage { @@ -71,7 +94,7 @@ class CallOpSendMessage { CallOpSendMessage() : send_buf_(nullptr) {} template - bool SendMessage(const M& message) { + bool SendMessage(const M& message) GRPC_MUST_USE_RESULT { return SerializationTraits::Serialize(message, &send_buf_); } @@ -132,50 +155,167 @@ class CallOpRecvMessage { class CallOpGenericRecvMessage { public: + CallOpGenericRecvMessage() : got_message(false) {} + template - void RecvMessage(R* message); + void RecvMessage(R* message) { + deserialize_ = [message](grpc_byte_buffer* buf, int max_message_size) -> Status { + return SerializationTraits::Deserialize(buf, message, max_message_size); + }; + } bool got_message; protected: - void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status, int max_message_size); + void AddOp(grpc_op* ops, size_t* nops) { + if (!deserialize_) return; + grpc_op *op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &recv_buf_; + } + + void FinishOp(void* tag, bool* status, int max_message_size) { + if (!deserialize_) return; + if (recv_buf_) { + if (*status) { + got_message = true; + *status = deserialize_(recv_buf_, max_message_size).IsOk(); + } else { + got_message = false; + grpc_byte_buffer_destroy(recv_buf_); + } + } else { + got_message = false; + *status = false; + } + } + + private: + std::function deserialize_; + grpc_byte_buffer* recv_buf_; }; class CallOpClientSendClose { public: - void ClientSendClose(); + CallOpClientSendClose() : send_(false) {} + + void ClientSendClose() { send_ = true; } protected: - void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status, int max_message_size); + void AddOp(grpc_op* ops, size_t* nops) { + if (!send_) return; + ops[(*nops)++].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + } + void FinishOp(void* tag, bool* status, int max_message_size) { + // nothing to do + } + + private: + bool send_; }; class CallOpServerSendStatus { public: - void ServerSendStatus(const std::multimap& trailing_metadata, const Status& status); + CallOpServerSendStatus() : send_status_available_(false) {} + + void ServerSendStatus(const std::multimap& trailing_metadata, const Status& status){ + trailing_metadata_count_ = trailing_metadata.size(); + trailing_metadata_ = FillMetadataArray(trailing_metadata); + send_status_available_ = true; + send_status_code_ = static_cast(status.code()); + send_status_details_ = status.details(); + } protected: - void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status, int max_message_size); + void AddOp(grpc_op* ops, size_t* nops) { + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = + trailing_metadata_count_; + op->data.send_status_from_server.trailing_metadata = + trailing_metadata_; + op->data.send_status_from_server.status = send_status_code_; + op->data.send_status_from_server.status_details = + send_status_details_.empty() ? nullptr : send_status_details_.c_str(); + } + + void FinishOp(void* tag, bool* status, int max_message_size) { + // nothing to do + } + + private: + bool send_status_available_; + grpc_status_code send_status_code_; + grpc::string send_status_details_; + size_t trailing_metadata_count_; + grpc_metadata* trailing_metadata_; }; class CallOpRecvInitialMetadata { public: - void RecvInitialMetadata(ClientContext* context); + CallOpRecvInitialMetadata() : recv_initial_metadata_(nullptr) { + memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_)); + } + + void RecvInitialMetadata(ClientContext* context) { + context->initial_metadata_received_ = true; + recv_initial_metadata_ = &context->recv_initial_metadata_; + } protected: - void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status, int max_message_size); + void AddOp(grpc_op* ops, size_t* nops) { + if (!recv_initial_metadata_) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &recv_initial_metadata_arr_; + } + void FinishOp(void* tag, bool* status, int max_message_size) { + FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_); + } + + private: + std::multimap* recv_initial_metadata_; + grpc_metadata_array recv_initial_metadata_arr_; }; class CallOpClientRecvStatus { public: - void ClientRecvStatus(ClientContext* context, Status* status); + CallOpClientRecvStatus() { + memset(this, 0, sizeof(*this)); + } + + void ClientRecvStatus(ClientContext* context, Status* status) { + recv_trailing_metadata_ = &context->trailing_metadata_; + recv_status_ = status; + } protected: - void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status, int max_message_size); + void AddOp(grpc_op* ops, size_t* nops) { + if (recv_status_ == nullptr) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &recv_trailing_metadata_arr_; + op->data.recv_status_on_client.status = &status_code_; + op->data.recv_status_on_client.status_details = &status_details_; + op->data.recv_status_on_client.status_details_capacity = + &status_details_capacity_; + } + + void FinishOp(void* tag, bool* status, int max_message_size) { + FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_); + *recv_status_ = Status( + static_cast(status_code_), + status_details_ ? grpc::string(status_details_) : grpc::string()); + } + + private: + std::multimap* recv_trailing_metadata_; + Status* recv_status_; + grpc_metadata_array recv_trailing_metadata_arr_; + grpc_status_code status_code_; + char* status_details_; + size_t status_details_capacity_; }; class CallOpSetInterface : public CompletionQueueTag { diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index 8c42fb47927..5e37e63c6d2 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -63,7 +63,9 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, CallOpClientRecvStatus> ops; Status status; ops.SendInitialMetadata(context->send_initial_metadata_); - ops.SendMessage(request); + if (!ops.SendMessage(request)) { + return Status(INVALID_ARGUMENT, "Failed to serialize message"); + } ops.RecvInitialMetadata(context); ops.RecvMessage(result); ops.ClientSendClose(); diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 05bba6ef7c3..cd31b223238 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -89,7 +89,9 @@ class RpcMethodHandler : public MethodHandler { CallOpSet ops; ops.SendInitialMetadata(param.server_context->initial_metadata_); if (status.IsOk()) { - ops.SendMessage(rsp); + if (!ops.SendMessage(rsp)) { + status = Status(INTERNAL, "Failed to serialize response"); + } } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); @@ -123,7 +125,9 @@ class ClientStreamingHandler : public MethodHandler { CallOpSet ops; ops.SendInitialMetadata(param.server_context->initial_metadata_); if (status.IsOk()) { - ops.SendMessage(rsp); + if (!ops.SendMessage(rsp)) { + status = Status(INTERNAL, "Failed to serialize response"); + } } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 39a1cc85952..eb2bce678ff 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -99,7 +99,8 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { : context_(context), call_(channel->CreateCall(method, context, &cq_)) { CallOpSet ops; ops.SendInitialMetadata(context->send_initial_metadata_); - ops.SendMessage(request); + // TODO(ctiller): don't assert + GPR_ASSERT(ops.SendMessage(request)); ops.ClientSendClose(); call_.PerformOps(&ops); cq_.Pluck(&ops); @@ -169,7 +170,9 @@ class ClientWriter : public ClientWriterInterface { bool Write(const W& msg) GRPC_OVERRIDE { CallOpSet ops; - ops.SendMessage(msg); + if (!ops.SendMessage(msg)) { + return false; + } call_.PerformOps(&ops); return cq_.Pluck(&ops); } @@ -245,7 +248,7 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { bool Write(const W& msg) GRPC_OVERRIDE { CallOpSet ops; - ops.SendMessage(msg); + if (!ops.SendMessage(msg)) return false; call_.PerformOps(&ops); return cq_.Pluck(&ops); } @@ -316,11 +319,13 @@ class ServerWriter GRPC_FINAL : public WriterInterface { bool Write(const W& msg) GRPC_OVERRIDE { CallOpSet ops; + if (!ops.SendMessage(msg)) { + return false; + } if (!ctx_->sent_initial_metadata_) { ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - ops.SendMessage(msg); call_->PerformOps(&ops); return call_->cq()->Pluck(&ops); } @@ -356,11 +361,13 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface, bool Write(const W& msg) GRPC_OVERRIDE { CallOpSet ops; + if (!ops.SendMessage(msg)) { + return false; + } if (!ctx_->sent_initial_metadata_) { ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - ops.SendMessage(msg); call_->PerformOps(&ops); return call_->cq()->Pluck(&ops); } @@ -415,7 +422,8 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface { : context_(context), call_(channel->CreateCall(method, context, cq)) { init_ops_.set_output_tag(tag); init_ops_.SendInitialMetadata(context->send_initial_metadata_); - init_ops_.SendMessage(request); + // TODO(ctiller): don't assert + GPR_ASSERT(init_ops_.SendMessage(request)); init_ops_.ClientSendClose(); call_.PerformOps(&init_ops_); } @@ -488,7 +496,8 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface { void Write(const W& msg, void* tag) GRPC_OVERRIDE { write_ops_.set_output_tag(tag); - write_ops_.SendMessage(msg); + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg)); call_.PerformOps(&write_ops_); } @@ -558,7 +567,8 @@ class ClientAsyncReaderWriter GRPC_FINAL void Write(const W& msg, void* tag) GRPC_OVERRIDE { write_ops_.set_output_tag(tag); - write_ops_.SendMessage(msg); + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg)); call_.PerformOps(&write_ops_); } @@ -617,10 +627,11 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. - if (status.IsOk()) { - finish_ops_.SendMessage(msg); + if (status.IsOk() && !finish_ops_.SendMessage(msg)) { + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, Status(INTERNAL, "Failed to serialize response")); + } else { + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_ops_); } @@ -667,7 +678,8 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - write_ops_.SendMessage(msg); + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg)); call_.PerformOps(&write_ops_); } @@ -721,7 +733,8 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - write_ops_.SendMessage(msg); + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg)); call_.PerformOps(&write_ops_); } diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index dc3d36faa2d..3f3f5279eea 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -42,6 +42,36 @@ namespace grpc { +void FillMetadataMap(grpc_metadata_array* arr, + std::multimap* metadata) { + for (size_t i = 0; i < arr->count; i++) { + // TODO(yangg) handle duplicates? + metadata->insert(std::pair( + arr->metadata[i].key, + grpc::string(arr->metadata[i].value, arr->metadata[i].value_length))); + } + grpc_metadata_array_destroy(arr); + grpc_metadata_array_init(arr); +} + +// TODO(yangg) if the map is changed before we send, the pointers will be a +// mess. Make sure it does not happen. +grpc_metadata* FillMetadataArray( + const std::multimap& metadata) { + if (metadata.empty()) { + return nullptr; + } + grpc_metadata* metadata_array = + (grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata)); + size_t i = 0; + for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) { + metadata_array[i].key = iter->first.c_str(); + metadata_array[i].value = iter->second.c_str(); + metadata_array[i].value_length = iter->second.size(); + } + return metadata_array; +} + #if 0 CallOpBuffer::CallOpBuffer() : return_tag_(this), @@ -147,17 +177,6 @@ grpc_metadata* FillMetadataArray( return metadata_array; } -void FillMetadataMap(grpc_metadata_array* arr, - std::multimap* metadata) { - for (size_t i = 0; i < arr->count; i++) { - // TODO(yangg) handle duplicates? - metadata->insert(std::pair( - arr->metadata[i].key, - grpc::string(arr->metadata[i].value, arr->metadata[i].value_length))); - } - grpc_metadata_array_destroy(arr); - grpc_metadata_array_init(arr); -} } // namespace void CallOpBuffer::AddSendInitialMetadata( diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index 80e43fd8544..5a26834df92 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -33,10 +33,10 @@ #include -#include "src/cpp/proto/proto_utils.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" #include "test/cpp/util/echo.grpc.pb.h" +#include #include #include #include