Merge pull request #7 from yang-g/c++api

C++api
pull/501/head
Craig Tiller 10 years ago
commit 06ed31e976
  1. 5
      include/grpc++/impl/call.h
  2. 7
      src/core/surface/server.c
  3. 43
      src/cpp/common/call.cc

@ -82,7 +82,7 @@ class CallOpBuffer final : public CompletionQueueTag {
size_t initial_metadata_count_ = 0;
grpc_metadata* initial_metadata_ = nullptr;
const google::protobuf::Message* send_message_ = nullptr;
grpc_byte_buffer* write_buffer_ = nullptr;
grpc_byte_buffer* send_message_buf_ = nullptr;
google::protobuf::Message* recv_message_ = nullptr;
grpc_byte_buffer* recv_message_buf_ = nullptr;
bool client_send_close_ = false;
@ -90,6 +90,9 @@ class CallOpBuffer final : public CompletionQueueTag {
grpc_status_code status_code_ = GRPC_STATUS_OK;
char* status_details_ = nullptr;
size_t status_details_capacity_ = 0;
Status* send_status_ = nullptr;
size_t trailing_metadata_count_ = 0;
grpc_metadata* trailing_metadata_ = nullptr;
};
class CCallDeleter {

@ -819,7 +819,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld;
call_data *calld = NULL;
gpr_mu_lock(&server->mu);
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
@ -896,6 +896,9 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server,
static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag);
static void publish_was_not_set(grpc_call *call, grpc_op_error status, void *tag) {
abort();
}
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice;
@ -910,7 +913,7 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc) {
grpc_ioreq_completion_func publish;
grpc_ioreq_completion_func publish = publish_was_not_set;
grpc_ioreq req[2];
grpc_ioreq *r = req;

@ -46,9 +46,9 @@ void CallOpBuffer::Reset(void* next_return_tag) {
gpr_free(initial_metadata_);
}
send_message_ = nullptr;
if (write_buffer_) {
grpc_byte_buffer_destroy(write_buffer_);
write_buffer_ = nullptr;
if (send_message_buf_) {
grpc_byte_buffer_destroy(send_message_buf_);
send_message_buf_ = nullptr;
}
recv_message_ = nullptr;
if (recv_message_buf_) {
@ -107,6 +107,10 @@ void CallOpBuffer::AddClientRecvStatus(Status *status) {
recv_status_ = status;
}
void CallOpBuffer::AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata,
const Status& status) {
}
void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
*nops = 0;
@ -117,12 +121,12 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
(*nops)++;
}
if (send_message_) {
bool success = SerializeProto(*send_message_, &write_buffer_);
bool success = SerializeProto(*send_message_, &send_message_buf_);
if (!success) {
// TODO handle parse failure
}
ops[*nops].op = GRPC_OP_SEND_MESSAGE;
ops[*nops].data.send_message = write_buffer_;
ops[*nops].data.send_message = send_message_buf_;
(*nops)++;
}
if (recv_message_) {
@ -136,7 +140,7 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
}
if (recv_status_) {
ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
// ops[*nops].data.recv_status_on_client.trailing_metadata =
// TODO ops[*nops].data.recv_status_on_client.trailing_metadata =
ops[*nops].data.recv_status_on_client.status = &status_code_;
ops[*nops].data.recv_status_on_client.status_details = &status_details_;
ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_;
@ -145,10 +149,29 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
}
void CallOpBuffer::FinalizeResult(void **tag, bool *status) {
// Release send buffers
if (write_buffer_) {
grpc_byte_buffer_destroy(write_buffer_);
write_buffer_ = nullptr;
// Release send buffers.
if (send_message_buf_) {
grpc_byte_buffer_destroy(send_message_buf_);
send_message_buf_ = nullptr;
}
if (initial_metadata_) {
gpr_free(initial_metadata_);
initial_metadata_ = nullptr;
}
// Set user-facing tag.
*tag = return_tag_;
// Parse received message if any.
if (recv_message_ && recv_message_buf_) {
*status = DeserializeProto(recv_message_buf_, recv_message_);
grpc_byte_buffer_destroy(recv_message_buf_);
recv_message_buf_ = nullptr;
}
// Parse received status.
if (recv_status_) {
*recv_status_ = Status(
static_cast<StatusCode>(status_code_),
status_details_ ? grpc::string(status_details_, status_details_capacity_)
: grpc::string());
}
}

Loading…
Cancel
Save