Address reviewer comments.

pull/16988/head
Vijay Pai 6 years ago
parent 2f47137a6e
commit 932abf48a3
  1. 2
      include/grpcpp/impl/codegen/async_unary_call.h
  2. 22
      include/grpcpp/impl/codegen/call_op_set.h
  3. 4
      include/grpcpp/impl/codegen/call_op_set_interface.h
  4. 2
      include/grpcpp/impl/codegen/client_callback.h
  5. 11
      include/grpcpp/impl/codegen/rpc_service_method.h
  6. 80
      include/grpcpp/impl/codegen/server_callback.h
  7. 6
      src/compiler/cpp_generator.cc
  8. 4
      src/cpp/client/channel_cc.cc
  9. 10
      src/cpp/common/completion_queue_cc.cc
  10. 4
      src/cpp/server/server_cc.cc
  11. 16
      src/cpp/server/server_context.cc

@ -240,7 +240,7 @@ class ServerAsyncResponseWriter final
/// metadata.
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.set_output_tag(tag);
finish_buf_.set_cq_tag(&finish_buf_);
finish_buf_.set_core_cq_tag(&finish_buf_);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());

@ -770,19 +770,19 @@ class CallOpSet : public CallOpSetInterface,
public Op5,
public Op6 {
public:
CallOpSet() : cq_tag_(this), return_tag_(this) {}
CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
// The copy constructor and assignment operator reset the value of
// cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_ since
// those are only meaningful on a specific object, not across objects.
// core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
// since those are only meaningful on a specific object, not across objects.
CallOpSet(const CallOpSet& other)
: cq_tag_(this),
: core_cq_tag_(this),
return_tag_(this),
call_(other.call_),
done_intercepting_(false),
interceptor_methods_(InterceptorBatchMethodsImpl()) {}
CallOpSet& operator=(const CallOpSet& other) {
cq_tag_ = this;
core_cq_tag_ = this;
return_tag_ = this;
call_ = other.call_;
done_intercepting_ = false;
@ -834,13 +834,13 @@ class CallOpSet : public CallOpSetInterface,
void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
void* cq_tag() override { return cq_tag_; }
void* core_cq_tag() override { return core_cq_tag_; }
/// set_cq_tag is used to provide a different core CQ tag than "this".
/// set_core_cq_tag is used to provide a different core CQ tag than "this".
/// This is used for callback-based tags, where the core tag is the core
/// callback function. It does not change the use or behavior of any other
/// function (such as FinalizeResult)
void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
// This will be called while interceptors are run if the RPC is a hijacked
// RPC. This should set hijacking state for each of the ops.
@ -866,7 +866,7 @@ class CallOpSet : public CallOpSetInterface,
this->Op6::AddOp(ops, &nops);
GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
g_core_codegen_interface->grpc_call_start_batch(
call_.call(), ops, nops, cq_tag(), nullptr));
call_.call(), ops, nops, core_cq_tag(), nullptr));
}
// Should be called after interceptors are done running on the finalize result
@ -875,7 +875,7 @@ class CallOpSet : public CallOpSetInterface,
done_intercepting_ = true;
GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
g_core_codegen_interface->grpc_call_start_batch(
call_.call(), nullptr, 0, cq_tag(), nullptr));
call_.call(), nullptr, 0, core_cq_tag(), nullptr));
}
private:
@ -906,7 +906,7 @@ class CallOpSet : public CallOpSetInterface,
return interceptor_methods_.RunInterceptors();
}
void* cq_tag_;
void* core_cq_tag_;
void* return_tag_;
Call call_;
bool done_intercepting_ = false;

@ -38,9 +38,9 @@ class CallOpSetInterface : public CompletionQueueTag {
virtual void FillOps(internal::Call* call) = 0;
/// Get the tag to be used at the core completion queue. Generally, the
/// value of cq_tag will be "this". However, it can be overridden if we
/// value of core_cq_tag will be "this". However, it can be overridden if we
/// want core to process the tag differently (e.g., as a core callback)
virtual void* cq_tag() = 0;
virtual void* core_cq_tag() = 0;
// This will be called while interceptors are run if the RPC is a hijacked
// RPC. This should set hijacking state for each of the ops.

@ -84,7 +84,7 @@ class CallbackUnaryCallImpl {
ops->AllowNoMessage();
ops->ClientSendClose();
ops->ClientRecvStatus(context, tag->status_ptr());
ops->set_cq_tag(tag);
ops->set_core_cq_tag(tag);
call.PerformOps(ops);
}
};

