From ea222b2001bddc573f9151847cdd339ff517e54c Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Thu, 12 Feb 2015 09:38:51 -0800 Subject: [PATCH] resolve comments, the context_ member needs protection for thread safety --- include/grpc++/client_context.h | 1 + include/grpc++/impl/call.h | 9 --- include/grpc++/stream.h | 100 ++++++++++++++++++-------------- 3 files changed, 58 insertions(+), 52 deletions(-) diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 0a81f6a3666..f74de8fad4f 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -95,6 +95,7 @@ class ClientContext { gpr_timespec RawDeadline() { return absolute_deadline_; } + bool initial_metadata_received_ = false; grpc_call *call_; grpc_completion_queue *cq_; gpr_timespec absolute_deadline_; diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 5fafd0e8904..a1ef9268f0c 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -134,16 +134,7 @@ class Call final { grpc_call *call() { return call_; } CompletionQueue *cq() { return cq_; } - // TODO(yangg) change it to a general state query function. - bool initial_metadata_received() { - return initial_metadata_received_; - } - void set_initial_metadata_received() { - initial_metadata_received_ = true; - } - private: - bool initial_metadata_received_ = false; CallHook *call_hook_; CompletionQueue *cq_; grpc_call* call_; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 6265310c5a8..74e7539aa47 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -99,21 +99,25 @@ class ClientReader final : public ClientStreamingInterface, } // Blocking wait for initial metadata from server. The received metadata - // can only be accessed after this call returns. Calling this method is - // optional as it will be called internally before the first Read. + // can only be accessed after this call returns. Should only be called before + // the first read. Calling this method is optional, and if it is not called + // the metadata will be available in ClientContext after the first read. void WaitForInitialMetadata() { - if (!call_.initial_metadata_received()) { - CallOpBuffer buf; - buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); - call_.set_initial_metadata_received(); - } + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpBuffer buf; + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + context_->initial_metadata_received_ = true; } virtual bool Read(R *msg) override { - WaitForInitialMetadata(); CallOpBuffer buf; + if (!context_->initial_metadata_received_) { + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } bool got_message; buf.AddRecvMessage(msg, &got_message); call_.PerformOps(&buf); @@ -201,21 +205,25 @@ class ClientReaderWriter final : public ClientStreamingInterface, } // Blocking wait for initial metadata from server. The received metadata - // can only be accessed after this call returns. Calling this method is - // optional as it will be called internally before the first Read. + // can only be accessed after this call returns. Should only be called before + // the first read. Calling this method is optional, and if it is not called + // the metadata will be available in ClientContext after the first read. void WaitForInitialMetadata() { - if (!call_.initial_metadata_received()) { - CallOpBuffer buf; - buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); - call_.set_initial_metadata_received(); - } + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpBuffer buf; + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + context_->initial_metadata_received_ = true; } virtual bool Read(R *msg) override { - WaitForInitialMetadata(); CallOpBuffer buf; + if (!context_->initial_metadata_received_) { + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } bool got_message; buf.AddRecvMessage(msg, &got_message); call_.PerformOps(&buf); @@ -257,13 +265,13 @@ class ServerReader final : public ReaderInterface { ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} void SendInitialMetadata() { - if (!ctx_->sent_initial_metadata_) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); - } + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); } virtual bool Read(R* msg) override { @@ -285,18 +293,21 @@ class ServerWriter final : public WriterInterface { ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} void SendInitialMetadata() { - if (!ctx_->sent_initial_metadata_) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); - } + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); } virtual bool Write(const W& msg) override { - SendInitialMetadata(); CallOpBuffer buf; + if (!ctx_->sent_initial_metadata_) { + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } buf.AddSendMessage(msg); call_->PerformOps(&buf); return call_->cq()->Pluck(&buf); @@ -315,13 +326,13 @@ class ServerReaderWriter final : public WriterInterface, ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} void SendInitialMetadata() { - if (!ctx_->sent_initial_metadata_) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); - } + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); } virtual bool Read(R* msg) override { @@ -333,8 +344,11 @@ class ServerReaderWriter final : public WriterInterface, } virtual bool Write(const W& msg) override { - SendInitialMetadata(); CallOpBuffer buf; + if (!ctx_->sent_initial_metadata_) { + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } buf.AddSendMessage(msg); call_->PerformOps(&buf); return call_->cq()->Pluck(&buf);