mirror of https://github.com/grpc/grpc.git
refactor health check client to allow reuse for OOB backend metric reporting (#29024)
* refactor health check client to allow reuse for OOB backend metric reporting
* clang-format
* improve comments

parent e8c5c39143
commit 0c7b37a4a3
19 changed files with 911 additions and 727 deletions
--- a/src/core/ext/filters/client_channel/health/health_check_client.h
+++ b/src/core/ext/filters/client_channel/health/health_check_client.h
@@ -1,177 +1,41 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
+//
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
 
 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
 
 #include <grpc/support/port_platform.h>
 
-#include <atomic>
-
-#include <grpc/grpc.h>
-#include <grpc/support/sync.h>
 #include <string>
 
 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
 #include "src/core/ext/filters/client_channel/subchannel.h"
-#include "src/core/lib/backoff/backoff.h"
+#include "src/core/ext/filters/client_channel/subchannel_stream_client.h"
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/gprpp/sync.h"
-#include "src/core/lib/iomgr/call_combiner.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/polling_entity.h"
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/resource_quota/arena.h"
-#include "src/core/lib/transport/byte_stream.h"
-#include "src/core/lib/transport/metadata_batch.h"
-#include "src/core/lib/transport/transport.h"
 
 namespace grpc_core {
 
-class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
- public:
-  HealthCheckClient(std::string service_name,
-                    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
-                    grpc_pollset_set* interested_parties,
-                    RefCountedPtr<channelz::SubchannelNode> channelz_node,
-                    RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
-
-  ~HealthCheckClient() override;
-
-  void Orphan() override;
-
- private:
-  // Contains a call to the backend and all the data related to the call.
-  class CallState : public Orphanable {
-   public:
-    CallState(RefCountedPtr<HealthCheckClient> health_check_client,
-              grpc_pollset_set* interested_parties);
-    ~CallState() override;
-
-    void Orphan() override;
-
-    void StartCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthCheckClient::mu_);
-
-   private:
-    void Cancel();
-
-    void StartBatch(grpc_transport_stream_op_batch* batch);
-    static void StartBatchInCallCombiner(void* arg, grpc_error_handle error);
-
-    void CallEndedLocked(bool retry)
-        ABSL_EXCLUSIVE_LOCKS_REQUIRED(health_check_client_->mu_);
-
-    static void OnComplete(void* arg, grpc_error_handle error);
-    static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
-    static void RecvMessageReady(void* arg, grpc_error_handle error);
-    static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
-    static void StartCancel(void* arg, grpc_error_handle error);
-    static void OnCancelComplete(void* arg, grpc_error_handle error);
-
-    static void OnByteStreamNext(void* arg, grpc_error_handle error);
-    void ContinueReadingRecvMessage();
-    grpc_error_handle PullSliceFromRecvMessage();
-    void DoneReadingRecvMessage(grpc_error_handle error);
-
-    static void AfterCallStackDestruction(void* arg, grpc_error_handle error);
-
-    RefCountedPtr<HealthCheckClient> health_check_client_;
-    grpc_polling_entity pollent_;
-
-    ScopedArenaPtr arena_;
-    CallCombiner call_combiner_;
-    grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
-
-    // The streaming call to the backend. Always non-null.
-    // Refs are tracked manually; when the last ref is released, the
-    // CallState object will be automatically destroyed.
-    SubchannelCall* call_;
-
-    grpc_transport_stream_op_batch_payload payload_;
-    grpc_transport_stream_op_batch batch_;
-    grpc_transport_stream_op_batch recv_message_batch_;
-    grpc_transport_stream_op_batch recv_trailing_metadata_batch_;
-
-    grpc_closure on_complete_;
-
-    // send_initial_metadata
-    grpc_metadata_batch send_initial_metadata_;
-
-    // send_message
-    ManualConstructor<SliceBufferByteStream> send_message_;
-
-    // send_trailing_metadata
-    grpc_metadata_batch send_trailing_metadata_;
-
-    // recv_initial_metadata
-    grpc_metadata_batch recv_initial_metadata_;
-    grpc_closure recv_initial_metadata_ready_;
-
-    // recv_message
-    OrphanablePtr<ByteStream> recv_message_;
-    grpc_closure recv_message_ready_;
-    grpc_slice_buffer recv_message_buffer_;
-    std::atomic<bool> seen_response_{false};
-
-    // True if the cancel_stream batch has been started.
-    std::atomic<bool> cancelled_{false};
-
-    // recv_trailing_metadata
-    grpc_metadata_batch recv_trailing_metadata_;
-    grpc_transport_stream_stats collect_stats_;
-    grpc_closure recv_trailing_metadata_ready_;
-
-    // Closure for call stack destruction.
-    grpc_closure after_call_stack_destruction_;
-  };
-
-  void StartCall();
-  void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
-  void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
-  static void OnRetryTimer(void* arg, grpc_error_handle error);
-
-  void SetHealthStatus(grpc_connectivity_state state, const char* reason);
-  void SetHealthStatusLocked(grpc_connectivity_state state, const char* reason)
-      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
-  std::string service_name_;
-  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
-  grpc_pollset_set* interested_parties_;  // Do not own.
-  RefCountedPtr<channelz::SubchannelNode> channelz_node_;
-  MemoryAllocator call_allocator_;
-
-  Mutex mu_;
-  RefCountedPtr<ConnectivityStateWatcherInterface> watcher_
-      ABSL_GUARDED_BY(mu_);
-  bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;
-
-  // The data associated with the current health check call. It holds a ref
-  // to this HealthCheckClient object.
-  OrphanablePtr<CallState> call_state_ ABSL_GUARDED_BY(mu_);
-
-  // Call retry state.
-  BackOff retry_backoff_ ABSL_GUARDED_BY(mu_);
-  grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_);
-  grpc_closure retry_timer_callback_ ABSL_GUARDED_BY(mu_);
-  bool retry_timer_callback_pending_ ABSL_GUARDED_BY(mu_) = false;
-};
+OrphanablePtr<SubchannelStreamClient> MakeHealthCheckClient(
+    std::string service_name,
+    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
+    grpc_pollset_set* interested_parties,
+    RefCountedPtr<channelz::SubchannelNode> channelz_node,
+    RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
 
 }  // namespace grpc_core
 
-#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H */
+#endif  // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
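Note: after this change, health_check_client.h exposes only a factory returning the generic stream client. A minimal sketch of a call site, assuming the surrounding subchannel code already owns connected_subchannel, interested_parties, channelz_node, and a connectivity watcher (all names here are illustrative, not part of this diff):

// Hypothetical caller (illustrative only): create the health watch and keep
// the OrphanablePtr alive for as long as the watch should run.
OrphanablePtr<SubchannelStreamClient> health_check_client =
    MakeHealthCheckClient(std::move(service_name), connected_subchannel,
                          interested_parties, channelz_node,
                          std::move(watcher));
// Dropping the pointer orphans the client, which cancels the in-flight
// stream and any pending retry timer (see Orphan() below).
health_check_client.reset();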
@@ -0,0 +1,531 @@ src/core/ext/filters/client_channel/subchannel_stream_client.cc (new file)

//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/subchannel_stream_client.h"

#include <stdint.h>
#include <stdio.h>

#include <grpc/status.h>

#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/error_utils.h"

#define SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS 120
#define SUBCHANNEL_STREAM_RECONNECT_JITTER 0.2

namespace grpc_core {

//
// SubchannelStreamClient
//

SubchannelStreamClient::SubchannelStreamClient(
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties,
    std::unique_ptr<CallEventHandler> event_handler, const char* tracer)
    : InternallyRefCounted<SubchannelStreamClient>(tracer),
      connected_subchannel_(std::move(connected_subchannel)),
      interested_parties_(interested_parties),
      tracer_(tracer),
      call_allocator_(
          ResourceQuotaFromChannelArgs(connected_subchannel_->args())
              ->memory_quota()
              ->CreateMemoryAllocator(tracer)),
      event_handler_(std::move(event_handler)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(Duration::Seconds(
                  SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS))
              .set_multiplier(SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(SUBCHANNEL_STREAM_RECONNECT_JITTER)
              .set_max_backoff(Duration::Seconds(
                  SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS))) {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: created SubchannelStreamClient", tracer_, this);
  }
  GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
                    grpc_schedule_on_exec_ctx);
  StartCall();
}

SubchannelStreamClient::~SubchannelStreamClient() {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: destroying SubchannelStreamClient", tracer_,
            this);
  }
}

void SubchannelStreamClient::Orphan() {
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient shutting down", tracer_,
            this);
  }
  {
    MutexLock lock(&mu_);
    event_handler_.reset();
    call_state_.reset();
    if (retry_timer_callback_pending_) {
      grpc_timer_cancel(&retry_timer_);
    }
  }
  Unref(DEBUG_LOCATION, "orphan");
}

void SubchannelStreamClient::StartCall() {
  MutexLock lock(&mu_);
  StartCallLocked();
}

void SubchannelStreamClient::StartCallLocked() {
  if (event_handler_ == nullptr) return;
  GPR_ASSERT(call_state_ == nullptr);
  if (event_handler_ != nullptr) {
    event_handler_->OnCallStartLocked(this);
  }
  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient created CallState %p",
            tracer_, this, call_state_.get());
  }
  call_state_->StartCallLocked();
}

void SubchannelStreamClient::StartRetryTimerLocked() {
  if (event_handler_ != nullptr) {
    event_handler_->OnRetryTimerStartLocked(this);
  }
  Timestamp next_try = retry_backoff_.NextAttemptTime();
  if (GPR_UNLIKELY(tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient health check call lost...",
            tracer_, this);
    Duration timeout = next_try - ExecCtx::Get()->Now();
    if (timeout > Duration::Zero()) {
      gpr_log(GPR_INFO, "%s %p: ... will retry in %" PRId64 "ms.", tracer_,
              this, timeout.millis());
    } else {
      gpr_log(GPR_INFO, "%s %p: ... retrying immediately.", tracer_, this);
    }
  }
  // Ref for callback, tracked manually.
  Ref(DEBUG_LOCATION, "health_retry_timer").release();
  retry_timer_callback_pending_ = true;
  grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
}

void SubchannelStreamClient::OnRetryTimer(void* arg, grpc_error_handle error) {
  auto* self = static_cast<SubchannelStreamClient*>(arg);
  {
    MutexLock lock(&self->mu_);
    self->retry_timer_callback_pending_ = false;
    if (self->event_handler_ != nullptr && error == GRPC_ERROR_NONE &&
        self->call_state_ == nullptr) {
      if (GPR_UNLIKELY(self->tracer_ != nullptr)) {
        gpr_log(GPR_INFO,
                "%s %p: SubchannelStreamClient restarting health check call",
                self->tracer_, self);
      }
      self->StartCallLocked();
    }
  }
  self->Unref(DEBUG_LOCATION, "health_retry_timer");
}

//
// SubchannelStreamClient::CallState
//

SubchannelStreamClient::CallState::CallState(
    RefCountedPtr<SubchannelStreamClient> health_check_client,
    grpc_pollset_set* interested_parties)
    : subchannel_stream_client_(std::move(health_check_client)),
      pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
      arena_(Arena::Create(subchannel_stream_client_->connected_subchannel_
                               ->GetInitialCallSizeEstimate(),
                           &subchannel_stream_client_->call_allocator_)),
      payload_(context_),
      send_initial_metadata_(arena_.get()),
      send_trailing_metadata_(arena_.get()),
      recv_initial_metadata_(arena_.get()),
      recv_trailing_metadata_(arena_.get()) {}

SubchannelStreamClient::CallState::~CallState() {
  if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
    gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient destroying CallState %p",
            subchannel_stream_client_->tracer_, subchannel_stream_client_.get(),
            this);
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (context_[i].destroy != nullptr) {
      context_[i].destroy(context_[i].value);
    }
  }
  // Unset the call combiner cancellation closure. This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack.
  call_combiner_.SetNotifyOnCancel(nullptr);
}

void SubchannelStreamClient::CallState::Orphan() {
  call_combiner_.Cancel(GRPC_ERROR_CANCELLED);
  Cancel();
}

void SubchannelStreamClient::CallState::StartCallLocked() {
  SubchannelCall::Args args = {
      subchannel_stream_client_->connected_subchannel_,
      &pollent_,
      Slice::FromStaticString("/grpc.health.v1.Health/Watch"),
      gpr_get_cycle_counter(),  // start_time
      Timestamp::InfFuture(),   // deadline
      arena_.get(),
      context_,
      &call_combiner_,
  };
  grpc_error_handle error = GRPC_ERROR_NONE;
  call_ = SubchannelCall::Create(std::move(args), &error).release();
  // Register after-destruction callback.
  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
                    this, grpc_schedule_on_exec_ctx);
  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
  // Check if creation failed.
  if (error != GRPC_ERROR_NONE ||
      subchannel_stream_client_->event_handler_ == nullptr) {
    gpr_log(GPR_ERROR,
            "SubchannelStreamClient %p CallState %p: error creating "
            "stream on subchannel (%s); will retry",
            subchannel_stream_client_.get(), this,
            grpc_error_std_string(error).c_str());
    GRPC_ERROR_UNREF(error);
    CallEndedLocked(/*retry=*/true);
    return;
  }
  // Initialize payload and batch.
  payload_.context = context_;
  batch_.payload = &payload_;
  // on_complete callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "on_complete").release();
  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                                         grpc_schedule_on_exec_ctx);
  // Add send_initial_metadata op.
  send_initial_metadata_.Set(
      HttpPathMetadata(),
      subchannel_stream_client_->event_handler_->GetPathLocked());
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  payload_.send_initial_metadata.send_initial_metadata =
      &send_initial_metadata_;
  payload_.send_initial_metadata.send_initial_metadata_flags = 0;
  payload_.send_initial_metadata.peer_string = nullptr;
  batch_.send_initial_metadata = true;
  // Add send_message op.
  grpc_slice request_slice =
      subchannel_stream_client_->event_handler_->EncodeSendMessageLocked();
  grpc_slice_buffer slice_buffer;
  grpc_slice_buffer_init(&slice_buffer);
  grpc_slice_buffer_add(&slice_buffer, request_slice);
  send_message_.emplace(&slice_buffer, 0);
  grpc_slice_buffer_destroy_internal(&slice_buffer);
  payload_.send_message.send_message.reset(&*send_message_);
  batch_.send_message = true;
  // Add send_trailing_metadata op.
  payload_.send_trailing_metadata.send_trailing_metadata =
      &send_trailing_metadata_;
  batch_.send_trailing_metadata = true;
  // Add recv_initial_metadata op.
  payload_.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  payload_.recv_initial_metadata.recv_flags = nullptr;
  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
  payload_.recv_initial_metadata.peer_string = nullptr;
  // recv_initial_metadata_ready callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
  payload_.recv_initial_metadata.recv_initial_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, grpc_schedule_on_exec_ctx);
  batch_.recv_initial_metadata = true;
  // Add recv_message op.
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  // recv_message callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  batch_.recv_message = true;
  // Start batch.
  StartBatch(&batch_);
  // Initialize recv_trailing_metadata batch.
  recv_trailing_metadata_batch_.payload = &payload_;
  // Add recv_trailing_metadata op.
  payload_.recv_trailing_metadata.recv_trailing_metadata =
      &recv_trailing_metadata_;
  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
  // This callback signals the end of the call, so it relies on the
  // initial ref instead of taking a new ref. When it's invoked, the
  // initial ref is released.
  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                        RecvTrailingMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
  // Start recv_trailing_metadata batch.
  StartBatch(&recv_trailing_metadata_batch_);
}

void SubchannelStreamClient::CallState::StartBatchInCallCombiner(
    void* arg, grpc_error_handle /*error*/) {
  auto* batch = static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* call = static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  call->StartTransportStreamOpBatch(batch);
}

void SubchannelStreamClient::CallState::StartBatch(
    grpc_transport_stream_op_batch* batch) {
  batch->handler_private.extra_arg = call_;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
                           GRPC_ERROR_NONE, "start_subchannel_batch");
}

void SubchannelStreamClient::CallState::AfterCallStackDestruction(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  delete self;
}

void SubchannelStreamClient::CallState::OnCancelComplete(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
  self->call_->Unref(DEBUG_LOCATION, "cancel");
}

void SubchannelStreamClient::CallState::StartCancel(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  auto* batch = grpc_make_transport_stream_op(
      GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
  batch->cancel_stream = true;
  batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
  self->call_->StartTransportStreamOpBatch(batch);
}

void SubchannelStreamClient::CallState::Cancel() {
  bool expected = false;
  if (cancelled_.compare_exchange_strong(expected, true,
                                         std::memory_order_acq_rel,
                                         std::memory_order_acquire)) {
    call_->Ref(DEBUG_LOCATION, "cancel").release();
    GRPC_CALL_COMBINER_START(
        &call_combiner_,
        GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
        GRPC_ERROR_NONE, "health_cancel");
  }
}

void SubchannelStreamClient::CallState::OnComplete(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
  self->send_initial_metadata_.Clear();
  self->send_trailing_metadata_.Clear();
  self->call_->Unref(DEBUG_LOCATION, "on_complete");
}

void SubchannelStreamClient::CallState::RecvInitialMetadataReady(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
  self->recv_initial_metadata_.Clear();
  self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
}

void SubchannelStreamClient::CallState::DoneReadingRecvMessage(
    grpc_error_handle error) {
  recv_message_.reset();
  if (error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(error);
    Cancel();
    grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
    call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  // Concatenate the slices to form a single string.
  std::unique_ptr<uint8_t> recv_message_deleter;
  uint8_t* recv_message;
  if (recv_message_buffer_.count == 1) {
    recv_message = GRPC_SLICE_START_PTR(recv_message_buffer_.slices[0]);
  } else {
    recv_message =
        static_cast<uint8_t*>(gpr_malloc(recv_message_buffer_.length));
    recv_message_deleter.reset(recv_message);
    size_t offset = 0;
    for (size_t i = 0; i < recv_message_buffer_.count; ++i) {
      memcpy(recv_message + offset,
             GRPC_SLICE_START_PTR(recv_message_buffer_.slices[i]),
             GRPC_SLICE_LENGTH(recv_message_buffer_.slices[i]));
      offset += GRPC_SLICE_LENGTH(recv_message_buffer_.slices[i]);
    }
  }
  // Report payload.
  {
    MutexLock lock(&subchannel_stream_client_->mu_);
    if (subchannel_stream_client_->event_handler_ != nullptr) {
      subchannel_stream_client_->event_handler_->RecvMessageReadyLocked(
          subchannel_stream_client_.get(),
          reinterpret_cast<char*>(recv_message), recv_message_buffer_.length);
    }
  }
  seen_response_.store(true, std::memory_order_release);
  grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
  // Start another recv_message batch.
  // This re-uses the ref we're holding.
  // Note: Can't just reuse batch_ here, since we don't know that all
  // callbacks from the original batch have completed yet.
  recv_message_batch_.payload = &payload_;
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  recv_message_batch_.recv_message = true;
  StartBatch(&recv_message_batch_);
}

grpc_error_handle
SubchannelStreamClient::CallState::PullSliceFromRecvMessage() {
  grpc_slice slice;
  grpc_error_handle error = recv_message_->Pull(&slice);
  if (error == GRPC_ERROR_NONE) {
    grpc_slice_buffer_add(&recv_message_buffer_, slice);
  }
  return error;
}

void SubchannelStreamClient::CallState::ContinueReadingRecvMessage() {
  while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
    grpc_error_handle error = PullSliceFromRecvMessage();
    if (error != GRPC_ERROR_NONE) {
      DoneReadingRecvMessage(error);
      return;
    }
    if (recv_message_buffer_.length == recv_message_->length()) {
      DoneReadingRecvMessage(GRPC_ERROR_NONE);
      break;
    }
  }
}

void SubchannelStreamClient::CallState::OnByteStreamNext(
    void* arg, grpc_error_handle error) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(GRPC_ERROR_REF(error));
    return;
  }
  error = self->PullSliceFromRecvMessage();
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(error);
    return;
  }
  if (self->recv_message_buffer_.length == self->recv_message_->length()) {
    self->DoneReadingRecvMessage(GRPC_ERROR_NONE);
  } else {
    self->ContinueReadingRecvMessage();
  }
}

void SubchannelStreamClient::CallState::RecvMessageReady(
    void* arg, grpc_error_handle /*error*/) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
  if (self->recv_message_ == nullptr) {
    self->call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  grpc_slice_buffer_init(&self->recv_message_buffer_);
  GRPC_CLOSURE_INIT(&self->recv_message_ready_, OnByteStreamNext, self,
                    grpc_schedule_on_exec_ctx);
  self->ContinueReadingRecvMessage();
  // Ref will continue to be held until we finish draining the byte stream.
}

void SubchannelStreamClient::CallState::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
                          "recv_trailing_metadata_ready");
  // Get call status.
  grpc_status_code status =
      self->recv_trailing_metadata_.get(GrpcStatusMetadata())
          .value_or(GRPC_STATUS_UNKNOWN);
  if (error != GRPC_ERROR_NONE) {
    grpc_error_get_status(error, Timestamp::InfFuture(), &status,
                          nullptr /* slice */, nullptr /* http_error */,
                          nullptr /* error_string */);
  }
  if (GPR_UNLIKELY(self->subchannel_stream_client_->tracer_ != nullptr)) {
    gpr_log(GPR_INFO,
            "%s %p: SubchannelStreamClient CallState %p: health watch failed "
            "with status %d",
            self->subchannel_stream_client_->tracer_,
            self->subchannel_stream_client_.get(), self, status);
  }
  // Clean up.
  self->recv_trailing_metadata_.Clear();
  // Report call end.
  MutexLock lock(&self->subchannel_stream_client_->mu_);
  if (self->subchannel_stream_client_->event_handler_ != nullptr) {
    self->subchannel_stream_client_->event_handler_
        ->RecvTrailingMetadataReadyLocked(self->subchannel_stream_client_.get(),
                                          status);
  }
  // For status UNIMPLEMENTED, give up and assume always healthy.
  self->CallEndedLocked(/*retry=*/status != GRPC_STATUS_UNIMPLEMENTED);
}

void SubchannelStreamClient::CallState::CallEndedLocked(bool retry) {
  // If this CallState is still in use, this call ended because of a failure,
  // so we need to stop using it and optionally create a new one.
  // Otherwise, we have deliberately ended this call, and no further action
  // is required.
  if (this == subchannel_stream_client_->call_state_.get()) {
    subchannel_stream_client_->call_state_.reset();
    if (retry) {
      GPR_ASSERT(subchannel_stream_client_->event_handler_ != nullptr);
      if (seen_response_.load(std::memory_order_acquire)) {
        // If the call fails after we've gotten a successful response, reset
        // the backoff and restart the call immediately.
        subchannel_stream_client_->retry_backoff_.Reset();
        subchannel_stream_client_->StartCallLocked();
      } else {
        // If the call failed without receiving any messages, retry later.
        subchannel_stream_client_->StartRetryTimerLocked();
      }
    }
  }
  // When the last ref to the call stack goes away, the CallState object
  // will be automatically destroyed.
  call_->Unref(DEBUG_LOCATION, "call_ended");
}

}  // namespace grpc_core
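For intuition about the retry pacing configured near the top of this file: when no response has been seen, NextAttemptTime() grows the delay geometrically from 1s by a factor of 1.6, applies 20% jitter, and caps at 120s. A standalone sketch of that schedule (plain C++, not gRPC code; jitter shown only as a range):

#include <algorithm>
#include <cstdio>

// Standalone illustration of the BackOff::Options used above:
// delay_n = min(1s * 1.6^n, 120s), each attempt jittered by +/-20%.
int main() {
  double delay = 1.0;  // SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS
  for (int attempt = 1; attempt <= 12; ++attempt) {
    std::printf("attempt %2d: wait ~%.2fs (jittered: %.2fs .. %.2fs)\n",
                attempt, delay, delay * 0.8, delay * 1.2);
    delay = std::min(delay * 1.6, 120.0);  // multiplier, capped at max
  }
}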
@@ -0,0 +1,211 @@ src/core/ext/filters/client_channel/subchannel_stream_client.h (new file)

//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H

#include <grpc/support/port_platform.h>

#include <atomic>

#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/transport/byte_stream.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"

namespace grpc_core {

// Represents a streaming call on a subchannel that should be maintained
// open at all times.
// If the call fails with UNIMPLEMENTED, no further attempts are made.
// If the call fails with any other status (including OK), we retry the
// call with appropriate backoff.
// The backoff state is reset when we receive a message on a stream.
//
// Currently, this assumes server-side streaming, but it could be extended
// to support full bidi streaming if there is a need in the future.
class SubchannelStreamClient
    : public InternallyRefCounted<SubchannelStreamClient> {
 public:
  // Interface implemented by caller.  Thread safety is provided for the
  // implementation; only one method will be called by any thread at any
  // one time (including destruction).
  //
  // The address of the SubchannelStreamClient object is passed to most
  // methods for logging purposes.
  class CallEventHandler {
   public:
    virtual ~CallEventHandler() = default;

    // Returns the path for the streaming call.
    virtual Slice GetPathLocked()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
    // Called when a new call attempt is being started.
    virtual void OnCallStartLocked(SubchannelStreamClient* client)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
    // Called when a previous call attempt has failed and the retry
    // timer is started before the next attempt.
    virtual void OnRetryTimerStartLocked(SubchannelStreamClient* client)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
    // Returns the message payload to send from the client.
    virtual grpc_slice EncodeSendMessageLocked()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
    // Called whenever a message is received from the server.
    virtual void RecvMessageReadyLocked(SubchannelStreamClient* client,
                                        char* message, size_t size)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
    // Called when a stream fails.
    virtual void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
                                                 grpc_status_code status)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
  };

  // If tracer is non-null, it enables trace logging, with the specified
  // string being the first part of the log message.
  // Does not take ownership of interested_parties; the caller is responsible
  // for ensuring that it will outlive the SubchannelStreamClient.
  SubchannelStreamClient(
      RefCountedPtr<ConnectedSubchannel> connected_subchannel,
      grpc_pollset_set* interested_parties,
      std::unique_ptr<CallEventHandler> event_handler, const char* tracer);

  ~SubchannelStreamClient() override;

  void Orphan() override;

 private:
  // Contains a call to the backend and all the data related to the call.
  class CallState : public Orphanable {
   public:
    CallState(RefCountedPtr<SubchannelStreamClient> client,
              grpc_pollset_set* interested_parties);
    ~CallState() override;

    void Orphan() override;

    void StartCallLocked()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_);

   private:
    void Cancel();

    void StartBatch(grpc_transport_stream_op_batch* batch);
    static void StartBatchInCallCombiner(void* arg, grpc_error_handle error);

    void CallEndedLocked(bool retry)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&subchannel_stream_client_->mu_);

    static void OnComplete(void* arg, grpc_error_handle error);
    static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
    static void RecvMessageReady(void* arg, grpc_error_handle error);
    static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
    static void StartCancel(void* arg, grpc_error_handle error);
    static void OnCancelComplete(void* arg, grpc_error_handle error);

    static void OnByteStreamNext(void* arg, grpc_error_handle error);
    void ContinueReadingRecvMessage();
    grpc_error_handle PullSliceFromRecvMessage();
    void DoneReadingRecvMessage(grpc_error_handle error);

    static void AfterCallStackDestruction(void* arg, grpc_error_handle error);

    RefCountedPtr<SubchannelStreamClient> subchannel_stream_client_;
    grpc_polling_entity pollent_;

    ScopedArenaPtr arena_;
    CallCombiner call_combiner_;
    grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};

    // The streaming call to the backend. Always non-null.
    // Refs are tracked manually; when the last ref is released, the
    // CallState object will be automatically destroyed.
    SubchannelCall* call_;

    grpc_transport_stream_op_batch_payload payload_;
    grpc_transport_stream_op_batch batch_;
    grpc_transport_stream_op_batch recv_message_batch_;
    grpc_transport_stream_op_batch recv_trailing_metadata_batch_;

    grpc_closure on_complete_;

    // send_initial_metadata
    grpc_metadata_batch send_initial_metadata_;

    // send_message
    absl::optional<SliceBufferByteStream> send_message_;

    // send_trailing_metadata
    grpc_metadata_batch send_trailing_metadata_;

    // recv_initial_metadata
    grpc_metadata_batch recv_initial_metadata_;
    grpc_closure recv_initial_metadata_ready_;

    // recv_message
    OrphanablePtr<ByteStream> recv_message_;
    grpc_closure recv_message_ready_;
    grpc_slice_buffer recv_message_buffer_;
    std::atomic<bool> seen_response_{false};

    // True if the cancel_stream batch has been started.
    std::atomic<bool> cancelled_{false};

    // recv_trailing_metadata
    grpc_metadata_batch recv_trailing_metadata_;
    grpc_transport_stream_stats collect_stats_;
    grpc_closure recv_trailing_metadata_ready_;

    // Closure for call stack destruction.
    grpc_closure after_call_stack_destruction_;
  };

  void StartCall();
  void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);

  void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
  static void OnRetryTimer(void* arg, grpc_error_handle error);

  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  grpc_pollset_set* interested_parties_;  // Do not own.
  const char* tracer_;
  MemoryAllocator call_allocator_;

  Mutex mu_;
  std::unique_ptr<CallEventHandler> event_handler_ ABSL_GUARDED_BY(mu_);

  // The data associated with the current health check call. It holds a ref
  // to this SubchannelStreamClient object.
  OrphanablePtr<CallState> call_state_ ABSL_GUARDED_BY(mu_);

  // Call retry state.
  BackOff retry_backoff_ ABSL_GUARDED_BY(mu_);
  grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_);
  grpc_closure retry_timer_callback_ ABSL_GUARDED_BY(mu_);
  bool retry_timer_callback_pending_ ABSL_GUARDED_BY(mu_) = false;
};

}  // namespace grpc_core

#endif  // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
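Since the commit's stated goal is reusing this client for OOB backend metric reporting, the only per-use-case code a caller supplies is a CallEventHandler implementation. A hypothetical sketch against the interface above; the class name, method bodies, and the empty request encoding are illustrative rather than part of this diff, and lock annotations are omitted for brevity:

// Illustrative only: a handler that keeps a "/grpc.health.v1.Health/Watch"
// stream open.  Real code would serialize/parse the health-check protos.
class ExampleHealthEventHandler
    : public SubchannelStreamClient::CallEventHandler {
 public:
  Slice GetPathLocked() override {
    return Slice::FromStaticString("/grpc.health.v1.Health/Watch");
  }
  void OnCallStartLocked(SubchannelStreamClient* /*client*/) override {
    // e.g. report CONNECTING until the first response arrives.
  }
  void OnRetryTimerStartLocked(SubchannelStreamClient* /*client*/) override {
    // e.g. report TRANSIENT_FAILURE while waiting for the retry timer.
  }
  grpc_slice EncodeSendMessageLocked() override {
    return grpc_empty_slice();  // Placeholder; encode the request here.
  }
  void RecvMessageReadyLocked(SubchannelStreamClient* /*client*/,
                              char* /*message*/, size_t /*size*/) override {
    // Parse the response and surface the new state to the watcher.
  }
  void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* /*client*/,
                                       grpc_status_code status) override {
    // UNIMPLEMENTED means the server exports no such service; the client
    // will not retry in that case (see CallEndedLocked above).
    (void)status;
  }
};

// Wiring it up (illustrative):
auto client = MakeOrphanable<SubchannelStreamClient>(
    connected_subchannel, interested_parties,
    absl::make_unique<ExampleHealthEventHandler>(), "subchannel_stream");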