@ -46,21 +46,22 @@ class MethodHandler {
/// \param context : the ServerContext structure for this server call
/// \param req : the request payload, if appropriate for this RPC
/// \param req_status : the request status after any interceptors have run
/// \param renew : used only by the callback API. It is a function
/// called by the RPC Controller to request another RPC
/// \param rpc_requester : used only by the callback API. It is a function
/// called by the RPC Controller to request another RPC (and also
/// to set up the state required to make that request possible)
HandlerParameter(Call* c, ServerContext* context, void* req,
Status req_status, std::function<void()> renew)
Status req_status, std::function<void()> requester)
: call(c),
server_context(context),
request(req),
status(req_status),
renewer(std::move(renew)) {}
call_requester(std::move(requester)) {}
~HandlerParameter() {}
Call* call;
ServerContext* server_context;
void* request;
Status status;
std::function<void()> renewer;
std::function<void()> call_requester;
};
virtual void RunHandler(const HandlerParameter& param) = 0;

@ -48,7 +48,6 @@ class ServerCallbackRpcController {
// The method handler must call this function when it is done so that
// the library knows to free its resources
virtual void Finish(Status s) = 0;
virtual void FinishWithError(Status s) = 0;
// Allow the method handler to push out the initial metadata before
// the response and status are ready
@ -75,7 +74,8 @@ class CallbackUnaryHandler : public MethodHandler {
param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
ServerCallbackRpcControllerImpl(
param.server_context, param.call,
static_cast<RequestType*>(param.request), std::move(param.renewer));
static_cast<RequestType*>(param.request),
std::move(param.call_requester));
Status status = param.status;
if (status.ok()) {
@ -112,52 +112,19 @@ class CallbackUnaryHandler : public MethodHandler {
// The implementation class of ServerCallbackRpcController is a private member
// of CallbackUnaryHandler since it is never exposed anywhere, and this allows
// it to take advantage of CallbackUnaryHandler's friendships.
class ServerCallbackRpcControllerImpl
: public experimental::ServerCallbackRpcController {
public:
void Finish(Status s) override { FinishInternal(std::move(s), false); }
void FinishWithError(Status s) override {
FinishInternal(std::move(s), true);
}
void SendInitialMetadata(std::function<void(bool)> f) override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_tag_ =
CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_);
meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
meta_buf_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
meta_buf_.set_cq_tag(&meta_tag_);
call_.PerformOps(&meta_buf_);
}
private:
template <class SrvType, class ReqType, class RespType>
friend class CallbackUnaryHandler;
ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
RequestType* req,
std::function<void()> renewer)
: ctx_(ctx), call_(*call), req_(req), renewer_(std::move(renewer)) {}
~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
void FinishInternal(Status s, bool allow_error) {
void Finish(Status s) override {
finish_tag_ = CallbackWithSuccessTag(
call_.call(),
[this](bool) {
grpc_call* call = call_.call();
auto renewer = std::move(renewer_);
auto call_requester = std::move(call_requester_);
this->~ServerCallbackRpcControllerImpl(); // explicitly call
// destructor
g_core_codegen_interface->grpc_call_unref(call);
renewer();
call_requester();
},
&finish_buf_);
if (!ctx_->sent_initial_metadata_) {
@ -168,17 +135,46 @@ class CallbackUnaryHandler : public MethodHandler {
}
ctx_->sent_initial_metadata_ = true;
}
// The response may be dropped if the status is not OK.
if (allow_error || s.ok()) {
// The response is dropped if the status is not OK.
if (s.ok()) {
finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
finish_buf_.SendMessage(resp_));
} else {
finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s);
}
finish_buf_.set_cq_tag(&finish_tag_);
finish_buf_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_buf_);
}
void SendInitialMetadata(std::function<void(bool)> f) override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_tag_ =
CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_);
meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
meta_buf_.set_compression_level(ctx_->compression_level());
}
ctx_->sent_initial_metadata_ = true;
meta_buf_.set_core_cq_tag(&meta_tag_);
call_.PerformOps(&meta_buf_);
}
private:
template <class SrvType, class ReqType, class RespType>
friend class CallbackUnaryHandler;
ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call,
RequestType* req,
std::function<void()> call_requester)
: ctx_(ctx),
call_(*call),
req_(req),
call_requester_(std::move(call_requester)) {}
~ServerCallbackRpcControllerImpl() { req_->~RequestType(); }
RequestType* request() { return req_; }
ResponseType* response() { return &resp_; }
@ -193,7 +189,7 @@ class CallbackUnaryHandler : public MethodHandler {
Call call_;
RequestType* req_;
ResponseType resp_;
std::function<void()> renewer_;
std::function<void()> call_requester_;
};
};

