Merge pull request #10 from yang-g/c++api

Put in SendInitialMetadata and WaitForInitialMetadata in some places.
pull/501/head
Craig Tiller 10 years ago
commit 2e70dc18c6
  1. 1
      include/grpc++/client_context.h
  2. 2
      include/grpc++/server_context.h
  3. 82
      include/grpc++/stream.h
  4. 16
      src/compiler/cpp_generator.cc
  5. 6
      src/cpp/common/call.cc
  6. 1
      src/cpp/server/server.cc
  7. 7
      src/cpp/server/server_context.cc

@ -95,6 +95,7 @@ class ClientContext {
gpr_timespec RawDeadline() { return absolute_deadline_; } gpr_timespec RawDeadline() { return absolute_deadline_; }
bool initial_metadata_received_ = false;
grpc_call *call_; grpc_call *call_;
grpc_completion_queue *cq_; grpc_completion_queue *cq_;
gpr_timespec absolute_deadline_; gpr_timespec absolute_deadline_;

@ -75,8 +75,6 @@ class ServerContext {
ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count); ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count);
void SendInitialMetadataIfNeeded(CallOpBuffer *buf);
const std::chrono::system_clock::time_point deadline_; const std::chrono::system_clock::time_point deadline_;
bool sent_initial_metadata_ = false; bool sent_initial_metadata_ = false;
std::multimap<grpc::string, grpc::string> client_metadata_; std::multimap<grpc::string, grpc::string> client_metadata_;

