Merge pull request #5 from yang-g/c++api

Implement FillOps
pull/501/head
Craig Tiller 10 years ago
commit 1e67aeed22
  1. 14
      include/grpc++/impl/call.h
  2. 95
      src/cpp/common/call.cc

@ -34,6 +34,7 @@
#ifndef __GRPCPP_CALL_H__ #ifndef __GRPCPP_CALL_H__
#define __GRPCPP_CALL_H__ #define __GRPCPP_CALL_H__
#include <grpc/grpc.h>
#include <grpc++/status.h> #include <grpc++/status.h>
#include <grpc++/completion_queue.h> #include <grpc++/completion_queue.h>
@ -72,16 +73,25 @@ class CallOpBuffer final : public CompletionQueueTag {
// Convert to an array of grpc_op elements // Convert to an array of grpc_op elements
void FillOps(grpc_op *ops, size_t *nops); void FillOps(grpc_op *ops, size_t *nops);
// Release send buffers.
void ReleaseSendBuffer();
// Called by completion queue just prior to returning from Next() or Pluck() // Called by completion queue just prior to returning from Next() or Pluck()
void FinalizeResult(void *tag, bool *status) override; void FinalizeResult(void *tag, bool *status) override;
private: private:
void *return_tag_ = nullptr; void *return_tag_ = nullptr;
std::multimap<grpc::string, grpc::string>* metadata_ = nullptr; size_t initial_metadata_count_ = 0;
grpc_metadata* initial_metadata_ = nullptr;
const google::protobuf::Message* send_message_ = nullptr; const google::protobuf::Message* send_message_ = nullptr;
grpc_byte_buffer* write_buffer_ = nullptr;
google::protobuf::Message* recv_message_ = nullptr; google::protobuf::Message* recv_message_ = nullptr;
grpc_byte_buffer* recv_message_buf_ = nullptr;
bool client_send_close_ = false; bool client_send_close_ = false;
Status* status_ = nullptr; Status* recv_status_ = nullptr;
grpc_status_code status_code_ = GRPC_STATUS_OK;
char* status_details_ = nullptr;
size_t status_details_capacity_ = 0;
}; };
class CCallDeleter { class CCallDeleter {

@ -31,23 +31,64 @@
* *
*/ */
#include <include/grpc/support/alloc.h>
#include <include/grpc++/impl/call.h> #include <include/grpc++/impl/call.h>
#include <include/grpc++/channel_interface.h> #include <include/grpc++/channel_interface.h>
#include "src/cpp/proto/proto_utils.h"
namespace grpc { namespace grpc {
void CallOpBuffer::Reset(void* next_return_tag) { void CallOpBuffer::Reset(void* next_return_tag) {
return_tag_ = next_return_tag; return_tag_ = next_return_tag;
metadata_ = nullptr; initial_metadata_count_ = 0;
if (initial_metadata_) {
gpr_free(initial_metadata_);
}
send_message_ = nullptr; send_message_ = nullptr;
if (write_buffer_) {
grpc_byte_buffer_destroy(write_buffer_);
write_buffer_ = nullptr;
}
recv_message_ = nullptr; recv_message_ = nullptr;
if (recv_message_buf_) {
grpc_byte_buffer_destroy(recv_message_buf_);
recv_message_buf_ = nullptr;
}
client_send_close_ = false; client_send_close_ = false;
status_ = false; recv_status_ = nullptr;
status_code_ = GRPC_STATUS_OK;
if (status_details_) {
gpr_free(status_details_);
status_details_ = nullptr;
}
status_details_capacity_ = 0;
}
namespace {
// TODO(yangg) if the map is changed before we send, the pointers will be a
// mess. Make sure it does not happen.
grpc_metadata* FillMetadata(
std::multimap<grpc::string, grpc::string>* metadata) {
if (metadata->empty()) { return nullptr; }
grpc_metadata* metadata_array = (grpc_metadata*)gpr_malloc(
metadata->size()* sizeof(grpc_metadata));
size_t i = 0;
for (auto iter = metadata->cbegin();
iter != metadata->cend();
++iter, ++i) {
metadata_array[i].key = iter->first.c_str();
metadata_array[i].value = iter->second.c_str();
metadata_array[i].value_length = iter->second.size();
}
return metadata_array;
} }
} // namespace
void CallOpBuffer::AddSendInitialMetadata( void CallOpBuffer::AddSendInitialMetadata(
std::multimap<igrpc::string, grpc::string>* metadata) { std::multimap<grpc::string, grpc::string>* metadata) {
metadata_ = metadata; initial_metadata_count_ = metadata->size();
initial_metadata_ = FillMetadata(metadata);
} }
void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) {
@ -59,16 +100,55 @@ void CallOpBuffer::AddRecvMessage(google::protobuf::Message *message) {
} }
void CallOpBuffer::AddClientSendClose() { void CallOpBuffer::AddClientSendClose() {
client_sent_close_ = true; client_send_close_ = true;
} }
void CallOpBuffer::AddClientRecvStatus(Status *status) { void CallOpBuffer::AddClientRecvStatus(Status *status) {
status_ = status; recv_status_ = status;
} }
void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
*nops = 0;
if (initial_metadata_count_) {
ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[*nops].data.send_initial_metadata.count = initial_metadata_count_;
ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
(*nops)++;
}
if (send_message_) {
bool success = SerializeProto(*send_message_, &write_buffer_);
if (!success) {
// TODO handle parse failure
}
ops[*nops].op = GRPC_OP_SEND_MESSAGE;
ops[*nops].data.send_message = write_buffer_;
(*nops)++;
}
if (recv_message_) {
ops[*nops].op = GRPC_OP_RECV_MESSAGE;
ops[*nops].data.recv_message = &recv_message_buf_;
(*nops)++;
}
if (client_send_close_) {
ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
(*nops)++;
}
if (recv_status_) {
ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
// ops[*nops].data.recv_status_on_client.trailing_metadata =
ops[*nops].data.recv_status_on_client.status = &status_code_;
ops[*nops].data.recv_status_on_client.status_details = &status_details_;
ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_;
(*nops)++;
}
}
void CallOpBuffer::ReleaseSendBuffer() {
if (write_buffer_) {
grpc_byte_buffer_destroy(write_buffer_);
write_buffer_ = nullptr;
}
} }
void CallOpBuffer::FinalizeResult(void *tag, bool *status) { void CallOpBuffer::FinalizeResult(void *tag, bool *status) {
@ -84,6 +164,7 @@ Call::Call(grpc_call* call, ChannelInterface* channel, CompletionQueue* cq)
void Call::PerformOps(CallOpBuffer* buffer) { void Call::PerformOps(CallOpBuffer* buffer) {
channel_->PerformOpsOnCall(buffer, this); channel_->PerformOpsOnCall(buffer, this);
buffer->ReleaseSendBuffer();
} }
} // namespace grpc } // namespace grpc

Loading…
Cancel
Save