diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 9ac92a00dd9..f4b07625bee 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -73,6 +73,7 @@ class CallOpBuffer final : public CompletionQueueTag { Status *status); void AddServerSendStatus(std::multimap *metadata, const Status& status); + void AddServerRecvClose(bool* cancelled); // INTERNAL API: @@ -110,6 +111,8 @@ class CallOpBuffer final : public CompletionQueueTag { const Status* send_status_ = nullptr; size_t trailing_metadata_count_ = 0; grpc_metadata *trailing_metadata_ = nullptr; + int cancelled_buf_; + bool *recv_closed_ = nullptr; }; // Channel and Server implement this to allow them to hook performing ops diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index 68253986124..30bf2d0fc21 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc @@ -36,6 +36,7 @@ #include #include #include +#include namespace grpc { @@ -54,7 +55,7 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, buf.AddClientSendClose(); buf.AddClientRecvStatus(nullptr, &status); // TODO metadata call.PerformOps(&buf); - cq.Pluck(&buf); + GPR_ASSERT(cq.Pluck(&buf)); return status; } diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 6b1d99f739a..d90ef0311e4 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -78,6 +78,8 @@ void CallOpBuffer::Reset(void* next_return_tag) { send_status_ = nullptr; trailing_metadata_count_ = 0; trailing_metadata_ = nullptr; + + recv_closed_ = nullptr; } namespace { @@ -134,6 +136,10 @@ void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; } +void CallOpBuffer::AddServerRecvClose(bool* cancelled) { + recv_closed_ = cancelled; +} + void CallOpBuffer::AddClientRecvStatus( std::multimap* metadata, Status *status) { recv_trailing_metadata_ = metadata; @@ -205,6 +211,11 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { send_status_->details().c_str(); (*nops)++; } + if (recv_closed_) { + ops[*nops].op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops[*nops].data.recv_close_on_server.cancelled = &cancelled_buf_; + (*nops)++; + } } void CallOpBuffer::FinalizeResult(void **tag, bool *status) { @@ -241,6 +252,9 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) { status_details_ ? grpc::string(status_details_) : grpc::string()); } + if (recv_closed_) { + *recv_closed_ = cancelled_buf_ != 0; + } } Call::Call(grpc_call* call, CallHook *call_hook, CompletionQueue* cq) diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 1c95ab21bd6..be3d2256302 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -119,7 +119,8 @@ class Server::MethodRequestData final : public CompletionQueueTag { } static MethodRequestData *Wait(CompletionQueue *cq, bool *ok) { - void *tag; + void *tag = nullptr; + *ok = false; if (!cq->Next(&tag, ok)) { return nullptr; } @@ -183,6 +184,8 @@ class Server::MethodRequestData final : public CompletionQueueTag { buf.AddSendMessage(*res); } buf.AddServerSendStatus(&ctx_.trailing_metadata_, status); + bool cancelled; + buf.AddServerRecvClose(&cancelled); call_.PerformOps(&buf); GPR_ASSERT(cq_.Pluck(&buf)); } @@ -265,7 +268,6 @@ void Server::RunRpc() { // Wait for one more incoming rpc. bool ok; auto *mrd = MethodRequestData::Wait(&cq_, &ok); - gpr_log(GPR_DEBUG, "Wait: %p %d", mrd, ok); if (mrd) { ScheduleCallback(); if (ok) {