@ -98,8 +98,26 @@ class ClientReader final : public ClientStreamingInterface,
cq_.Pluck(&buf); cq_.Pluck(&buf);
} }
// Blocking wait for initial metadata from server. The received metadata
// can only be accessed after this call returns. Should only be called before
// the first read. Calling this method is optional, and if it is not called
// the metadata will be available in ClientContext after the first read.
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
context_->initial_metadata_received_ = true;
}
virtual bool Read(R *msg) override { virtual bool Read(R *msg) override {
CallOpBuffer buf; CallOpBuffer buf;
if (!context_->initial_metadata_received_) {
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
bool got_message; bool got_message;
buf.AddRecvMessage(msg, &got_message); buf.AddRecvMessage(msg, &got_message);
call_.PerformOps(&buf); call_.PerformOps(&buf);
@ -186,8 +204,26 @@ class ClientReaderWriter final : public ClientStreamingInterface,
GPR_ASSERT(cq_.Pluck(&buf)); GPR_ASSERT(cq_.Pluck(&buf));
} }
// Blocking wait for initial metadata from server. The received metadata
// can only be accessed after this call returns. Should only be called before
// the first read. Calling this method is optional, and if it is not called
// the metadata will be available in ClientContext after the first read.
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
context_->initial_metadata_received_ = true;
}
virtual bool Read(R *msg) override { virtual bool Read(R *msg) override {
CallOpBuffer buf; CallOpBuffer buf;
if (!context_->initial_metadata_received_) {
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
}
bool got_message; bool got_message;
buf.AddRecvMessage(msg, &got_message); buf.AddRecvMessage(msg, &got_message);
call_.PerformOps(&buf); call_.PerformOps(&buf);
@ -226,7 +262,17 @@ class ClientReaderWriter final : public ClientStreamingInterface,
template <class R> template <class R>
class ServerReader final : public ReaderInterface<R> { class ServerReader final : public ReaderInterface<R> {
public: public:
explicit ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpBuffer buf;
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&buf);
call_->cq()->Pluck(&buf);
}
virtual bool Read(R* msg) override { virtual bool Read(R* msg) override {
CallOpBuffer buf; CallOpBuffer buf;
@ -244,11 +290,24 @@ class ServerReader final : public ReaderInterface<R> {
template <class W> template <class W>
class ServerWriter final : public WriterInterface<W> { class ServerWriter final : public WriterInterface<W> {
public: public:
explicit ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpBuffer buf;
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&buf);
call_->cq()->Pluck(&buf);
}
virtual bool Write(const W& msg) override { virtual bool Write(const W& msg) override {
CallOpBuffer buf; CallOpBuffer buf;
ctx_->SendInitialMetadataIfNeeded(&buf); if (!ctx_->sent_initial_metadata_) {
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
buf.AddSendMessage(msg); buf.AddSendMessage(msg);
call_->PerformOps(&buf); call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf); return call_->cq()->Pluck(&buf);
@ -264,7 +323,17 @@ template <class W, class R>
class ServerReaderWriter final : public WriterInterface<W>, class ServerReaderWriter final : public WriterInterface<W>,
public ReaderInterface<R> { public ReaderInterface<R> {
public: public:
explicit ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpBuffer buf;
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&buf);
call_->cq()->Pluck(&buf);
}
virtual bool Read(R* msg) override { virtual bool Read(R* msg) override {
CallOpBuffer buf; CallOpBuffer buf;
@ -276,7 +345,10 @@ class ServerReaderWriter final : public WriterInterface<W>,
virtual bool Write(const W& msg) override { virtual bool Write(const W& msg) override {
CallOpBuffer buf; CallOpBuffer buf;
ctx_->SendInitialMetadataIfNeeded(&buf); if (!ctx_->sent_initial_metadata_) {
buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
buf.AddSendMessage(msg); buf.AddSendMessage(msg);
call_->PerformOps(&buf); call_->PerformOps(&buf);
return call_->cq()->Pluck(&buf); return call_->cq()->Pluck(&buf);

@ -251,26 +251,26 @@ void PrintHeaderServerMethodAsync(
grpc_cpp_generator::ClassName(method->output_type(), true); grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) { if (NoStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"void $Method$(" "void Request$Method$("
"::grpc::ServerContext* context, $Request$* request, " "::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, " "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
"::grpc::CompletionQueue* cq, void *tag);\n"); "::grpc::CompletionQueue* cq, void *tag);\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"void $Method$(" "void Request$Method$("
"::grpc::ServerContext* context, " "::grpc::ServerContext* context, "
"::grpc::ServerAsyncReader< $Request$>* reader, " "::grpc::ServerAsyncReader< $Request$>* reader, "
"::grpc::CompletionQueue* cq, void *tag);\n"); "::grpc::CompletionQueue* cq, void *tag);\n");
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"void $Method$(" "void Request$Method$("
"::grpc::ServerContext* context, $Request$* request, " "::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncWriter< $Response$>* writer, " "::grpc::ServerAsyncWriter< $Response$>* writer, "
"::grpc::CompletionQueue* cq, void *tag);\n"); "::grpc::CompletionQueue* cq, void *tag);\n");
} else if (BidiStreaming(method)) { } else if (BidiStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
"void $Method$(" "void Request$Method$("
"::grpc::ServerContext* context, " "::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* cq, void *tag);\n"); "::grpc::CompletionQueue* cq, void *tag);\n");
@ -472,7 +472,7 @@ void PrintSourceServerAsyncMethod(google::protobuf::io::Printer *printer,
grpc_cpp_generator::ClassName(method->output_type(), true); grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) { if (NoStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"void $Service$::AsyncService::$Method$(" "void $Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, " "::grpc::ServerContext* context, "
"$Request$* request, " "$Request$* request, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, " "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
@ -480,14 +480,14 @@ void PrintSourceServerAsyncMethod(google::protobuf::io::Printer *printer,
printer->Print("}\n\n"); printer->Print("}\n\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"void $Service$::AsyncService::$Method$(" "void $Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, " "::grpc::ServerContext* context, "
"::grpc::ServerAsyncReader< $Request$>* reader, " "::grpc::ServerAsyncReader< $Request$>* reader, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print("}\n\n"); printer->Print("}\n\n");
} else if (ServerOnlyStreaming(method)) { } else if (ServerOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"void $Service$::AsyncService::$Method$(" "void $Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, " "::grpc::ServerContext* context, "
"$Request$* request, " "$Request$* request, "
"::grpc::ServerAsyncWriter< $Response$>* writer, " "::grpc::ServerAsyncWriter< $Response$>* writer, "
@ -495,7 +495,7 @@ void PrintSourceServerAsyncMethod(google::protobuf::io::Printer *printer,
printer->Print("}\n\n"); printer->Print("}\n\n");
} else if (BidiStreaming(method)) { } else if (BidiStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,
"void $Service$::AsyncService::$Method$(" "void $Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, " "::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* cq, void *tag) {\n"); "::grpc::CompletionQueue* cq, void *tag) {\n");

@ -121,6 +121,12 @@ void CallOpBuffer::AddSendInitialMetadata(
initial_metadata_ = FillMetadataArray(metadata); initial_metadata_ = FillMetadataArray(metadata);
} }
void CallOpBuffer::AddRecvInitialMetadata(
std::multimap<grpc::string, grpc::string>* metadata) {
recv_initial_metadata_ = metadata;
}
void CallOpBuffer::AddSendInitialMetadata(ClientContext *ctx) { void CallOpBuffer::AddSendInitialMetadata(ClientContext *ctx) {
AddSendInitialMetadata(&ctx->send_initial_metadata_); AddSendInitialMetadata(&ctx->send_initial_metadata_);
} }

@ -177,7 +177,6 @@ class Server::MethodRequestData final : public CompletionQueueTag {
auto status = method_->handler()->RunHandler( auto status = method_->handler()->RunHandler(
MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
CallOpBuffer buf; CallOpBuffer buf;
ctx_.SendInitialMetadataIfNeeded(&buf);
if (has_response_payload_) { if (has_response_payload_) {
buf.AddSendMessage(*res); buf.AddSendMessage(*res);
} }

@ -49,11 +49,4 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata *metadata,
} }
} }
void ServerContext::SendInitialMetadataIfNeeded(CallOpBuffer* buf) {
if (!sent_initial_metadata_) {
buf->AddSendInitialMetadata(&initial_metadata_);
sent_initial_metadata_ = true;
}
}
} // namespace grpc } // namespace grpc

Loading…
Cancel
Save