Eliminate ServerContextBase::Clear/Setup and fix unref process for core call

pull/24207/head
Vijay Pai 4 years ago
parent 889ea7cd09
commit f8b046e819
  1. 39
      include/grpcpp/impl/codegen/server_context.h
  2. 2
      src/cpp/client/client_context.cc
  3. 62
      src/cpp/server/server_context.cc
  4. 9
      test/cpp/interop/server_helper.cc

@ -261,7 +261,7 @@ class ServerContextBase {
/// \see grpc::AuthContext.
std::shared_ptr<const ::grpc::AuthContext> auth_context() const {
if (auth_context_.get() == nullptr) {
auth_context_ = ::grpc::CreateAuthContext(call_);
auth_context_ = ::grpc::CreateAuthContext(call_.call);
}
return auth_context_;
}
@ -277,7 +277,7 @@ class ServerContextBase {
/// Should be used for framework-level extensions only.
/// Applications never need to call this method.
grpc_call* c_call() { return call_; }
grpc_call* c_call() { return call_.call; }
protected:
/// Async only. Has to be called before the rpc starts.
@ -395,14 +395,10 @@ class ServerContextBase {
/// Return the tag queued by BeginCompletionOp()
::grpc::internal::CompletionQueueTag* GetCompletionOpTag();
void set_call(grpc_call* call) { call_ = call; }
void set_call(grpc_call* call) { call_.call = call; }
void BindDeadlineAndMetadata(gpr_timespec deadline, grpc_metadata_array* arr);
void Clear();
void Setup(gpr_timespec deadline);
uint32_t initial_metadata_flags() const { return 0; }
::grpc::experimental::ServerRpcInfo* set_server_rpc_info(
@ -421,30 +417,41 @@ class ServerContextBase {
message_allocator_state_ = allocator_state;
}
CompletionOp* completion_op_;
bool has_notify_when_done_tag_;
void* async_notify_when_done_tag_;
struct CallWrapper {
~CallWrapper();
grpc_call* call = nullptr;
};
// NOTE: call_ must be the first data member of this object so that its
// destructor is the last to be called, since its destructor may unref
// the underlying core call which holds the arena that may be used to
// hold this object.
CallWrapper call_;
CompletionOp* completion_op_ = nullptr;
bool has_notify_when_done_tag_ = false;
void* async_notify_when_done_tag_ = nullptr;
::grpc::internal::CallbackWithSuccessTag completion_tag_;
gpr_timespec deadline_;
grpc_call* call_;
::grpc::CompletionQueue* cq_;
bool sent_initial_metadata_;
::grpc::CompletionQueue* cq_ = nullptr;
bool sent_initial_metadata_ = false;
mutable std::shared_ptr<const ::grpc::AuthContext> auth_context_;
mutable ::grpc::internal::MetadataMap client_metadata_;
std::multimap<std::string, std::string> initial_metadata_;
std::multimap<std::string, std::string> trailing_metadata_;
bool compression_level_set_;
bool compression_level_set_ = false;
grpc_compression_level compression_level_;
grpc_compression_algorithm compression_algorithm_;
::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
::grpc::internal::CallOpSendMessage>
pending_ops_;
bool has_pending_ops_;
bool has_pending_ops_ = false;
::grpc::experimental::ServerRpcInfo* rpc_info_;
::grpc::experimental::ServerRpcInfo* rpc_info_ = nullptr;
::grpc::experimental::RpcAllocatorState* message_allocator_state_ = nullptr;
class Reactor : public ::grpc::ServerUnaryReactor {

@ -91,7 +91,7 @@ void ClientContext::set_credentials(
std::unique_ptr<ClientContext> ClientContext::FromInternalServerContext(
const grpc::ServerContextBase& context, PropagationOptions options) {
std::unique_ptr<ClientContext> ctx(new ClientContext);
ctx->propagate_from_call_ = context.call_;
ctx->propagate_from_call_ = context.call_.call;
ctx->propagation_options_ = options;
return ctx;
}

@ -222,61 +222,39 @@ bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
// ServerContextBase body
ServerContextBase::ServerContextBase() {
Setup(gpr_inf_future(GPR_CLOCK_REALTIME));
}
ServerContextBase::ServerContextBase()
: deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
ServerContextBase::ServerContextBase(gpr_timespec deadline,
grpc_metadata_array* arr) {
Setup(deadline);
grpc_metadata_array* arr)
: deadline_(deadline) {
std::swap(*client_metadata_.arr(), *arr);
}
void ServerContextBase::Setup(gpr_timespec deadline) {
completion_op_ = nullptr;
has_notify_when_done_tag_ = false;
async_notify_when_done_tag_ = nullptr;
deadline_ = deadline;
call_ = nullptr;
cq_ = nullptr;
sent_initial_metadata_ = false;
compression_level_set_ = false;
has_pending_ops_ = false;
rpc_info_ = nullptr;
}
void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
grpc_metadata_array* arr) {
deadline_ = deadline;
std::swap(*client_metadata_.arr(), *arr);
}
ServerContextBase::~ServerContextBase() { Clear(); }
void ServerContextBase::Clear() {
auth_context_.reset();
initial_metadata_.clear();
trailing_metadata_.clear();
client_metadata_.Reset();
ServerContextBase::~ServerContextBase() {
if (completion_op_) {
completion_op_->Unref();
completion_op_ = nullptr;
completion_tag_.Clear();
}
if (rpc_info_) {
rpc_info_->Unref();
rpc_info_ = nullptr;
}
if (call_) {
auto* call = call_;
call_ = nullptr;
grpc_call_unref(call);
}
if (default_reactor_used_.load(std::memory_order_relaxed)) {
reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
default_reactor_used_.store(false, std::memory_order_relaxed);
}
test_unary_.reset();
}
ServerContextBase::CallWrapper::~CallWrapper() {
if (call) {
// If the ServerContext is part of the call's arena, this could free the
// object itself.
grpc_call_unref(call);
}
}
void ServerContextBase::BeginCompletionOp(
@ -322,8 +300,9 @@ void ServerContextBase::TryCancel() const {
rpc_info_->RunInterceptor(&cancel_methods, i);
}
}
grpc_call_error err = grpc_call_cancel_with_status(
call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
grpc_call_error err =
grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
"Cancelled on the server side", nullptr);
if (err != GRPC_CALL_OK) {
gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
}
@ -358,8 +337,8 @@ void ServerContextBase::set_compression_algorithm(
std::string ServerContextBase::peer() const {
std::string peer;
if (call_) {
char* c_peer = grpc_call_get_peer(call_);
if (call_.call) {
char* c_peer = grpc_call_get_peer(call_.call);
peer = c_peer;
gpr_free(c_peer);
}
@ -367,12 +346,13 @@ std::string ServerContextBase::peer() const {
}
const struct census_context* ServerContextBase::census_context() const {
return call_ == nullptr ? nullptr : grpc_census_call_get_context(call_);
return call_.call == nullptr ? nullptr
: grpc_census_call_get_context(call_.call);
}
void ServerContextBase::SetLoadReportingCosts(
const std::vector<std::string>& cost_data) {
if (call_ == nullptr) return;
if (call_.call == nullptr) return;
for (const auto& cost_datum : cost_data) {
AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
}

@ -54,17 +54,18 @@ InteropServerContextInspector::InteropServerContextInspector(
grpc_compression_algorithm
InteropServerContextInspector::GetCallCompressionAlgorithm() const {
return grpc_call_test_only_get_compression_algorithm(context_.call_);
return grpc_call_test_only_get_compression_algorithm(context_.call_.call);
}
uint32_t InteropServerContextInspector::GetEncodingsAcceptedByClient() const {
return grpc_call_test_only_get_encodings_accepted_by_peer(context_.call_);
return grpc_call_test_only_get_encodings_accepted_by_peer(
context_.call_.call);
}
bool InteropServerContextInspector::WasCompressed() const {
return (grpc_call_test_only_get_message_flags(context_.call_) &
return (grpc_call_test_only_get_message_flags(context_.call_.call) &
GRPC_WRITE_INTERNAL_COMPRESS) ||
(grpc_call_test_only_get_message_flags(context_.call_) &
(grpc_call_test_only_get_message_flags(context_.call_.call) &
GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED);
}

Loading…
Cancel
Save