From fd7199f64ec05c46f6aebd8644bbcb78f2889898 Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Wed, 11 Feb 2015 23:14:49 -0800 Subject: [PATCH 1/5] Add client side WaitForInitialMetadata for streaming. --- include/grpc++/impl/call.h | 9 +++++++++ include/grpc++/stream.h | 29 +++++++++++++++++++++++++++++ src/cpp/common/call.cc | 6 ++++++ 3 files changed, 44 insertions(+) diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index a1ef9268f0c..5fafd0e8904 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -134,7 +134,16 @@ class Call final { grpc_call *call() { return call_; } CompletionQueue *cq() { return cq_; } + // TODO(yangg) change it to a general state query function. + bool initial_metadata_received() { + return initial_metadata_received_; + } + void set_initial_metadata_received() { + initial_metadata_received_ = true; + } + private: + bool initial_metadata_received_ = false; CallHook *call_hook_; CompletionQueue *cq_; grpc_call* call_; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index a59ccd8c051..6da1be4e9fe 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -98,7 +98,22 @@ class ClientReader final : public ClientStreamingInterface, cq_.Pluck(&buf); } + // Blocking wait for initial metadata from server. The received metadata + // can only be accessed after this call returns. Calling this method is + // optional as it will be called internally before the first Read. + void WaitForInitialMetadata() { + if (!call_.initial_metadata_received()) { + CallOpBuffer buf; + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + call_.set_initial_metadata_received(); + } + } + + virtual bool Read(R *msg) override { + WaitForInitialMetadata(); CallOpBuffer buf; bool got_message; buf.AddRecvMessage(msg, &got_message); @@ -186,7 +201,21 @@ class ClientReaderWriter final : public ClientStreamingInterface, GPR_ASSERT(cq_.Pluck(&buf)); } + // Blocking wait for initial metadata from server. The received metadata + // can only be accessed after this call returns. Calling this method is + // optional as it will be called internally before the first Read. + void WaitForInitialMetadata() { + if (!call_.initial_metadata_received()) { + CallOpBuffer buf; + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + call_.set_initial_metadata_received(); + } + } + virtual bool Read(R *msg) override { + WaitForInitialMetadata(); CallOpBuffer buf; bool got_message; buf.AddRecvMessage(msg, &got_message); diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index a20d4a0d9a6..aae69084eb4 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -121,6 +121,12 @@ void CallOpBuffer::AddSendInitialMetadata( initial_metadata_ = FillMetadataArray(metadata); } +void CallOpBuffer::AddRecvInitialMetadata( + std::multimap* metadata) { + recv_initial_metadata_ = metadata; +} + + void CallOpBuffer::AddSendInitialMetadata(ClientContext *ctx) { AddSendInitialMetadata(&ctx->send_initial_metadata_); } From b492f06c9dc45c62f1cb46b9320a25f0f4be4300 Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Wed, 11 Feb 2015 23:43:20 -0800 Subject: [PATCH 2/5] Add SendInitialMetadata() to server streaming interfaces --- include/grpc++/stream.h | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 6da1be4e9fe..be518c61ed6 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -111,7 +111,6 @@ class ClientReader final : public ClientStreamingInterface, } } - virtual bool Read(R *msg) override { WaitForInitialMetadata(); CallOpBuffer buf; @@ -255,7 +254,14 @@ class ClientReaderWriter final : public ClientStreamingInterface, template class ServerReader final : public ReaderInterface { public: - explicit ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + CallOpBuffer buf; + ctx_->SendInitialMetadataIfNeeded(&buf); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } virtual bool Read(R* msg) override { CallOpBuffer buf; @@ -273,7 +279,14 @@ class ServerReader final : public ReaderInterface { template class ServerWriter final : public WriterInterface { public: - explicit ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + CallOpBuffer buf; + ctx_->SendInitialMetadataIfNeeded(&buf); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } virtual bool Write(const W& msg) override { CallOpBuffer buf; @@ -293,7 +306,14 @@ template class ServerReaderWriter final : public WriterInterface, public ReaderInterface { public: - explicit ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + CallOpBuffer buf; + ctx_->SendInitialMetadataIfNeeded(&buf); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } virtual bool Read(R* msg) override { CallOpBuffer buf; From 3ccdbe9bcc66f5769348ca5279f9bf5b7e700613 Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Wed, 11 Feb 2015 23:56:30 -0800 Subject: [PATCH 3/5] Make SendInitialMetadata work. --- include/grpc++/server_context.h | 2 -- include/grpc++/stream.h | 37 ++++++++++++++++++++------------ src/cpp/server/server.cc | 1 - src/cpp/server/server_context.cc | 7 ------ 4 files changed, 23 insertions(+), 24 deletions(-) diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index a30f6cb51d0..6cc3716291c 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -75,8 +75,6 @@ class ServerContext { ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count); - void SendInitialMetadataIfNeeded(CallOpBuffer *buf); - const std::chrono::system_clock::time_point deadline_; bool sent_initial_metadata_ = false; std::multimap client_metadata_; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index be518c61ed6..6265310c5a8 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -257,10 +257,13 @@ class ServerReader final : public ReaderInterface { ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} void SendInitialMetadata() { - CallOpBuffer buf; - ctx_->SendInitialMetadataIfNeeded(&buf); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + if (!ctx_->sent_initial_metadata_) { + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); + } } virtual bool Read(R* msg) override { @@ -282,15 +285,18 @@ class ServerWriter final : public WriterInterface { ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} void SendInitialMetadata() { - CallOpBuffer buf; - ctx_->SendInitialMetadataIfNeeded(&buf); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + if (!ctx_->sent_initial_metadata_) { + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); + } } virtual bool Write(const W& msg) override { + SendInitialMetadata(); CallOpBuffer buf; - ctx_->SendInitialMetadataIfNeeded(&buf); buf.AddSendMessage(msg); call_->PerformOps(&buf); return call_->cq()->Pluck(&buf); @@ -309,10 +315,13 @@ class ServerReaderWriter final : public WriterInterface, ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} void SendInitialMetadata() { - CallOpBuffer buf; - ctx_->SendInitialMetadataIfNeeded(&buf); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + if (!ctx_->sent_initial_metadata_) { + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); + } } virtual bool Read(R* msg) override { @@ -324,8 +333,8 @@ class ServerReaderWriter final : public WriterInterface, } virtual bool Write(const W& msg) override { + SendInitialMetadata(); CallOpBuffer buf; - ctx_->SendInitialMetadataIfNeeded(&buf); buf.AddSendMessage(msg); call_->PerformOps(&buf); return call_->cq()->Pluck(&buf); diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 17b0543bcd3..630b16ba0b2 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -177,7 +177,6 @@ class Server::MethodRequestData final : public CompletionQueueTag { auto status = method_->handler()->RunHandler( MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); CallOpBuffer buf; - ctx_.SendInitialMetadataIfNeeded(&buf); if (has_response_payload_) { buf.AddSendMessage(*res); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 06eeb392975..1823dfc81b0 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -49,11 +49,4 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata *metadata, } } -void ServerContext::SendInitialMetadataIfNeeded(CallOpBuffer* buf) { - if (!sent_initial_metadata_) { - buf->AddSendInitialMetadata(&initial_metadata_); - sent_initial_metadata_ = true; - } -} - } // namespace grpc From ca3cb3e19a618157ea70a5054be5b11e5dc1203c Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Thu, 12 Feb 2015 00:05:11 -0800 Subject: [PATCH 4/5] Prefix Request to async server method names --- src/compiler/cpp_generator.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index d04c2efcc49..e29cfadcefb 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -251,26 +251,26 @@ void PrintHeaderServerMethodAsync( grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { printer->Print(*vars, - "void $Method$(" + "void Request$Method$(" "::grpc::ServerContext* context, $Request$* request, " "::grpc::ServerAsyncResponseWriter< $Response$>* response, " "::grpc::CompletionQueue* cq, void *tag);\n"); } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, - "void $Method$(" + "void Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReader< $Request$>* reader, " "::grpc::CompletionQueue* cq, void *tag);\n"); } else if (ServerOnlyStreaming(method)) { printer->Print(*vars, - "void $Method$(" + "void Request$Method$(" "::grpc::ServerContext* context, $Request$* request, " "::grpc::ServerAsyncWriter< $Response$>* writer, " "::grpc::CompletionQueue* cq, void *tag);\n"); } else if (BidiStreaming(method)) { printer->Print( *vars, - "void $Method$(" + "void Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " "::grpc::CompletionQueue* cq, void *tag);\n"); @@ -472,7 +472,7 @@ void PrintSourceServerAsyncMethod(google::protobuf::io::Printer *printer, grpc_cpp_generator::ClassName(method->output_type(), true); if (NoStreaming(method)) { printer->Print(*vars, - "void $Service$::AsyncService::$Method$(" + "void $Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "$Request$* request, " "::grpc::ServerAsyncResponseWriter< $Response$>* response, " @@ -480,14 +480,14 @@ void PrintSourceServerAsyncMethod(google::protobuf::io::Printer *printer, printer->Print("}\n\n"); } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, - "void $Service$::AsyncService::$Method$(" + "void $Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReader< $Request$>* reader, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print("}\n\n"); } else if (ServerOnlyStreaming(method)) { printer->Print(*vars, - "void $Service$::AsyncService::$Method$(" + "void $Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "$Request$* request, " "::grpc::ServerAsyncWriter< $Response$>* writer, " @@ -495,7 +495,7 @@ void PrintSourceServerAsyncMethod(google::protobuf::io::Printer *printer, printer->Print("}\n\n"); } else if (BidiStreaming(method)) { printer->Print(*vars, - "void $Service$::AsyncService::$Method$(" + "void $Service$::AsyncService::Request$Method$(" "::grpc::ServerContext* context, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " "::grpc::CompletionQueue* cq, void *tag) {\n"); From ea222b2001bddc573f9151847cdd339ff517e54c Mon Sep 17 00:00:00 2001 From: Yang Gao Date: Thu, 12 Feb 2015 09:38:51 -0800 Subject: [PATCH 5/5] resolve comments, the context_ member needs protection for thread safety --- include/grpc++/client_context.h | 1 + include/grpc++/impl/call.h | 9 --- include/grpc++/stream.h | 100 ++++++++++++++++++-------------- 3 files changed, 58 insertions(+), 52 deletions(-) diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 0a81f6a3666..f74de8fad4f 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -95,6 +95,7 @@ class ClientContext { gpr_timespec RawDeadline() { return absolute_deadline_; } + bool initial_metadata_received_ = false; grpc_call *call_; grpc_completion_queue *cq_; gpr_timespec absolute_deadline_; diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 5fafd0e8904..a1ef9268f0c 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -134,16 +134,7 @@ class Call final { grpc_call *call() { return call_; } CompletionQueue *cq() { return cq_; } - // TODO(yangg) change it to a general state query function. - bool initial_metadata_received() { - return initial_metadata_received_; - } - void set_initial_metadata_received() { - initial_metadata_received_ = true; - } - private: - bool initial_metadata_received_ = false; CallHook *call_hook_; CompletionQueue *cq_; grpc_call* call_; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 6265310c5a8..74e7539aa47 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -99,21 +99,25 @@ class ClientReader final : public ClientStreamingInterface, } // Blocking wait for initial metadata from server. The received metadata - // can only be accessed after this call returns. Calling this method is - // optional as it will be called internally before the first Read. + // can only be accessed after this call returns. Should only be called before + // the first read. Calling this method is optional, and if it is not called + // the metadata will be available in ClientContext after the first read. void WaitForInitialMetadata() { - if (!call_.initial_metadata_received()) { - CallOpBuffer buf; - buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); - call_.set_initial_metadata_received(); - } + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpBuffer buf; + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + context_->initial_metadata_received_ = true; } virtual bool Read(R *msg) override { - WaitForInitialMetadata(); CallOpBuffer buf; + if (!context_->initial_metadata_received_) { + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } bool got_message; buf.AddRecvMessage(msg, &got_message); call_.PerformOps(&buf); @@ -201,21 +205,25 @@ class ClientReaderWriter final : public ClientStreamingInterface, } // Blocking wait for initial metadata from server. The received metadata - // can only be accessed after this call returns. Calling this method is - // optional as it will be called internally before the first Read. + // can only be accessed after this call returns. Should only be called before + // the first read. Calling this method is optional, and if it is not called + // the metadata will be available in ClientContext after the first read. void WaitForInitialMetadata() { - if (!call_.initial_metadata_received()) { - CallOpBuffer buf; - buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); - call_.set_initial_metadata_received(); - } + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpBuffer buf; + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + context_->initial_metadata_received_ = true; } virtual bool Read(R *msg) override { - WaitForInitialMetadata(); CallOpBuffer buf; + if (!context_->initial_metadata_received_) { + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } bool got_message; buf.AddRecvMessage(msg, &got_message); call_.PerformOps(&buf); @@ -257,13 +265,13 @@ class ServerReader final : public ReaderInterface { ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} void SendInitialMetadata() { - if (!ctx_->sent_initial_metadata_) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); - } + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); } virtual bool Read(R* msg) override { @@ -285,18 +293,21 @@ class ServerWriter final : public WriterInterface { ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} void SendInitialMetadata() { - if (!ctx_->sent_initial_metadata_) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); - } + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); } virtual bool Write(const W& msg) override { - SendInitialMetadata(); CallOpBuffer buf; + if (!ctx_->sent_initial_metadata_) { + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } buf.AddSendMessage(msg); call_->PerformOps(&buf); return call_->cq()->Pluck(&buf); @@ -315,13 +326,13 @@ class ServerReaderWriter final : public WriterInterface, ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} void SendInitialMetadata() { - if (!ctx_->sent_initial_metadata_) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); - } + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpBuffer buf; + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&buf); + call_->cq()->Pluck(&buf); } virtual bool Read(R* msg) override { @@ -333,8 +344,11 @@ class ServerReaderWriter final : public WriterInterface, } virtual bool Write(const W& msg) override { - SendInitialMetadata(); CallOpBuffer buf; + if (!ctx_->sent_initial_metadata_) { + buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } buf.AddSendMessage(msg); call_->PerformOps(&buf); return call_->cq()->Pluck(&buf);