diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index e8492e0e954..141b16ab5bf 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -82,7 +82,7 @@ class CallOpBuffer final : public CompletionQueueTag { size_t initial_metadata_count_ = 0; grpc_metadata* initial_metadata_ = nullptr; const google::protobuf::Message* send_message_ = nullptr; - grpc_byte_buffer* write_buffer_ = nullptr; + grpc_byte_buffer* send_message_buf_ = nullptr; google::protobuf::Message* recv_message_ = nullptr; grpc_byte_buffer* recv_message_buf_ = nullptr; bool client_send_close_ = false; @@ -90,6 +90,9 @@ class CallOpBuffer final : public CompletionQueueTag { grpc_status_code status_code_ = GRPC_STATUS_OK; char* status_details_ = nullptr; size_t status_details_capacity_ = 0; + Status* send_status_ = nullptr; + size_t trailing_metadata_count_ = 0; + grpc_metadata* trailing_metadata_ = nullptr; }; class CCallDeleter { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index b28a52bcbdd..93994e6bdda 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -819,7 +819,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg, static grpc_call_error queue_call_request(grpc_server *server, requested_call *rc) { - call_data *calld; + call_data *calld = NULL; gpr_mu_lock(&server->mu); if (server->shutdown) { gpr_mu_unlock(&server->mu); @@ -896,6 +896,9 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag); static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag); +static void publish_was_not_set(grpc_call *call, grpc_op_error status, void *tag) { + abort(); +} static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { gpr_slice slice = value->slice; @@ -910,7 +913,7 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { static void begin_call(grpc_server *server, call_data *calld, requested_call *rc) { - grpc_ioreq_completion_func publish; + grpc_ioreq_completion_func publish = publish_was_not_set; grpc_ioreq req[2]; grpc_ioreq *r = req; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 2b47eb74c3a..b2cd55fe245 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -46,9 +46,9 @@ void CallOpBuffer::Reset(void* next_return_tag) { gpr_free(initial_metadata_); } send_message_ = nullptr; - if (write_buffer_) { - grpc_byte_buffer_destroy(write_buffer_); - write_buffer_ = nullptr; + if (send_message_buf_) { + grpc_byte_buffer_destroy(send_message_buf_); + send_message_buf_ = nullptr; } recv_message_ = nullptr; if (recv_message_buf_) { @@ -107,6 +107,10 @@ void CallOpBuffer::AddClientRecvStatus(Status *status) { recv_status_ = status; } +void CallOpBuffer::AddServerSendStatus(std::multimap* metadata, + const Status& status) { + +} void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { *nops = 0; @@ -117,12 +121,12 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { (*nops)++; } if (send_message_) { - bool success = SerializeProto(*send_message_, &write_buffer_); + bool success = SerializeProto(*send_message_, &send_message_buf_); if (!success) { // TODO handle parse failure } ops[*nops].op = GRPC_OP_SEND_MESSAGE; - ops[*nops].data.send_message = write_buffer_; + ops[*nops].data.send_message = send_message_buf_; (*nops)++; } if (recv_message_) { @@ -136,7 +140,7 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { } if (recv_status_) { ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - // ops[*nops].data.recv_status_on_client.trailing_metadata = + // TODO ops[*nops].data.recv_status_on_client.trailing_metadata = ops[*nops].data.recv_status_on_client.status = &status_code_; ops[*nops].data.recv_status_on_client.status_details = &status_details_; ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_; @@ -145,10 +149,29 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { } void CallOpBuffer::FinalizeResult(void **tag, bool *status) { - // Release send buffers - if (write_buffer_) { - grpc_byte_buffer_destroy(write_buffer_); - write_buffer_ = nullptr; + // Release send buffers. + if (send_message_buf_) { + grpc_byte_buffer_destroy(send_message_buf_); + send_message_buf_ = nullptr; + } + if (initial_metadata_) { + gpr_free(initial_metadata_); + initial_metadata_ = nullptr; + } + // Set user-facing tag. + *tag = return_tag_; + // Parse received message if any. + if (recv_message_ && recv_message_buf_) { + *status = DeserializeProto(recv_message_buf_, recv_message_); + grpc_byte_buffer_destroy(recv_message_buf_); + recv_message_buf_ = nullptr; + } + // Parse received status. + if (recv_status_) { + *recv_status_ = Status( + static_cast(status_code_), + status_details_ ? grpc::string(status_details_, status_details_capacity_) + : grpc::string()); } }