Solve memory leak due to double setting of set_server_rpc_info

pull/16842/head
Yash Tibrewal 6 years ago
parent 62280b42c7
commit 281de1bb30
  1. 5
      include/grpcpp/impl/codegen/method_handler_impl.h
  2. 10
      include/grpcpp/impl/codegen/server_context.h
  3. 16
      include/grpcpp/impl/codegen/server_interceptor.h
  4. 4
      include/grpcpp/impl/codegen/server_interface.h
  5. 1
      src/cpp/server/server_cc.cc
  6. 12
      src/cpp/server/server_context.cc

@ -59,7 +59,6 @@ class RpcMethodHandler : public MethodHandler {
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
gpr_log(GPR_ERROR, "running handler");
ResponseType rsp;
Status status = param.status;
if (status.ok()) {
@ -121,7 +120,6 @@ class ClientStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
gpr_log(GPR_ERROR, "running client streaming handler");
ServerReader<RequestType> reader(param.call, param.server_context);
ResponseType rsp;
Status status = CatchingFunctionHandler([this, &param, &reader, &rsp] {
@ -165,7 +163,6 @@ class ServerStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
gpr_log(GPR_ERROR, "running server streaming handler");
Status status = param.status;
if (status.ok()) {
ServerWriter<ResponseType> writer(param.call, param.server_context);
@ -227,7 +224,6 @@ class TemplatedBidiStreamingHandler : public MethodHandler {
: func_(func), write_needed_(WriteNeeded) {}
void RunHandler(const HandlerParameter& param) final {
gpr_log(GPR_ERROR, "running bidi streaming handler");
Streamer stream(param.call, param.server_context);
Status status = CatchingFunctionHandler([this, &param, &stream] {
return func_(param.server_context, &stream);
@ -321,7 +317,6 @@ class ErrorMethodHandler : public MethodHandler {
}
void RunHandler(const HandlerParameter& param) final {
gpr_log(GPR_ERROR, "running error handler");
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
FillOps(param.server_context, &ops);
param.call->PerformOps(&ops);

@ -290,9 +290,11 @@ class ServerContext {
const std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>&
creators) {
rpc_info_ = experimental::ServerRpcInfo(this, method);
rpc_info_.RegisterInterceptors(creators);
return &rpc_info_;
if (creators.size() != 0) {
rpc_info_ = new experimental::ServerRpcInfo(this, method);
rpc_info_->RegisterInterceptors(creators);
}
return rpc_info_;
}
CompletionOp* completion_op_;
@ -317,7 +319,7 @@ class ServerContext {
pending_ops_;
bool has_pending_ops_;
experimental::ServerRpcInfo rpc_info_;
experimental::ServerRpcInfo* rpc_info_ = nullptr;
};
} // namespace grpc

@ -19,6 +19,7 @@
#ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERCEPTOR_H
#define GRPCPP_IMPL_CODEGEN_SERVER_INTERCEPTOR_H
#include <atomic>
#include <vector>
#include <grpc/impl/codegen/log.h>
@ -44,8 +45,6 @@ class ServerInterceptorFactoryInterface {
class ServerRpcInfo {
public:
ServerRpcInfo() {}
~ServerRpcInfo(){};
ServerRpcInfo(const ServerRpcInfo&) = delete;
@ -67,7 +66,9 @@ class ServerRpcInfo {
private:
ServerRpcInfo(grpc::ServerContext* ctx, const char* method)
: ctx_(ctx), method_(method) {}
: ctx_(ctx), method_(method) {
ref_.store(1);
}
void RegisterInterceptors(
const std::vector<
@ -78,8 +79,17 @@ class ServerRpcInfo {
creator->CreateServerInterceptor(this)));
}
}
void Ref() { ref_++; }
void Unref() {
if (--ref_ == 0) {
delete this;
}
}
grpc::ServerContext* ctx_ = nullptr;
const char* method_ = nullptr;
std::atomic_int ref_;
std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_;
friend class internal::InterceptorBatchMethodsImpl;

@ -270,10 +270,6 @@ class ServerInterface : public internal::CallHook {
return false;
}
}
call_wrapper_ = internal::Call(
call_, server_, call_cq_, server_->max_receive_message_size(),
context_->set_server_rpc_info(name_,
*server_->interceptor_creators()));
/* Set interception point for recv message */
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);

@ -275,7 +275,6 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
global_callbacks_->PreSynchronousRequest(&ctx_);
auto* handler = resources_ ? method_->handler()
: server_->resource_exhausted_handler_.get();
gpr_log(GPR_ERROR, "got method %s", method_->name());
handler->RunHandler(internal::MethodHandler::HandlerParameter(
&call_, &ctx_, request_, request_status_));
request_ = nullptr;

@ -48,6 +48,12 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
cancelled_(0),
done_intercepting_(false) {}
~CompletionOp() {
if (call_.server_rpc_info()) {
call_.server_rpc_info()->Unref();
}
}
void FillOps(internal::Call* call) override;
bool FinalizeResult(void** tag, bool* status) override;
@ -210,10 +216,16 @@ ServerContext::~ServerContext() {
if (completion_op_) {
completion_op_->Unref();
}
if (rpc_info_) {
rpc_info_->Unref();
}
}
void ServerContext::BeginCompletionOp(internal::Call* call) {
GPR_ASSERT(!completion_op_);
if (rpc_info_) {
rpc_info_->Ref();
}
completion_op_ = new CompletionOp();
if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);

Loading…
Cancel
Save