@ -926,8 +926,11 @@ void PrintHeaderServerMethodCallback(
"Method$(context, request, response, controller);\n"
" }, this));\n");
} else if (ClientOnlyStreaming(method)) {
// TODO(vjpai): Add in code generation for all streaming methods
} else if (ServerOnlyStreaming(method)) {
// TODO(vjpai): Add in code generation for all streaming methods
} else if (method->BidiStreaming()) {
// TODO(vjpai): Add in code generation for all streaming methods
}
printer->Print(*vars, "}\n");
printer->Print(*vars,
@ -975,8 +978,11 @@ void PrintHeaderServerMethodRawCallback(
"Method$(context, request, response, controller);\n"
" }, this));\n");
} else if (ClientOnlyStreaming(method)) {
// TODO(vjpai): Add in code generation for all streaming methods
} else if (ServerOnlyStreaming(method)) {
// TODO(vjpai): Add in code generation for all streaming methods
} else if (method->BidiStreaming()) {
// TODO(vjpai): Add in code generation for all streaming methods
}
printer->Print(*vars, "}\n");
printer->Print(*vars,

@ -225,7 +225,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
static void Run(grpc_experimental_completion_queue_functor* cb, int) {
auto* callback = static_cast<ShutdownCallback*>(cb);
delete callback->cq_;
grpc_core::Delete(callback);
delete callback;
}
private:
@ -238,7 +238,7 @@ CompletionQueue* Channel::CallbackCQ() {
// if there is no explicit per-channel CQ registered
std::lock_guard<std::mutex> l(mu_);
if (callback_cq_ == nullptr) {
auto* shutdown_callback = grpc_core::New<ShutdownCallback>();
auto* shutdown_callback = new ShutdownCallback;
callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
shutdown_callback});

@ -60,10 +60,10 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
case GRPC_QUEUE_SHUTDOWN:
return SHUTDOWN;
case GRPC_OP_COMPLETE:
auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
auto core_cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
*ok = ev.success != 0;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
*tag = core_cq_tag;
if (core_cq_tag->FinalizeResult(tag, ok)) {
return GOT_EVENT;
}
break;
@ -87,9 +87,9 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
flushed_ = true;
if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
&res)) {
auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag);
auto core_cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag);
*ok = res == 1;
if (cq_tag->FinalizeResult(tag, ok)) {
if (core_cq_tag->FinalizeResult(tag, ok)) {
return true;
}
}

@ -1111,7 +1111,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
static void Run(grpc_experimental_completion_queue_functor* cb, int) {
auto* callback = static_cast<ShutdownCallback*>(cb);
delete callback->cq_;
grpc_core::Delete(callback);
delete callback;
}
private:
@ -1124,7 +1124,7 @@ CompletionQueue* Server::CallbackCQ() {
// if there is no explicit per-server CQ registered
std::lock_guard<std::mutex> l(mu_);
if (callback_cq_ == nullptr) {
auto* shutdown_callback = grpc_core::New<ShutdownCallback>();
auto* shutdown_callback = new ShutdownCallback;
callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
shutdown_callback});

@ -45,7 +45,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
: call_(*call),
has_tag_(false),
tag_(nullptr),
cq_tag_(this),
core_cq_tag_(this),
refs_(2),
finalized_(false),
cancelled_(0),
@ -92,9 +92,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
tag_ = tag;
}
void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
void* cq_tag() override { return cq_tag_; }
void* core_cq_tag() override { return core_cq_tag_; }
void Unref();
@ -138,7 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
internal::Call call_;
bool has_tag_;
void* tag_;
void* cq_tag_;
void* core_cq_tag_;
std::mutex mu_;
int refs_;
bool finalized_;
@ -166,8 +166,8 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) {
interceptor_methods_.SetCall(&call_);
interceptor_methods_.SetReverse();
interceptor_methods_.SetCallOpSetInterface(this);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call->call(), &ops, 1, cq_tag_, nullptr));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1,
core_cq_tag_, nullptr));
/* No interceptors to run here */
}
@ -272,7 +272,7 @@ void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) {
if (callback) {
completion_tag_ =
internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_);
completion_op_->set_cq_tag(&completion_tag_);
completion_op_->set_core_cq_tag(&completion_tag_);
} else if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);
}
@ -305,7 +305,7 @@ bool ServerContext::IsCancelled() const {
if (completion_tag_) {
// When using callback API, this result is always valid.
return completion_op_->CheckCancelledAsync();
} else if (completion_tag_ || has_notify_when_done_tag_) {
} else if (has_notify_when_done_tag_) {
// When using async API, the result is only valid
// if the tag has already been delivered at the completion queue
return completion_op_ && completion_op_->CheckCancelledAsync();

Loading…
Cancel
Save