Merge pull request #19002 from markdroth/health_check_client_refcount_fix

Fix ref-counting bug in health check client.
pull/19007/head
Mark D. Roth 6 years ago committed by GitHub
commit 9f57c32b1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 43
      src/core/ext/filters/client_channel/health/health_check_client.cc
  2. 8
      src/core/ext/filters/client_channel/health/health_check_client.h

@ -277,8 +277,7 @@ bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error** error) {
HealthCheckClient::CallState::CallState( HealthCheckClient::CallState::CallState(
RefCountedPtr<HealthCheckClient> health_check_client, RefCountedPtr<HealthCheckClient> health_check_client,
grpc_pollset_set* interested_parties) grpc_pollset_set* interested_parties)
: InternallyRefCounted<CallState>(&grpc_health_check_client_trace), : health_check_client_(std::move(health_check_client)),
health_check_client_(std::move(health_check_client)),
pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)), pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
arena_(Arena::Create(health_check_client_->connected_subchannel_ arena_(Arena::Create(health_check_client_->connected_subchannel_
->GetInitialCallSizeEstimate(0))), ->GetInitialCallSizeEstimate(0))),
@ -322,7 +321,8 @@ void HealthCheckClient::CallState::StartCall() {
0, // parent_data_size 0, // parent_data_size
}; };
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
call_ = health_check_client_->connected_subchannel_->CreateCall(args, &error); call_ = health_check_client_->connected_subchannel_->CreateCall(args, &error)
.release();
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"HealthCheckClient %p CallState %p: error creating health " "HealthCheckClient %p CallState %p: error creating health "
@ -331,18 +331,22 @@ void HealthCheckClient::CallState::StartCall() {
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
// Schedule instead of running directly, since we must not be // Schedule instead of running directly, since we must not be
// holding health_check_client_->mu_ when CallEnded() is called. // holding health_check_client_->mu_ when CallEnded() is called.
Ref(DEBUG_LOCATION, "call_end_closure").release(); call_->Ref(DEBUG_LOCATION, "call_end_closure").release();
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&batch_.handler_private.closure, CallEndedRetry, this, GRPC_CLOSURE_INIT(&batch_.handler_private.closure, CallEndedRetry, this,
grpc_schedule_on_exec_ctx), grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
return; return;
} }
// Register after-destruction callback.
GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
this, grpc_schedule_on_exec_ctx);
call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
// Initialize payload and batch. // Initialize payload and batch.
payload_.context = context_; payload_.context = context_;
batch_.payload = &payload_; batch_.payload = &payload_;
// on_complete callback takes ref, handled manually. // on_complete callback takes ref, handled manually.
Ref(DEBUG_LOCATION, "on_complete").release(); call_->Ref(DEBUG_LOCATION, "on_complete").release();
batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this, batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
// Add send_initial_metadata op. // Add send_initial_metadata op.
@ -375,7 +379,7 @@ void HealthCheckClient::CallState::StartCall() {
payload_.recv_initial_metadata.trailing_metadata_available = nullptr; payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
payload_.recv_initial_metadata.peer_string = nullptr; payload_.recv_initial_metadata.peer_string = nullptr;
// recv_initial_metadata_ready callback takes ref, handled manually. // recv_initial_metadata_ready callback takes ref, handled manually.
Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release(); call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
payload_.recv_initial_metadata.recv_initial_metadata_ready = payload_.recv_initial_metadata.recv_initial_metadata_ready =
GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady, GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
this, grpc_schedule_on_exec_ctx); this, grpc_schedule_on_exec_ctx);
@ -383,7 +387,7 @@ void HealthCheckClient::CallState::StartCall() {
// Add recv_message op. // Add recv_message op.
payload_.recv_message.recv_message = &recv_message_; payload_.recv_message.recv_message = &recv_message_;
// recv_message callback takes ref, handled manually. // recv_message callback takes ref, handled manually.
Ref(DEBUG_LOCATION, "recv_message_ready").release(); call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT( payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
&recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx); &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
batch_.recv_message = true; batch_.recv_message = true;
@ -419,7 +423,7 @@ void HealthCheckClient::CallState::StartBatchInCallCombiner(void* arg,
void HealthCheckClient::CallState::StartBatch( void HealthCheckClient::CallState::StartBatch(
grpc_transport_stream_op_batch* batch) { grpc_transport_stream_op_batch* batch) {
batch->handler_private.extra_arg = call_.get(); batch->handler_private.extra_arg = call_;
GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner, GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
batch, grpc_schedule_on_exec_ctx); batch, grpc_schedule_on_exec_ctx);
GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure, GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
@ -430,7 +434,7 @@ void HealthCheckClient::CallState::AfterCallStackDestruction(
void* arg, grpc_error* error) { void* arg, grpc_error* error) {
HealthCheckClient::CallState* self = HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg); static_cast<HealthCheckClient::CallState*>(arg);
self->Unref(DEBUG_LOCATION, "cancel"); Delete(self);
} }
void HealthCheckClient::CallState::OnCancelComplete(void* arg, void HealthCheckClient::CallState::OnCancelComplete(void* arg,
@ -438,10 +442,7 @@ void HealthCheckClient::CallState::OnCancelComplete(void* arg,
HealthCheckClient::CallState* self = HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg); static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel"); GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
GRPC_CLOSURE_INIT(&self->after_call_stack_destruction_, self->call_->Unref(DEBUG_LOCATION, "cancel");
AfterCallStackDestruction, self, grpc_schedule_on_exec_ctx);
self->call_->SetAfterCallStackDestroy(&self->after_call_stack_destruction_);
self->call_.reset();
} }
void HealthCheckClient::CallState::StartCancel(void* arg, grpc_error* error) { void HealthCheckClient::CallState::StartCancel(void* arg, grpc_error* error) {
@ -458,7 +459,7 @@ void HealthCheckClient::CallState::Cancel() {
bool expected = false; bool expected = false;
if (cancelled_.CompareExchangeStrong(&expected, true, MemoryOrder::ACQ_REL, if (cancelled_.CompareExchangeStrong(&expected, true, MemoryOrder::ACQ_REL,
MemoryOrder::ACQUIRE)) { MemoryOrder::ACQUIRE)) {
Ref(DEBUG_LOCATION, "cancel").release(); call_->Ref(DEBUG_LOCATION, "cancel").release();
GRPC_CALL_COMBINER_START( GRPC_CALL_COMBINER_START(
&call_combiner_, &call_combiner_,
GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx), GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
@ -472,7 +473,7 @@ void HealthCheckClient::CallState::OnComplete(void* arg, grpc_error* error) {
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete"); GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
grpc_metadata_batch_destroy(&self->send_initial_metadata_); grpc_metadata_batch_destroy(&self->send_initial_metadata_);
grpc_metadata_batch_destroy(&self->send_trailing_metadata_); grpc_metadata_batch_destroy(&self->send_trailing_metadata_);
self->Unref(DEBUG_LOCATION, "on_complete"); self->call_->Unref(DEBUG_LOCATION, "on_complete");
} }
void HealthCheckClient::CallState::RecvInitialMetadataReady(void* arg, void HealthCheckClient::CallState::RecvInitialMetadataReady(void* arg,
@ -481,7 +482,7 @@ void HealthCheckClient::CallState::RecvInitialMetadataReady(void* arg,
static_cast<HealthCheckClient::CallState*>(arg); static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready"); GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
grpc_metadata_batch_destroy(&self->recv_initial_metadata_); grpc_metadata_batch_destroy(&self->recv_initial_metadata_);
self->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready"); self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
} }
void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) { void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) {
@ -490,7 +491,7 @@ void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) {
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
Cancel(); Cancel();
grpc_slice_buffer_destroy_internal(&recv_message_buffer_); grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
Unref(DEBUG_LOCATION, "recv_message_ready"); call_->Unref(DEBUG_LOCATION, "recv_message_ready");
return; return;
} }
const bool healthy = DecodeResponse(&recv_message_buffer_, &error); const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
@ -563,7 +564,7 @@ void HealthCheckClient::CallState::RecvMessageReady(void* arg,
static_cast<HealthCheckClient::CallState*>(arg); static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready"); GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
if (self->recv_message_ == nullptr) { if (self->recv_message_ == nullptr) {
self->Unref(DEBUG_LOCATION, "recv_message_ready"); self->call_->Unref(DEBUG_LOCATION, "recv_message_ready");
return; return;
} }
grpc_slice_buffer_init(&self->recv_message_buffer_); grpc_slice_buffer_init(&self->recv_message_buffer_);
@ -621,7 +622,7 @@ void HealthCheckClient::CallState::CallEndedRetry(void* arg,
HealthCheckClient::CallState* self = HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg); static_cast<HealthCheckClient::CallState*>(arg);
self->CallEnded(true /* retry */); self->CallEnded(true /* retry */);
self->Unref(DEBUG_LOCATION, "call_end_closure"); self->call_->Unref(DEBUG_LOCATION, "call_end_closure");
} }
void HealthCheckClient::CallState::CallEnded(bool retry) { void HealthCheckClient::CallState::CallEnded(bool retry) {
@ -644,7 +645,9 @@ void HealthCheckClient::CallState::CallEnded(bool retry) {
} }
} }
} }
Unref(DEBUG_LOCATION, "call_ended"); // When the last ref to the call stack goes away, the CallState object
// will be automatically destroyed.
call_->Unref(DEBUG_LOCATION, "call_ended");
} }
} // namespace grpc_core } // namespace grpc_core

@ -61,7 +61,7 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
private: private:
// Contains a call to the backend and all the data related to the call. // Contains a call to the backend and all the data related to the call.
class CallState : public InternallyRefCounted<CallState> { class CallState : public Orphanable {
public: public:
CallState(RefCountedPtr<HealthCheckClient> health_check_client, CallState(RefCountedPtr<HealthCheckClient> health_check_client,
grpc_pollset_set* interested_parties_); grpc_pollset_set* interested_parties_);
@ -101,8 +101,10 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
grpc_core::CallCombiner call_combiner_; grpc_core::CallCombiner call_combiner_;
grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
// The streaming call to the backend. Always non-NULL. // The streaming call to the backend. Always non-null.
RefCountedPtr<SubchannelCall> call_; // Refs are tracked manually; when the last ref is released, the
// CallState object will be automatically destroyed.
SubchannelCall* call_;
grpc_transport_stream_op_batch_payload payload_; grpc_transport_stream_op_batch_payload payload_;
grpc_transport_stream_op_batch batch_; grpc_transport_stream_op_batch batch_;

Loading…
Cancel
Save