change AddRecvMessage signature

pull/501/head
Yang Gao 10 years ago
parent c17e861aa0
commit 0c7aafaa0c
  1. 4
      include/grpc++/impl/call.h
  2. 31
      include/grpc++/stream.h
  3. 7
      src/cpp/client/client_unary_call.cc
  4. 15
      src/cpp/common/call.cc

@ -68,7 +68,7 @@ class CallOpBuffer : public CompletionQueueTag {
void AddRecvInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message, bool* got_message);
void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose();
void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata,
Status *status);
@ -84,6 +84,7 @@ class CallOpBuffer : public CompletionQueueTag {
// Called by completion queue just prior to returning from Next() or Pluck()
void FinalizeResult(void **tag, bool *status) override;
bool got_message = false;
private:
void *return_tag_ = nullptr;
// Send initial metadata
@ -98,7 +99,6 @@ class CallOpBuffer : public CompletionQueueTag {
grpc_byte_buffer* send_message_buf_ = nullptr;
// Recv message
google::protobuf::Message* recv_message_ = nullptr;
bool* got_message_ = nullptr;
grpc_byte_buffer* recv_message_buf_ = nullptr;
// Client send close
bool client_send_close_ = false;

@ -119,10 +119,9 @@ class ClientReader final : public ClientStreamingInterface,
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
bool got_message;
buf.AddRecvMessage(msg, &got_message);
buf.AddRecvMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf) && got_message;
return cq_.Pluck(&buf) && buf.got_message;
}
virtual Status Finish() override {
@ -174,11 +173,10 @@ class ClientWriter final : public ClientStreamingInterface,
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
bool got_message;
buf.AddRecvMessage(response_, &got_message);
buf.AddRecvMessage(response_);
buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf) && got_message);
GPR_ASSERT(cq_.Pluck(&buf) && buf.got_message);
return status;
}
@ -225,10 +223,9 @@ class ClientReaderWriter final : public ClientStreamingInterface,
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
bool got_message;
buf.AddRecvMessage(msg, &got_message);
buf.AddRecvMessage(msg);
call_.PerformOps(&buf);
return cq_.Pluck(&buf) && got_message;
return cq_.Pluck(&buf) && buf.got_message;
}
virtual bool Write(const W& msg) override {
@ -277,10 +274,9 @@ class ServerReader final : public ReaderInterface<R> {
virtual bool Read(R* msg) override {
CallOpBuffer buf;
bool got_message;
buf.AddRecvMessage(msg, &got_message);
buf.AddRecvMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf) && got_message;
return call_->cq()->Pluck(&buf) && buf.got_message;
}
private:
@ -338,10 +334,9 @@ class ServerReaderWriter final : public WriterInterface<W>,
virtual bool Read(R* msg) override {
CallOpBuffer buf;
bool got_message;
buf.AddRecvMessage(msg, &got_message);
buf.AddRecvMessage(msg);
call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf) && got_message;
return call_->cq()->Pluck(&buf) && buf.got_message;
}
virtual bool Write(const W& msg) override {
@ -420,7 +415,7 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface,
read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
read_buf_.AddRecvMessage(msg, nullptr);
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
}
@ -484,7 +479,7 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
finish_buf_.AddRecvMessage(response_, nullptr);
finish_buf_.AddRecvMessage(response_);
finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -529,7 +524,7 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
read_buf_.AddRecvMessage(msg, nullptr);
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
}

@ -53,12 +53,11 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
buf.AddSendInitialMetadata(context);
buf.AddSendMessage(request);
buf.AddRecvInitialMetadata(&context->recv_initial_metadata_);
bool got_message;
buf.AddRecvMessage(result, &got_message);
buf.AddRecvMessage(result);
buf.AddClientSendClose();
buf.AddClientRecvStatus(&context->trailing_metadata_, &status);
call.PerformOps(&buf);
GPR_ASSERT(cq.Pluck(&buf) && (got_message || !status.IsOk()));
GPR_ASSERT(cq.Pluck(&buf) && (buf.got_message || !status.IsOk()));
return status;
}
@ -81,7 +80,7 @@ void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
buf->AddSendInitialMetadata(context);
buf->AddSendMessage(request);
buf->AddRecvInitialMetadata(&context->recv_initial_metadata_);
buf->AddRecvMessage(result, nullptr);
buf->AddRecvMessage(result);
buf->AddClientSendClose();
buf->AddClientRecvStatus(&context->trailing_metadata_, status);
call.PerformOps(buf);

@ -57,7 +57,7 @@ void CallOpBuffer::Reset(void* next_return_tag) {
}
recv_message_ = nullptr;
got_message_ = nullptr;
got_message = false;
if (recv_message_buf_) {
grpc_byte_buffer_destroy(recv_message_buf_);
recv_message_buf_ = nullptr;
@ -142,9 +142,8 @@ void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) {
send_message_ = &message;
}
void CallOpBuffer::AddRecvMessage(google::protobuf::Message *message, bool* got_message) {
void CallOpBuffer::AddRecvMessage(google::protobuf::Message *message) {
recv_message_ = message;
got_message_ = got_message;
}
void CallOpBuffer::AddClientSendClose() {
@ -256,16 +255,14 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) {
// Parse received message if any.
if (recv_message_) {
if (recv_message_buf_) {
if (got_message_) {
*got_message_ = true;
}
got_message = true;
*status = DeserializeProto(recv_message_buf_, recv_message_);
grpc_byte_buffer_destroy(recv_message_buf_);
recv_message_buf_ = nullptr;
} else {
if (got_message_) {
*got_message_ = false;
}
// Read failed
got_message = false;
*status = false;
}
}
// Parse received status.

Loading…
Cancel
Save