Merge pull request #8 from yang-g/c++api

Finish call.h and call.cc
pull/501/head
Craig Tiller 10 years ago
commit dbc8d685db
  1. 34
      include/grpc++/impl/call.h
  2. 12
      include/grpc++/stream.h
  3. 2
      src/cpp/client/client_unary_call.cc
  4. 82
      src/cpp/common/call.cc

@ -64,12 +64,15 @@ class CallOpBuffer final : public CompletionQueueTag {
void AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddSendInitialMetadata(ClientContext *ctx);
void AddRecvInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose();
void AddClientRecvStatus(Status *status);
void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata,
Status *status);
void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata,
const Status &status);
const Status& status);
// INTERNAL API:
@ -81,19 +84,30 @@ class CallOpBuffer final : public CompletionQueueTag {
private:
void *return_tag_ = nullptr;
// Send initial metadata
bool send_initial_metadata_ = false;
size_t initial_metadata_count_ = 0;
grpc_metadata *initial_metadata_ = nullptr;
const google::protobuf::Message *send_message_ = nullptr;
grpc_byte_buffer *send_message_buf_ = nullptr;
google::protobuf::Message *recv_message_ = nullptr;
grpc_byte_buffer *recv_message_buf_ = nullptr;
grpc_metadata* initial_metadata_ = nullptr;
// Recv initial metadta
std::multimap<grpc::string, grpc::string>* recv_initial_metadata_ = nullptr;
grpc_metadata_array recv_initial_metadata_arr_ = {0, 0, nullptr};
// Send message
const google::protobuf::Message* send_message_ = nullptr;
grpc_byte_buffer* send_message_buf_ = nullptr;
// Recv message
google::protobuf::Message* recv_message_ = nullptr;
grpc_byte_buffer* recv_message_buf_ = nullptr;
// Client send close
bool client_send_close_ = false;
Status *recv_status_ = nullptr;
// Client recv status
std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_ = nullptr;
Status* recv_status_ = nullptr;
grpc_metadata_array recv_trailing_metadata_arr_ = {0, 0, nullptr};
grpc_status_code status_code_ = GRPC_STATUS_OK;
char *status_details_ = nullptr;
size_t status_details_capacity_ = 0;
Status *send_status_ = nullptr;
// Server send status
const Status* send_status_ = nullptr;
size_t trailing_metadata_count_ = 0;
grpc_metadata *trailing_metadata_ = nullptr;
};
@ -107,7 +121,7 @@ class CCallDeleter {
class CallHook {
public:
virtual ~CallHook() {}
virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0;
virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0;
};
// Straightforward wrapping of the C call object

@ -105,7 +105,7 @@ class ClientReader final : public ClientStreamingInterface,
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&status);
buf.AddClientRecvStatus(nullptr, &status); // TODO metadata
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
return status;
@ -146,7 +146,7 @@ class ClientWriter final : public ClientStreamingInterface,
CallOpBuffer buf;
Status status;
buf.AddRecvMessage(response_);
buf.AddClientRecvStatus(&status);
buf.AddClientRecvStatus(nullptr, &status); // TODO metadata
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
return status;
@ -193,7 +193,7 @@ class ClientReaderWriter final : public ClientStreamingInterface,
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&status);
buf.AddClientRecvStatus(nullptr, &status); // TODO metadata
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
return status;
@ -312,7 +312,7 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface,
virtual void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
finish_buf_.AddClientRecvStatus(status);
finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata
call_.PerformOps(&finish_buf_);
}
@ -350,7 +350,7 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
virtual void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
finish_buf_.AddRecvMessage(response_);
finish_buf_.AddClientRecvStatus(status);
finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata
call_.PerformOps(&finish_buf_);
}
@ -393,7 +393,7 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
virtual void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
finish_buf_.AddClientRecvStatus(status);
finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata
call_.PerformOps(&finish_buf_);
}

@ -52,7 +52,7 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
buf.AddSendMessage(request);
buf.AddRecvMessage(result);
buf.AddClientSendClose();
buf.AddClientRecvStatus(&status);
buf.AddClientRecvStatus(nullptr, &status); // TODO metadata
call.PerformOps(&buf);
cq.Pluck(&buf);
return status;

