Server side cancellation receive support

pull/581/head
Craig Tiller 10 years ago
parent 645466e089
commit 492968f7d9
  1. 2
      include/grpc++/completion_queue.h
  2. 20
      include/grpc++/server_context.h
  3. 12
      include/grpc++/stream.h
  4. 5
      src/cpp/common/completion_queue.cc
  5. 8
      src/cpp/server/server.cc
  6. 70
      src/cpp/server/server_context.cc
  7. 12
      test/cpp/end2end/async_end2end_test.cc

@ -114,7 +114,7 @@ class CompletionQueue {
bool Pluck(CompletionQueueTag *tag);
// Does a single polling pluck on tag
void TryPluck(CompletionQueueTag *tag);
void TryPluck(CompletionQueueTag *tag, bool forever);
grpc_completion_queue *cq_; // owned
};

@ -34,8 +34,6 @@
#ifndef __GRPCPP_SERVER_CONTEXT_H_
#define __GRPCPP_SERVER_CONTEXT_H_
#include <grpc++/completion_queue.h>
#include <chrono>
#include <map>
#include <mutex>
@ -63,7 +61,9 @@ class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
class Call;
class CallOpBuffer;
class CompletionQueue;
class Server;
// Interface of server side rpc context.
@ -79,7 +79,7 @@ class ServerContext final {
void AddInitialMetadata(const grpc::string& key, const grpc::string& value);
void AddTrailingMetadata(const grpc::string& key, const grpc::string& value);
bool IsCancelled() { return completion_op_.CheckCancelled(cq_); }
bool IsCancelled();
std::multimap<grpc::string, grpc::string> client_metadata() {
return client_metadata_;
@ -102,22 +102,14 @@ class ServerContext final {
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
class CompletionOp final : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) override;
bool CheckCancelled(CompletionQueue* cq);
class CompletionOp;
private:
std::mutex mu_;
bool finalized_ = false;
int cancelled_ = 0;
};
void BeginCompletionOp(Call* call);
ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count);
CompletionOp completion_op_;
CompletionOp* completion_op_ = nullptr;
std::chrono::system_clock::time_point deadline_;
grpc_call* call_ = nullptr;

@ -576,8 +576,6 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -589,8 +587,6 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -636,8 +632,6 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface,
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -649,8 +643,6 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface,
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -697,8 +689,6 @@ class ServerAsyncWriter : public ServerAsyncStreamingInterface,
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
@ -753,8 +743,6 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface,
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}

@ -88,10 +88,11 @@ bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
}
}
void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
void CompletionQueue::TryPluck(CompletionQueueTag* tag, bool forever) {
std::unique_ptr<grpc_event, EventDeleter> ev;
ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_past));
ev.reset(grpc_completion_queue_pluck(
cq_, tag, forever ? gpr_inf_future : gpr_inf_past));
if (!ev) return;
bool ok = ev->data.op_complete == GRPC_OP_OK;
void* ignored = tag;

@ -205,6 +205,7 @@ class Server::SyncRequest final : public CompletionQueueTag {
if (has_response_payload_) {
res.reset(method_->AllocateResponseProto());
}
ctx_.BeginCompletionOp(&call_);
auto status = method_->handler()->RunHandler(
MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
CallOpBuffer buf;
@ -215,10 +216,12 @@ class Server::SyncRequest final : public CompletionQueueTag {
buf.AddSendMessage(*res);
}
buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
bool cancelled;
buf.AddServerRecvClose(&cancelled);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
}
private:
@ -332,6 +335,7 @@ class Server::AsyncRequest final : public CompletionQueueTag {
}
ctx_->call_ = call_;
Call call(call_, server_, cq_);
ctx_->BeginCompletionOp(&call);
// just the pointers inside call are copied here
stream_->BindCall(&call);
delete this;

@ -34,10 +34,59 @@
#include <grpc++/server_context.h>
#include <grpc++/impl/call.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/cpp/util/time.h"
namespace grpc {
// CompletionOp
class ServerContext::CompletionOp final : public CallOpBuffer {
public:
CompletionOp();
bool FinalizeResult(void** tag, bool* status) override;
bool CheckCancelled(CompletionQueue* cq);
void Unref();
private:
std::mutex mu_;
int refs_ = 2; // initial refs: one in the server context, one in the cq
bool finalized_ = false;
bool cancelled_ = false;
};
ServerContext::CompletionOp::CompletionOp() { AddServerRecvClose(&cancelled_); }
void ServerContext::CompletionOp::Unref() {
std::unique_lock<std::mutex> lock(mu_);
if (--refs_ == 0) {
lock.unlock();
delete this;
}
}
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this, false);
std::lock_guard<std::mutex> g(mu_);
return finalized_ ? cancelled_ : false;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
std::unique_lock<std::mutex> lock(mu_);
finalized_ = true;
if (!*status) cancelled_ = true;
if (--refs_ == 0) {
lock.unlock();
delete this;
}
return false;
}
// ServerContext body
ServerContext::ServerContext() {}
ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
@ -55,6 +104,15 @@ ServerContext::~ServerContext() {
if (call_) {
grpc_call_destroy(call_);
}
if (completion_op_) {
completion_op_->Unref();
}
}
void ServerContext::BeginCompletionOp(Call* call) {
GPR_ASSERT(!completion_op_);
completion_op_ = new CompletionOp();
call->PerformOps(completion_op_);
}
void ServerContext::AddInitialMetadata(const grpc::string& key,
@ -67,16 +125,8 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key,
trailing_metadata_.insert(std::make_pair(key, value));
}
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
std::lock_guard<std::mutex> g(mu_);
return finalized_ ? cancelled_ != 0 : false;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
std::lock_guard<std::mutex> g(mu_);
finalized_ = true;
return false;
bool ServerContext::IsCancelled() {
return completion_op_ && completion_op_->CheckCancelled(cq_);
}
} // namespace grpc

@ -90,7 +90,17 @@ class AsyncEnd2endTest : public ::testing::Test {
server_ = builder.BuildAndStart();
}
void TearDown() override { server_->Shutdown(); }
void TearDown() override {
server_->Shutdown();
void* ignored_tag;
bool ignored_ok;
cli_cq_.Shutdown();
srv_cq_.Shutdown();
while (cli_cq_.Next(&ignored_tag, &ignored_ok))
;
while (srv_cq_.Next(&ignored_tag, &ignored_ok))
;
}
void ResetStub() {
std::shared_ptr<ChannelInterface> channel =

Loading…
Cancel
Save