@ -42,33 +42,48 @@ namespace grpc {
void CallOpBuffer::Reset(void* next_return_tag) {
return_tag_ = next_return_tag;
send_initial_metadata_ = false;
initial_metadata_count_ = 0;
gpr_free(initial_metadata_);
recv_initial_metadata_ = nullptr;
gpr_free(recv_initial_metadata_arr_.metadata);
recv_initial_metadata_arr_ = {0, 0, nullptr};
send_message_ = nullptr;
if (send_message_buf_) {
grpc_byte_buffer_destroy(send_message_buf_);
send_message_buf_ = nullptr;
}
recv_message_ = nullptr;
if (recv_message_buf_) {
grpc_byte_buffer_destroy(recv_message_buf_);
recv_message_buf_ = nullptr;
}
client_send_close_ = false;
recv_trailing_metadata_ = nullptr;
recv_status_ = nullptr;
gpr_free(recv_trailing_metadata_arr_.metadata);
recv_trailing_metadata_arr_ = {0, 0, nullptr};
status_code_ = GRPC_STATUS_OK;
if (status_details_) {
gpr_free(status_details_);
status_details_ = nullptr;
}
gpr_free(status_details_);
status_details_ = nullptr;
status_details_capacity_ = 0;
send_status_ = nullptr;
trailing_metadata_count_ = 0;
trailing_metadata_ = nullptr;
}
namespace {
// TODO(yangg) if the map is changed before we send, the pointers will be a
// mess. Make sure it does not happen.
grpc_metadata* FillMetadata(
grpc_metadata* FillMetadataArray(
std::multimap<grpc::string, grpc::string>* metadata) {
if (metadata->empty()) { return nullptr; }
grpc_metadata* metadata_array = (grpc_metadata*)gpr_malloc(
@ -83,13 +98,24 @@ grpc_metadata* FillMetadata(
}
return metadata_array;
}
void FillMetadataMap(grpc_metadata_array* arr,
std::multimap<grpc::string, grpc::string>* metadata) {
for (size_t i = 0; i < arr->count; i++) {
// TODO(yangg) handle duplicates?
metadata->insert(std::pair<grpc::string, grpc::string>(
arr->metadata[i].key, {arr->metadata[i].value, arr->metadata[i].value_length}));
}
grpc_metadata_array_destroy(arr);
grpc_metadata_array_init(arr);
}
} // namespace
void CallOpBuffer::AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string>* metadata) {
send_initial_metadata_ = true;
initial_metadata_count_ = metadata->size();
initial_metadata_ = FillMetadata(metadata);
initial_metadata_ = FillMetadataArray(metadata);
}
void CallOpBuffer::AddSendInitialMetadata(ClientContext *ctx) {
@ -108,13 +134,17 @@ void CallOpBuffer::AddClientSendClose() {
client_send_close_ = true;
}
void CallOpBuffer::AddClientRecvStatus(Status *status) {
void CallOpBuffer::AddClientRecvStatus(
std::multimap<grpc::string, grpc::string>* metadata, Status *status) {
recv_trailing_metadata_ = metadata;
recv_status_ = status;
}
void CallOpBuffer::AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata,
const Status& status) {
void CallOpBuffer::AddServerSendStatus(
std::multimap<grpc::string, grpc::string>* metadata, const Status& status) {
trailing_metadata_count_ = metadata->size();
trailing_metadata_ = FillMetadataArray(metadata);
send_status_ = &status;
}
void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
@ -125,6 +155,11 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
(*nops)++;
}
if (recv_initial_metadata_) {
ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
(*nops)++;
}
if (send_message_) {
bool success = SerializeProto(*send_message_, &send_message_buf_);
if (!success) {
@ -145,10 +180,24 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
}
if (recv_status_) {
ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
// TODO ops[*nops].data.recv_status_on_client.trailing_metadata =
ops[*nops].data.recv_status_on_client.trailing_metadata =
&recv_trailing_metadata_arr_;
ops[*nops].data.recv_status_on_client.status = &status_code_;
ops[*nops].data.recv_status_on_client.status_details = &status_details_;
ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_;
ops[*nops].data.recv_status_on_client.status_details_capacity =
&status_details_capacity_;
(*nops)++;
}
if (send_status_) {
ops[*nops].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[*nops].data.send_status_from_server.trailing_metadata_count =
trailing_metadata_count_;
ops[*nops].data.send_status_from_server.trailing_metadata =
trailing_metadata_;
ops[*nops].data.send_status_from_server.status =
static_cast<grpc_status_code>(send_status_->code());
ops[*nops].data.send_status_from_server.status_details =
send_status_->details().c_str();
(*nops)++;
}
}
@ -163,8 +212,16 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) {
gpr_free(initial_metadata_);
initial_metadata_ = nullptr;
}
if (trailing_metadata_count_) {
gpr_free(trailing_metadata_);
trailing_metadata_ = nullptr;
}
// Set user-facing tag.
*tag = return_tag_;
// Process received initial metadata
if (recv_initial_metadata_) {
FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
}
// Parse received message if any.
if (recv_message_ && recv_message_buf_) {
*status = DeserializeProto(recv_message_buf_, recv_message_);
@ -173,6 +230,7 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) {
}
// Parse received status.
if (recv_status_) {
FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
*recv_status_ = Status(
static_cast<StatusCode>(status_code_),
status_details_ ? grpc::string(status_details_, status_details_capacity_)

Loading…
Cancel
Save