diff --git a/BUILD b/BUILD
index dba48fbbaa1..57a812031ba 100644
--- a/BUILD
+++ b/BUILD
@@ -2438,6 +2438,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc",
"src/core/ext/filters/client_channel/subchannel.cc",
"src/core/ext/filters/client_channel/subchannel_pool_interface.cc",
+ "src/core/ext/filters/client_channel/subchannel_stream_client.cc",
],
hdrs = [
"src/core/ext/filters/client_channel/backend_metric.h",
@@ -2466,6 +2467,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/subchannel.h",
"src/core/ext/filters/client_channel/subchannel_interface.h",
"src/core/ext/filters/client_channel/subchannel_pool_interface.h",
+ "src/core/ext/filters/client_channel/subchannel_stream_client.h",
],
external_deps = [
"absl/container:inlined_vector",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3fbb60ebdd2..58f1a6a700f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1493,6 +1493,7 @@ add_library(grpc
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
src/core/ext/filters/client_channel/subchannel.cc
src/core/ext/filters/client_channel/subchannel_pool_interface.cc
+ src/core/ext/filters/client_channel/subchannel_stream_client.cc
src/core/ext/filters/client_idle/client_idle_filter.cc
src/core/ext/filters/client_idle/idle_filter_state.cc
src/core/ext/filters/deadline/deadline_filter.cc
@@ -2420,6 +2421,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
src/core/ext/filters/client_channel/subchannel.cc
src/core/ext/filters/client_channel/subchannel_pool_interface.cc
+ src/core/ext/filters/client_channel/subchannel_stream_client.cc
src/core/ext/filters/client_idle/client_idle_filter.cc
src/core/ext/filters/client_idle/idle_filter_state.cc
src/core/ext/filters/deadline/deadline_filter.cc
diff --git a/Makefile b/Makefile
index 9f8a71a4fb1..25b4b4e6eb0 100644
--- a/Makefile
+++ b/Makefile
@@ -1080,6 +1080,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
+ src/core/ext/filters/client_channel/subchannel_stream_client.cc \
src/core/ext/filters/client_idle/client_idle_filter.cc \
src/core/ext/filters/client_idle/idle_filter_state.cc \
src/core/ext/filters/deadline/deadline_filter.cc \
@@ -1856,6 +1857,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
+ src/core/ext/filters/client_channel/subchannel_stream_client.cc \
src/core/ext/filters/client_idle/client_idle_filter.cc \
src/core/ext/filters/client_idle/idle_filter_state.cc \
src/core/ext/filters/deadline/deadline_filter.cc \
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 48fe8799e17..0d2bb6f05a9 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -354,6 +354,7 @@ libs:
- src/core/ext/filters/client_channel/subchannel.h
- src/core/ext/filters/client_channel/subchannel_interface.h
- src/core/ext/filters/client_channel/subchannel_pool_interface.h
+ - src/core/ext/filters/client_channel/subchannel_stream_client.h
- src/core/ext/filters/client_idle/idle_filter_state.h
- src/core/ext/filters/deadline/deadline_filter.h
- src/core/ext/filters/fault_injection/fault_injection_filter.h
@@ -1021,6 +1022,7 @@ libs:
- src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
- src/core/ext/filters/client_channel/subchannel.cc
- src/core/ext/filters/client_channel/subchannel_pool_interface.cc
+ - src/core/ext/filters/client_channel/subchannel_stream_client.cc
- src/core/ext/filters/client_idle/client_idle_filter.cc
- src/core/ext/filters/client_idle/idle_filter_state.cc
- src/core/ext/filters/deadline/deadline_filter.cc
@@ -1814,6 +1816,7 @@ libs:
- src/core/ext/filters/client_channel/subchannel.h
- src/core/ext/filters/client_channel/subchannel_interface.h
- src/core/ext/filters/client_channel/subchannel_pool_interface.h
+ - src/core/ext/filters/client_channel/subchannel_stream_client.h
- src/core/ext/filters/client_idle/idle_filter_state.h
- src/core/ext/filters/deadline/deadline_filter.h
- src/core/ext/filters/fault_injection/fault_injection_filter.h
@@ -2132,6 +2135,7 @@ libs:
- src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
- src/core/ext/filters/client_channel/subchannel.cc
- src/core/ext/filters/client_channel/subchannel_pool_interface.cc
+ - src/core/ext/filters/client_channel/subchannel_stream_client.cc
- src/core/ext/filters/client_idle/client_idle_filter.cc
- src/core/ext/filters/client_idle/idle_filter_state.cc
- src/core/ext/filters/deadline/deadline_filter.cc
diff --git a/config.m4 b/config.m4
index 3b15ede67e8..705bb715137 100644
--- a/config.m4
+++ b/config.m4
@@ -98,6 +98,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
+ src/core/ext/filters/client_channel/subchannel_stream_client.cc \
src/core/ext/filters/client_idle/client_idle_filter.cc \
src/core/ext/filters/client_idle/idle_filter_state.cc \
src/core/ext/filters/deadline/deadline_filter.cc \
diff --git a/config.w32 b/config.w32
index 6a5085c9fbd..18cfe1aaf74 100644
--- a/config.w32
+++ b/config.w32
@@ -64,6 +64,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\service_config_channel_arg_filter.cc " +
"src\\core\\ext\\filters\\client_channel\\subchannel.cc " +
"src\\core\\ext\\filters\\client_channel\\subchannel_pool_interface.cc " +
+ "src\\core\\ext\\filters\\client_channel\\subchannel_stream_client.cc " +
"src\\core\\ext\\filters\\client_idle\\client_idle_filter.cc " +
"src\\core\\ext\\filters\\client_idle\\idle_filter_state.cc " +
"src\\core\\ext\\filters\\deadline\\deadline_filter.cc " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 85f5127fec8..15ed9eca191 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -256,6 +256,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/subchannel.h',
'src/core/ext/filters/client_channel/subchannel_interface.h',
'src/core/ext/filters/client_channel/subchannel_pool_interface.h',
+ 'src/core/ext/filters/client_channel/subchannel_stream_client.h',
'src/core/ext/filters/client_idle/idle_filter_state.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_filter.h',
@@ -1077,6 +1078,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/subchannel.h',
'src/core/ext/filters/client_channel/subchannel_interface.h',
'src/core/ext/filters/client_channel/subchannel_pool_interface.h',
+ 'src/core/ext/filters/client_channel/subchannel_stream_client.h',
'src/core/ext/filters/client_idle/idle_filter_state.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_filter.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index ed49c6db102..470d41337bd 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -291,6 +291,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/subchannel_interface.h',
'src/core/ext/filters/client_channel/subchannel_pool_interface.cc',
'src/core/ext/filters/client_channel/subchannel_pool_interface.h',
+ 'src/core/ext/filters/client_channel/subchannel_stream_client.cc',
+ 'src/core/ext/filters/client_channel/subchannel_stream_client.h',
'src/core/ext/filters/client_idle/client_idle_filter.cc',
'src/core/ext/filters/client_idle/idle_filter_state.cc',
'src/core/ext/filters/client_idle/idle_filter_state.h',
@@ -1695,6 +1697,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/subchannel.h',
'src/core/ext/filters/client_channel/subchannel_interface.h',
'src/core/ext/filters/client_channel/subchannel_pool_interface.h',
+ 'src/core/ext/filters/client_channel/subchannel_stream_client.h',
'src/core/ext/filters/client_idle/idle_filter_state.h',
'src/core/ext/filters/deadline/deadline_filter.h',
'src/core/ext/filters/fault_injection/fault_injection_filter.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 6ee6c58294c..4416790add4 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -210,6 +210,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/subchannel_interface.h )
s.files += %w( src/core/ext/filters/client_channel/subchannel_pool_interface.cc )
s.files += %w( src/core/ext/filters/client_channel/subchannel_pool_interface.h )
+ s.files += %w( src/core/ext/filters/client_channel/subchannel_stream_client.cc )
+ s.files += %w( src/core/ext/filters/client_channel/subchannel_stream_client.h )
s.files += %w( src/core/ext/filters/client_idle/client_idle_filter.cc )
s.files += %w( src/core/ext/filters/client_idle/idle_filter_state.cc )
s.files += %w( src/core/ext/filters/client_idle/idle_filter_state.h )
diff --git a/grpc.gyp b/grpc.gyp
index db4afa16857..d67e931b15d 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -428,6 +428,7 @@
'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/subchannel.cc',
'src/core/ext/filters/client_channel/subchannel_pool_interface.cc',
+ 'src/core/ext/filters/client_channel/subchannel_stream_client.cc',
'src/core/ext/filters/client_idle/client_idle_filter.cc',
'src/core/ext/filters/client_idle/idle_filter_state.cc',
'src/core/ext/filters/deadline/deadline_filter.cc',
@@ -1175,6 +1176,7 @@
'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/subchannel.cc',
'src/core/ext/filters/client_channel/subchannel_pool_interface.cc',
+ 'src/core/ext/filters/client_channel/subchannel_stream_client.cc',
'src/core/ext/filters/client_idle/client_idle_filter.cc',
'src/core/ext/filters/client_idle/idle_filter_state.cc',
'src/core/ext/filters/deadline/deadline_filter.cc',
diff --git a/package.xml b/package.xml
index d064ac37ba6..b0f233e99d1 100644
--- a/package.xml
+++ b/package.xml
@@ -190,6 +190,8 @@
    <file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel_interface.h" role="src" />
    <file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel_pool_interface.cc" role="src" />
    <file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel_pool_interface.h" role="src" />
+    <file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel_stream_client.cc" role="src" />
+    <file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel_stream_client.h" role="src" />
    <file baseinstalldir="/" name="src/core/ext/filters/client_idle/client_idle_filter.cc" role="src" />
    <file baseinstalldir="/" name="src/core/ext/filters/client_idle/idle_filter_state.cc" role="src" />
    <file baseinstalldir="/" name="src/core/ext/filters/client_idle/idle_filter_state.h" role="src" />
diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc
index 82aac56cfc6..5cf197563ca 100644
--- a/src/core/ext/filters/client_channel/health/health_check_client.cc
+++ b/src/core/ext/filters/client_channel/health/health_check_client.cc
@@ -1,20 +1,18 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
+//
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
#include <grpc/support/port_platform.h>
@@ -43,578 +41,130 @@ namespace grpc_core {
TraceFlag grpc_health_check_client_trace(false, "health_check_client");
-//
-// HealthCheckClient
-//
-
-HealthCheckClient::HealthCheckClient(
- std::string service_name,
- RefCountedPtr<ConnectedSubchannel> connected_subchannel,
- grpc_pollset_set* interested_parties,
- RefCountedPtr<channelz::SubchannelNode> channelz_node,
- RefCountedPtr watcher)
- : InternallyRefCounted<HealthCheckClient>(
- GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
- ? "HealthCheckClient"
- : nullptr),
- service_name_(std::move(service_name)),
- connected_subchannel_(std::move(connected_subchannel)),
- interested_parties_(interested_parties),
- channelz_node_(std::move(channelz_node)),
- call_allocator_(
- ResourceQuotaFromChannelArgs(connected_subchannel_->args())
- ->memory_quota()
- ->CreateMemoryAllocator(service_name_)),
- watcher_(std::move(watcher)),
- retry_backoff_(
- BackOff::Options()
- .set_initial_backoff(Duration::Seconds(
- HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS))
- .set_multiplier(HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER)
- .set_jitter(HEALTH_CHECK_RECONNECT_JITTER)
- .set_max_backoff(Duration::Seconds(
- HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS))) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
- gpr_log(GPR_INFO, "created HealthCheckClient %p", this);
- }
- GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
- grpc_schedule_on_exec_ctx);
- StartCall();
-}
-
-HealthCheckClient::~HealthCheckClient() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
- gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
- }
-}
-
-void HealthCheckClient::SetHealthStatus(grpc_connectivity_state state,
- const char* reason) {
- MutexLock lock(&mu_);
- SetHealthStatusLocked(state, reason);
-}
-
-void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
- const char* reason) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
- gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
- ConnectivityStateName(state), reason);
- }
- if (watcher_ != nullptr) {
- watcher_->Notify(state,
- state == GRPC_CHANNEL_TRANSIENT_FAILURE
- ? absl::Status(absl::StatusCode::kUnavailable, reason)
- : absl::Status());
- }
-}
-
-void HealthCheckClient::Orphan() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
- gpr_log(GPR_INFO, "HealthCheckClient %p: shutting down", this);
- }
- {
- MutexLock lock(&mu_);
- shutting_down_ = true;
- watcher_.reset();
- call_state_.reset();
- if (retry_timer_callback_pending_) {
- grpc_timer_cancel(&retry_timer_);
- }
- }
- Unref(DEBUG_LOCATION, "orphan");
-}
-
-void HealthCheckClient::StartCall() {
- MutexLock lock(&mu_);
- StartCallLocked();
-}
-
-void HealthCheckClient::StartCallLocked() {
- if (shutting_down_) return;
- GPR_ASSERT(call_state_ == nullptr);
- SetHealthStatusLocked(GRPC_CHANNEL_CONNECTING, "starting health watch");
- call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
- if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
- gpr_log(GPR_INFO, "HealthCheckClient %p: created CallState %p", this,
- call_state_.get());
- }
- call_state_->StartCall();
-}
-
-void HealthCheckClient::StartRetryTimerLocked() {
- SetHealthStatusLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
- "health check call failed; will retry after backoff");
- Timestamp next_try = retry_backoff_.NextAttemptTime();
- if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
- gpr_log(GPR_INFO, "HealthCheckClient %p: health check call lost...", this);
- Duration timeout = next_try - ExecCtx::Get()->Now();
- if (timeout > Duration::Zero()) {
- gpr_log(GPR_INFO,
- "HealthCheckClient %p: ... will retry in %" PRId64 "ms.", this,
- timeout.millis());
- } else {
- gpr_log(GPR_INFO, "HealthCheckClient %p: ... retrying immediately.",
- this);
- }
- }
- // Ref for callback, tracked manually.
- Ref(DEBUG_LOCATION, "health_retry_timer").release();
- retry_timer_callback_pending_ = true;
- grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
-}
-
-void HealthCheckClient::OnRetryTimer(void* arg, grpc_error_handle error) {
- HealthCheckClient* self = static_cast<HealthCheckClient*>(arg);
- {
- MutexLock lock(&self->mu_);
- self->retry_timer_callback_pending_ = false;
- if (!self->shutting_down_ && error == GRPC_ERROR_NONE &&
- self->call_state_ == nullptr) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
- gpr_log(GPR_INFO, "HealthCheckClient %p: restarting health check call",
- self);
- }
- self->StartCallLocked();
- }
- }
- self->Unref(DEBUG_LOCATION, "health_retry_timer");
-}
-
-//
-// protobuf helpers
-//
-
namespace {
-void EncodeRequest(const std::string& service_name,
- ManualConstructor<SliceBufferByteStream>* send_message) {
- upb::Arena arena;
- grpc_health_v1_HealthCheckRequest* request_struct =
- grpc_health_v1_HealthCheckRequest_new(arena.ptr());
- grpc_health_v1_HealthCheckRequest_set_service(
- request_struct,
- upb_StringView_FromDataAndSize(service_name.data(), service_name.size()));
- size_t buf_length;
- char* buf = grpc_health_v1_HealthCheckRequest_serialize(
- request_struct, arena.ptr(), &buf_length);
- grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
- memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
- grpc_slice_buffer slice_buffer;
- grpc_slice_buffer_init(&slice_buffer);
- grpc_slice_buffer_add(&slice_buffer, request_slice);
- send_message->Init(&slice_buffer, 0);
- grpc_slice_buffer_destroy_internal(&slice_buffer);
-}
-
// Returns true if healthy.
-// If there was an error parsing the response, sets *error and returns false.
-bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error_handle* error) {
+absl::StatusOr<bool> DecodeResponse(char* message, size_t size) {
// If message is empty, assume unhealthy.
- if (slice_buffer->length == 0) {
- *error =
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("health check response was empty");
- return false;
- }
- // Concatenate the slices to form a single string.
- std::unique_ptr<uint8_t> recv_message_deleter;
- uint8_t* recv_message;
- if (slice_buffer->count == 1) {
- recv_message = GRPC_SLICE_START_PTR(slice_buffer->slices[0]);
- } else {
- recv_message = static_cast<uint8_t*>(gpr_malloc(slice_buffer->length));
- recv_message_deleter.reset(recv_message);
- size_t offset = 0;
- for (size_t i = 0; i < slice_buffer->count; ++i) {
- memcpy(recv_message + offset,
- GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
- GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
- offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
- }
+ if (size == 0) {
+ return absl::InvalidArgumentError("health check response was empty");
}
// Deserialize message.
upb::Arena arena;
- grpc_health_v1_HealthCheckResponse* response_struct =
- grpc_health_v1_HealthCheckResponse_parse(
- reinterpret_cast<char*>(recv_message), slice_buffer->length,
- arena.ptr());
+ auto* response_struct = grpc_health_v1_HealthCheckResponse_parse(
+ reinterpret_cast(message), size, arena.ptr());
if (response_struct == nullptr) {
// Can't parse message; assume unhealthy.
- *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "cannot parse health check response");
- return false;
+ return absl::InvalidArgumentError("cannot parse health check response");
}
int32_t status = grpc_health_v1_HealthCheckResponse_status(response_struct);
return status == grpc_health_v1_HealthCheckResponse_SERVING;
}
-} // namespace
-
-//
-// HealthCheckClient::CallState
-//
-
-HealthCheckClient::CallState::CallState(
- RefCountedPtr<HealthCheckClient> health_check_client,
- grpc_pollset_set* interested_parties)
- : health_check_client_(std::move(health_check_client)),
- pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
- arena_(Arena::Create(health_check_client_->connected_subchannel_
- ->GetInitialCallSizeEstimate(),
- &health_check_client_->call_allocator_)),
- payload_(context_),
- send_initial_metadata_(arena_.get()),
- send_trailing_metadata_(arena_.get()),
- recv_initial_metadata_(arena_.get()),
- recv_trailing_metadata_(arena_.get()) {}
-
-HealthCheckClient::CallState::~CallState() {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
- gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p",
- health_check_client_.get(), this);
- }
- for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) {
- if (context_[i].destroy != nullptr) {
- context_[i].destroy(context_[i].value);
+class HealthStreamEventHandler
+ : public SubchannelStreamClient::CallEventHandler {
+ public:
+ HealthStreamEventHandler(
+ std::string service_name,
+ RefCountedPtr<channelz::SubchannelNode> channelz_node,
+ RefCountedPtr watcher)
+ : service_name_(std::move(service_name)),
+ channelz_node_(std::move(channelz_node)),
+ watcher_(std::move(watcher)) {}
+
+ Slice GetPathLocked() override {
+ return Slice::FromStaticString("/grpc.health.v1.Health/Watch");
+ }
+
+ void OnCallStartLocked(SubchannelStreamClient* client) override {
+ SetHealthStatusLocked(client, GRPC_CHANNEL_CONNECTING,
+ "starting health watch");
+ }
+
+ void OnRetryTimerStartLocked(SubchannelStreamClient* client) override {
+ SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "health check call failed; will retry after backoff");
+ }
+
+ grpc_slice EncodeSendMessageLocked() override {
+ upb::Arena arena;
+ grpc_health_v1_HealthCheckRequest* request_struct =
+ grpc_health_v1_HealthCheckRequest_new(arena.ptr());
+ grpc_health_v1_HealthCheckRequest_set_service(
+ request_struct, upb_StringView_FromDataAndSize(service_name_.data(),
+ service_name_.size()));
+ size_t buf_length;
+ char* buf = grpc_health_v1_HealthCheckRequest_serialize(
+ request_struct, arena.ptr(), &buf_length);
+ grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
+ memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
+ return request_slice;
+ }
+
+ void RecvMessageReadyLocked(SubchannelStreamClient* client, char* message,
+ size_t size) override {
+ auto healthy = DecodeResponse(message, size);
+ if (!healthy.ok()) {
+ SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ healthy.status().ToString().c_str());
+ } else if (!*healthy) {
+ SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "backend unhealthy");
+ } else {
+ SetHealthStatusLocked(client, GRPC_CHANNEL_READY, "OK");
}
}
- // Unset the call combiner cancellation closure. This has the
- // effect of scheduling the previously set cancellation closure, if
- // any, so that it can release any internal references it may be
- // holding to the call stack.
- call_combiner_.SetNotifyOnCancel(nullptr);
-}
-
-void HealthCheckClient::CallState::Orphan() {
- call_combiner_.Cancel(GRPC_ERROR_CANCELLED);
- Cancel();
-}
-
-void HealthCheckClient::CallState::StartCall() {
- SubchannelCall::Args args = {
- health_check_client_->connected_subchannel_,
- &pollent_,
- Slice::FromStaticString("/grpc.health.v1.Health/Watch"),
- gpr_get_cycle_counter(), // start_time
- Timestamp::InfFuture(), // deadline
- arena_.get(),
- context_,
- &call_combiner_,
- };
- grpc_error_handle error = GRPC_ERROR_NONE;
- call_ = SubchannelCall::Create(std::move(args), &error).release();
- // Register after-destruction callback.
- GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
- this, grpc_schedule_on_exec_ctx);
- call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
- // Check if creation failed.
- if (error != GRPC_ERROR_NONE) {
- gpr_log(GPR_ERROR,
- "HealthCheckClient %p CallState %p: error creating health "
- "checking call on subchannel (%s); will retry",
- health_check_client_.get(), this,
- grpc_error_std_string(error).c_str());
- GRPC_ERROR_UNREF(error);
- CallEndedLocked(/*retry=*/true);
- return;
- }
- // Initialize payload and batch.
- payload_.context = context_;
- batch_.payload = &payload_;
- // on_complete callback takes ref, handled manually.
- call_->Ref(DEBUG_LOCATION, "on_complete").release();
- batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
- grpc_schedule_on_exec_ctx);
- // Add send_initial_metadata op.
- send_initial_metadata_.Set(
- HttpPathMetadata(),
- Slice::FromStaticString("/grpc.health.v1.Health/Watch"));
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- payload_.send_initial_metadata.send_initial_metadata =
- &send_initial_metadata_;
- payload_.send_initial_metadata.send_initial_metadata_flags = 0;
- payload_.send_initial_metadata.peer_string = nullptr;
- batch_.send_initial_metadata = true;
- // Add send_message op.
- EncodeRequest(health_check_client_->service_name_, &send_message_);
- payload_.send_message.send_message.reset(send_message_.get());
- batch_.send_message = true;
- // Add send_trailing_metadata op.
- payload_.send_trailing_metadata.send_trailing_metadata =
- &send_trailing_metadata_;
- batch_.send_trailing_metadata = true;
- // Add recv_initial_metadata op.
- payload_.recv_initial_metadata.recv_initial_metadata =
- &recv_initial_metadata_;
- payload_.recv_initial_metadata.recv_flags = nullptr;
- payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
- payload_.recv_initial_metadata.peer_string = nullptr;
- // recv_initial_metadata_ready callback takes ref, handled manually.
- call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
- payload_.recv_initial_metadata.recv_initial_metadata_ready =
- GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
- this, grpc_schedule_on_exec_ctx);
- batch_.recv_initial_metadata = true;
- // Add recv_message op.
- payload_.recv_message.recv_message = &recv_message_;
- payload_.recv_message.call_failed_before_recv_message = nullptr;
- // recv_message callback takes ref, handled manually.
- call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
- payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
- &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
- batch_.recv_message = true;
- // Start batch.
- StartBatch(&batch_);
- // Initialize recv_trailing_metadata batch.
- recv_trailing_metadata_batch_.payload = &payload_;
- // Add recv_trailing_metadata op.
- payload_.recv_trailing_metadata.recv_trailing_metadata =
- &recv_trailing_metadata_;
- payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
- // This callback signals the end of the call, so it relies on the
- // initial ref instead of taking a new ref. When it's invoked, the
- // initial ref is released.
- payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
- GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
- RecvTrailingMetadataReady, this,
- grpc_schedule_on_exec_ctx);
- recv_trailing_metadata_batch_.recv_trailing_metadata = true;
- // Start recv_trailing_metadata batch.
- StartBatch(&recv_trailing_metadata_batch_);
-}
-
-void HealthCheckClient::CallState::StartBatchInCallCombiner(
- void* arg, grpc_error_handle /*error*/) {
- grpc_transport_stream_op_batch* batch =
- static_cast<grpc_transport_stream_op_batch*>(arg);
- SubchannelCall* call =
- static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
- call->StartTransportStreamOpBatch(batch);
-}
-
-void HealthCheckClient::CallState::StartBatch(
- grpc_transport_stream_op_batch* batch) {
- batch->handler_private.extra_arg = call_;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
- batch, grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
- GRPC_ERROR_NONE, "start_subchannel_batch");
-}
-
-void HealthCheckClient::CallState::AfterCallStackDestruction(
- void* arg, grpc_error_handle /*error*/) {
- HealthCheckClient::CallState* self =
- static_cast<HealthCheckClient::CallState*>(arg);
- delete self;
-}
-
-void HealthCheckClient::CallState::OnCancelComplete(
- void* arg, grpc_error_handle /*error*/) {
- HealthCheckClient::CallState* self =
- static_cast<HealthCheckClient::CallState*>(arg);
- GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
- self->call_->Unref(DEBUG_LOCATION, "cancel");
-}
-
-void HealthCheckClient::CallState::StartCancel(void* arg,
- grpc_error_handle /*error*/) {
- HealthCheckClient::CallState* self =
- static_cast<HealthCheckClient::CallState*>(arg);
- auto* batch = grpc_make_transport_stream_op(
- GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
- batch->cancel_stream = true;
- batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
- self->call_->StartTransportStreamOpBatch(batch);
-}
-
-void HealthCheckClient::CallState::Cancel() {
- bool expected = false;
- if (cancelled_.compare_exchange_strong(expected, true,
- std::memory_order_acq_rel,
- std::memory_order_acquire)) {
- call_->Ref(DEBUG_LOCATION, "cancel").release();
- GRPC_CALL_COMBINER_START(
- &call_combiner_,
- GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE, "health_cancel");
- }
-}
-
-void HealthCheckClient::CallState::OnComplete(void* arg,
- grpc_error_handle /*error*/) {
- HealthCheckClient::CallState* self =
- static_cast<HealthCheckClient::CallState*>(arg);
- GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
- self->send_initial_metadata_.Clear();
- self->send_trailing_metadata_.Clear();
- self->call_->Unref(DEBUG_LOCATION, "on_complete");
-}
-void HealthCheckClient::CallState::RecvInitialMetadataReady(
- void* arg, grpc_error_handle /*error*/) {
- HealthCheckClient::CallState* self =
- static_cast<HealthCheckClient::CallState*>(arg);
- GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
- self->recv_initial_metadata_.Clear();
- self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
-}
-
-void HealthCheckClient::CallState::DoneReadingRecvMessage(
- grpc_error_handle error) {
- recv_message_.reset();
- if (error != GRPC_ERROR_NONE) {
- GRPC_ERROR_UNREF(error);
- Cancel();
- grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
- call_->Unref(DEBUG_LOCATION, "recv_message_ready");
- return;
- }
- const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
- const grpc_connectivity_state state =
- healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE;
- health_check_client_->SetHealthStatus(
- state, error == GRPC_ERROR_NONE && !healthy
- ? "backend unhealthy"
- : grpc_error_std_string(error).c_str());
- seen_response_.store(true, std::memory_order_release);
- grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
- // Start another recv_message batch.
- // This re-uses the ref we're holding.
- // Note: Can't just reuse batch_ here, since we don't know that all
- // callbacks from the original batch have completed yet.
- recv_message_batch_.payload = &payload_;
- payload_.recv_message.recv_message = &recv_message_;
- payload_.recv_message.call_failed_before_recv_message = nullptr;
- payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
- &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
- recv_message_batch_.recv_message = true;
- StartBatch(&recv_message_batch_);
-}
-
-grpc_error_handle HealthCheckClient::CallState::PullSliceFromRecvMessage() {
- grpc_slice slice;
- grpc_error_handle error = recv_message_->Pull(&slice);
- if (error == GRPC_ERROR_NONE) {
- grpc_slice_buffer_add(&recv_message_buffer_, slice);
- }
- return error;
-}
-
-void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
- while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
- grpc_error_handle error = PullSliceFromRecvMessage();
- if (error != GRPC_ERROR_NONE) {
- DoneReadingRecvMessage(error);
- return;
- }
- if (recv_message_buffer_.length == recv_message_->length()) {
- DoneReadingRecvMessage(GRPC_ERROR_NONE);
- break;
+ void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
+ grpc_status_code status) override {
+ if (status == GRPC_STATUS_UNIMPLEMENTED) {
+ static const char kErrorMessage[] =
+ "health checking Watch method returned UNIMPLEMENTED; "
+ "disabling health checks but assuming server is healthy";
+ gpr_log(GPR_ERROR, kErrorMessage);
+ if (channelz_node_ != nullptr) {
+ channelz_node_->AddTraceEvent(
+ channelz::ChannelTrace::Error,
+ grpc_slice_from_static_string(kErrorMessage));
+ }
+ SetHealthStatusLocked(client, GRPC_CHANNEL_READY, kErrorMessage);
}
}
-}
-void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
- grpc_error_handle error) {
- HealthCheckClient::CallState* self =
- static_cast<HealthCheckClient::CallState*>(arg);
- if (error != GRPC_ERROR_NONE) {
- self->DoneReadingRecvMessage(GRPC_ERROR_REF(error));
- return;
- }
- error = self->PullSliceFromRecvMessage();
- if (error != GRPC_ERROR_NONE) {
- self->DoneReadingRecvMessage(error);
- return;
- }
- if (self->recv_message_buffer_.length == self->recv_message_->length()) {
- self->DoneReadingRecvMessage(GRPC_ERROR_NONE);
- } else {
- self->ContinueReadingRecvMessage();
+ private:
+ void SetHealthStatusLocked(SubchannelStreamClient* client,
+ grpc_connectivity_state state,
+ const char* reason) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
+ gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s",
+ client, ConnectivityStateName(state), reason);
+ }
+ watcher_->Notify(state, state == GRPC_CHANNEL_TRANSIENT_FAILURE
+ ? absl::UnavailableError(reason)
+ : absl::Status());
}
-}
-void HealthCheckClient::CallState::RecvMessageReady(
- void* arg, grpc_error_handle /*error*/) {
- HealthCheckClient::CallState* self =
- static_cast<HealthCheckClient::CallState*>(arg);
- GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
- if (self->recv_message_ == nullptr) {
- self->call_->Unref(DEBUG_LOCATION, "recv_message_ready");
- return;
- }
- grpc_slice_buffer_init(&self->recv_message_buffer_);
- GRPC_CLOSURE_INIT(&self->recv_message_ready_, OnByteStreamNext, self,
- grpc_schedule_on_exec_ctx);
- self->ContinueReadingRecvMessage();
- // Ref will continue to be held until we finish draining the byte stream.
-}
+ std::string service_name_;
+ RefCountedPtr<channelz::SubchannelNode> channelz_node_;
+ RefCountedPtr watcher_;
+};
-void HealthCheckClient::CallState::RecvTrailingMetadataReady(
- void* arg, grpc_error_handle error) {
- HealthCheckClient::CallState* self =
- static_cast<HealthCheckClient::CallState*>(arg);
- GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
- "recv_trailing_metadata_ready");
- // Get call status.
- grpc_status_code status =
- self->recv_trailing_metadata_.get(GrpcStatusMetadata())
- .value_or(GRPC_STATUS_UNKNOWN);
- if (error != GRPC_ERROR_NONE) {
- grpc_error_get_status(error, Timestamp::InfFuture(), &status,
- nullptr /* slice */, nullptr /* http_error */,
- nullptr /* error_string */);
- }
- if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
- gpr_log(GPR_INFO,
- "HealthCheckClient %p CallState %p: health watch failed with "
- "status %d",
- self->health_check_client_.get(), self, status);
- }
- // Clean up.
- self->recv_trailing_metadata_.Clear();
- // For status UNIMPLEMENTED, give up and assume always healthy.
- bool retry = true;
- if (status == GRPC_STATUS_UNIMPLEMENTED) {
- static const char kErrorMessage[] =
- "health checking Watch method returned UNIMPLEMENTED; "
- "disabling health checks but assuming server is healthy";
- gpr_log(GPR_ERROR, kErrorMessage);
- if (self->health_check_client_->channelz_node_ != nullptr) {
- self->health_check_client_->channelz_node_->AddTraceEvent(
- channelz::ChannelTrace::Error,
- grpc_slice_from_static_string(kErrorMessage));
- }
- self->health_check_client_->SetHealthStatus(GRPC_CHANNEL_READY,
- kErrorMessage);
- retry = false;
- }
- MutexLock lock(&self->health_check_client_->mu_);
- self->CallEndedLocked(retry);
-}
+} // namespace
-void HealthCheckClient::CallState::CallEndedLocked(bool retry) {
- // If this CallState is still in use, this call ended because of a failure,
- // so we need to stop using it and optionally create a new one.
- // Otherwise, we have deliberately ended this call, and no further action
- // is required.
- if (this == health_check_client_->call_state_.get()) {
- health_check_client_->call_state_.reset();
- if (retry) {
- GPR_ASSERT(!health_check_client_->shutting_down_);
- if (seen_response_.load(std::memory_order_acquire)) {
- // If the call fails after we've gotten a successful response, reset
- // the backoff and restart the call immediately.
- health_check_client_->retry_backoff_.Reset();
- health_check_client_->StartCallLocked();
- } else {
- // If the call failed without receiving any messages, retry later.
- health_check_client_->StartRetryTimerLocked();
- }
- }
- }
- // When the last ref to the call stack goes away, the CallState object
- // will be automatically destroyed.
- call_->Unref(DEBUG_LOCATION, "call_ended");
+OrphanablePtr<SubchannelStreamClient> MakeHealthCheckClient(
+ std::string service_name,
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel,
+ grpc_pollset_set* interested_parties,
+ RefCountedPtr<channelz::SubchannelNode> channelz_node,
+ RefCountedPtr watcher) {
+ return MakeOrphanable<SubchannelStreamClient>(
+ std::move(connected_subchannel), interested_parties,
+ absl::make_unique<HealthStreamEventHandler>(std::move(service_name),
+ std::move(channelz_node),
+ std::move(watcher)),
+ GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
+ ? "HealthCheckClient"
+ : nullptr);
}
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h
index 3b04a90f698..ac4e920f36f 100644
--- a/src/core/ext/filters/client_channel/health/health_check_client.h
+++ b/src/core/ext/filters/client_channel/health/health_check_client.h
@@ -1,177 +1,41 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
+//
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
#include <grpc/support/port_platform.h>
-#include
-
-#include
-#include
+#include
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
-#include "src/core/lib/backoff/backoff.h"
+#include "src/core/ext/filters/client_channel/subchannel_stream_client.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/gprpp/sync.h"
-#include "src/core/lib/iomgr/call_combiner.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/polling_entity.h"
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/resource_quota/arena.h"
-#include "src/core/lib/transport/byte_stream.h"
-#include "src/core/lib/transport/metadata_batch.h"
-#include "src/core/lib/transport/transport.h"
namespace grpc_core {
-class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
- public:
- HealthCheckClient(std::string service_name,
- RefCountedPtr<ConnectedSubchannel> connected_subchannel,
- grpc_pollset_set* interested_parties,
- RefCountedPtr<channelz::SubchannelNode> channelz_node,
- RefCountedPtr watcher);
-
- ~HealthCheckClient() override;
-
- void Orphan() override;
-
- private:
- // Contains a call to the backend and all the data related to the call.
- class CallState : public Orphanable {
- public:
- CallState(RefCountedPtr<HealthCheckClient> health_check_client,
- grpc_pollset_set* interested_parties);
- ~CallState() override;
-
- void Orphan() override;
-
- void StartCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthCheckClient::mu_);
-
- private:
- void Cancel();
-
- void StartBatch(grpc_transport_stream_op_batch* batch);
- static void StartBatchInCallCombiner(void* arg, grpc_error_handle error);
-
- void CallEndedLocked(bool retry)
- ABSL_EXCLUSIVE_LOCKS_REQUIRED(health_check_client_->mu_);
-
- static void OnComplete(void* arg, grpc_error_handle error);
- static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
- static void RecvMessageReady(void* arg, grpc_error_handle error);
- static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
- static void StartCancel(void* arg, grpc_error_handle error);
- static void OnCancelComplete(void* arg, grpc_error_handle error);
-
- static void OnByteStreamNext(void* arg, grpc_error_handle error);
- void ContinueReadingRecvMessage();
- grpc_error_handle PullSliceFromRecvMessage();
- void DoneReadingRecvMessage(grpc_error_handle error);
-
- static void AfterCallStackDestruction(void* arg, grpc_error_handle error);
-
- RefCountedPtr<HealthCheckClient> health_check_client_;
- grpc_polling_entity pollent_;
-
- ScopedArenaPtr arena_;
- CallCombiner call_combiner_;
- grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
-
- // The streaming call to the backend. Always non-null.
- // Refs are tracked manually; when the last ref is released, the
- // CallState object will be automatically destroyed.
- SubchannelCall* call_;
-
- grpc_transport_stream_op_batch_payload payload_;
- grpc_transport_stream_op_batch batch_;
- grpc_transport_stream_op_batch recv_message_batch_;
- grpc_transport_stream_op_batch recv_trailing_metadata_batch_;
-
- grpc_closure on_complete_;
-
- // send_initial_metadata
- grpc_metadata_batch send_initial_metadata_;
-
- // send_message
- ManualConstructor<SliceBufferByteStream> send_message_;
-
- // send_trailing_metadata
- grpc_metadata_batch send_trailing_metadata_;
-
- // recv_initial_metadata
- grpc_metadata_batch recv_initial_metadata_;
- grpc_closure recv_initial_metadata_ready_;
-
- // recv_message
- OrphanablePtr<ByteStream> recv_message_;
- grpc_closure recv_message_ready_;
- grpc_slice_buffer recv_message_buffer_;
- std::atomic seen_response_{false};
-
- // True if the cancel_stream batch has been started.
- std::atomic cancelled_{false};
-
- // recv_trailing_metadata
- grpc_metadata_batch recv_trailing_metadata_;
- grpc_transport_stream_stats collect_stats_;
- grpc_closure recv_trailing_metadata_ready_;
-
- // Closure for call stack destruction.
- grpc_closure after_call_stack_destruction_;
- };
-
- void StartCall();
- void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
- void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
- static void OnRetryTimer(void* arg, grpc_error_handle error);
-
- void SetHealthStatus(grpc_connectivity_state state, const char* reason);
- void SetHealthStatusLocked(grpc_connectivity_state state, const char* reason)
- ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
- std::string service_name_;
- RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
- grpc_pollset_set* interested_parties_; // Do not own.
- RefCountedPtr<channelz::SubchannelNode> channelz_node_;
- MemoryAllocator call_allocator_;
-
- Mutex mu_;
- RefCountedPtr watcher_
- ABSL_GUARDED_BY(mu_);
- bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;
-
- // The data associated with the current health check call. It holds a ref
- // to this HealthCheckClient object.
- OrphanablePtr<CallState> call_state_ ABSL_GUARDED_BY(mu_);
-
- // Call retry state.
- BackOff retry_backoff_ ABSL_GUARDED_BY(mu_);
- grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_);
- grpc_closure retry_timer_callback_ ABSL_GUARDED_BY(mu_);
- bool retry_timer_callback_pending_ ABSL_GUARDED_BY(mu_) = false;
-};
+OrphanablePtr<SubchannelStreamClient> MakeHealthCheckClient(
+ std::string service_name,
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel,
+ grpc_pollset_set* interested_parties,
+ RefCountedPtr<channelz::SubchannelNode> channelz_node,
+ RefCountedPtr watcher);
} // namespace grpc_core
-#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H */
+#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index ba45e1bff12..7520b183f73 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -487,14 +487,14 @@ class Subchannel::HealthWatcherMap::HealthWatcher
void StartHealthCheckingLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_->mu_) {
GPR_ASSERT(health_check_client_ == nullptr);
- health_check_client_ = MakeOrphanable<HealthCheckClient>(
+ health_check_client_ = MakeHealthCheckClient(
health_check_service_name_, subchannel_->connected_subchannel_,
subchannel_->pollset_set_, subchannel_->channelz_node_, Ref());
}
WeakRefCountedPtr<Subchannel> subchannel_;
std::string health_check_service_name_;
- OrphanablePtr<HealthCheckClient> health_check_client_;
+ OrphanablePtr<SubchannelStreamClient> health_check_client_;
grpc_connectivity_state state_;
absl::Status status_;
ConnectivityStateWatcherList watcher_list_;
diff --git a/src/core/ext/filters/client_channel/subchannel_stream_client.cc b/src/core/ext/filters/client_channel/subchannel_stream_client.cc
new file mode 100644
index 00000000000..17fd26c6514
--- /dev/null
+++ b/src/core/ext/filters/client_channel/subchannel_stream_client.cc
@@ -0,0 +1,531 @@
+//
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/subchannel_stream_client.h"
+
+#include
+#include
+
+#include
+
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/resource_quota/api.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/transport/error_utils.h"
+
+#define SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define SUBCHANNEL_STREAM_RECONNECT_JITTER 0.2
+
+namespace grpc_core {
+
+//
+// SubchannelStreamClient
+//
+
+SubchannelStreamClient::SubchannelStreamClient(
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel,
+ grpc_pollset_set* interested_parties,
+ std::unique_ptr<CallEventHandler> event_handler, const char* tracer)
+ : InternallyRefCounted<SubchannelStreamClient>(tracer),
+ connected_subchannel_(std::move(connected_subchannel)),
+ interested_parties_(interested_parties),
+ tracer_(tracer),
+ call_allocator_(
+ ResourceQuotaFromChannelArgs(connected_subchannel_->args())
+ ->memory_quota()
+ ->CreateMemoryAllocator(tracer)),
+ event_handler_(std::move(event_handler)),
+ retry_backoff_(
+ BackOff::Options()
+ .set_initial_backoff(Duration::Seconds(
+ SUBCHANNEL_STREAM_INITIAL_CONNECT_BACKOFF_SECONDS))
+ .set_multiplier(SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(SUBCHANNEL_STREAM_RECONNECT_JITTER)
+ .set_max_backoff(Duration::Seconds(
+ SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS))) {
+ if (GPR_UNLIKELY(tracer_ != nullptr)) {
+ gpr_log(GPR_INFO, "%s %p: created SubchannelStreamClient", tracer_, this);
+ }
+ GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
+ grpc_schedule_on_exec_ctx);
+ StartCall();
+}
+
+SubchannelStreamClient::~SubchannelStreamClient() {
+ if (GPR_UNLIKELY(tracer_ != nullptr)) {
+ gpr_log(GPR_INFO, "%s %p: destroying SubchannelStreamClient", tracer_,
+ this);
+ }
+}
+
+void SubchannelStreamClient::Orphan() {
+ if (GPR_UNLIKELY(tracer_ != nullptr)) {
+ gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient shutting down", tracer_,
+ this);
+ }
+ {
+ MutexLock lock(&mu_);
+ event_handler_.reset();
+ call_state_.reset();
+ if (retry_timer_callback_pending_) {
+ grpc_timer_cancel(&retry_timer_);
+ }
+ }
+ Unref(DEBUG_LOCATION, "orphan");
+}
+
+void SubchannelStreamClient::StartCall() {
+ MutexLock lock(&mu_);
+ StartCallLocked();
+}
+
+void SubchannelStreamClient::StartCallLocked() {
+ if (event_handler_ == nullptr) return;
+ GPR_ASSERT(call_state_ == nullptr);
+ if (event_handler_ != nullptr) {
+ event_handler_->OnCallStartLocked(this);
+ }
+ call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
+ if (GPR_UNLIKELY(tracer_ != nullptr)) {
+ gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient created CallState %p",
+ tracer_, this, call_state_.get());
+ }
+ call_state_->StartCallLocked();
+}
+
+void SubchannelStreamClient::StartRetryTimerLocked() {
+ if (event_handler_ != nullptr) {
+ event_handler_->OnRetryTimerStartLocked(this);
+ }
+ Timestamp next_try = retry_backoff_.NextAttemptTime();
+ if (GPR_UNLIKELY(tracer_ != nullptr)) {
+ gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient health check call lost...",
+ tracer_, this);
+ Duration timeout = next_try - ExecCtx::Get()->Now();
+ if (timeout > Duration::Zero()) {
+ gpr_log(GPR_INFO, "%s %p: ... will retry in %" PRId64 "ms.", tracer_,
+ this, timeout.millis());
+ } else {
+ gpr_log(GPR_INFO, "%s %p: ... retrying immediately.", tracer_, this);
+ }
+ }
+ // Ref for callback, tracked manually.
+ Ref(DEBUG_LOCATION, "health_retry_timer").release();
+ retry_timer_callback_pending_ = true;
+ grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
+}
+
+void SubchannelStreamClient::OnRetryTimer(void* arg, grpc_error_handle error) {
+ auto* self = static_cast<SubchannelStreamClient*>(arg);
+ {
+ MutexLock lock(&self->mu_);
+ self->retry_timer_callback_pending_ = false;
+ if (self->event_handler_ != nullptr && error == GRPC_ERROR_NONE &&
+ self->call_state_ == nullptr) {
+ if (GPR_UNLIKELY(self->tracer_ != nullptr)) {
+ gpr_log(GPR_INFO,
+ "%s %p: SubchannelStreamClient restarting health check call",
+ self->tracer_, self);
+ }
+ self->StartCallLocked();
+ }
+ }
+ self->Unref(DEBUG_LOCATION, "health_retry_timer");
+}
+
+//
+// SubchannelStreamClient::CallState
+//
+
+SubchannelStreamClient::CallState::CallState(
+ RefCountedPtr<SubchannelStreamClient> health_check_client,
+ grpc_pollset_set* interested_parties)
+ : subchannel_stream_client_(std::move(health_check_client)),
+ pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
+ arena_(Arena::Create(subchannel_stream_client_->connected_subchannel_
+ ->GetInitialCallSizeEstimate(),
+ &subchannel_stream_client_->call_allocator_)),
+ payload_(context_),
+ send_initial_metadata_(arena_.get()),
+ send_trailing_metadata_(arena_.get()),
+ recv_initial_metadata_(arena_.get()),
+ recv_trailing_metadata_(arena_.get()) {}
+
+SubchannelStreamClient::CallState::~CallState() {
+ if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
+ gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient destroying CallState %p",
+ subchannel_stream_client_->tracer_, subchannel_stream_client_.get(),
+ this);
+ }
+ for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
+ if (context_[i].destroy != nullptr) {
+ context_[i].destroy(context_[i].value);
+ }
+ }
+ // Unset the call combiner cancellation closure. This has the
+ // effect of scheduling the previously set cancellation closure, if
+ // any, so that it can release any internal references it may be
+ // holding to the call stack.
+ call_combiner_.SetNotifyOnCancel(nullptr);
+}
+
+void SubchannelStreamClient::CallState::Orphan() {
+ call_combiner_.Cancel(GRPC_ERROR_CANCELLED);
+ Cancel();
+}
+
+void SubchannelStreamClient::CallState::StartCallLocked() {
+ SubchannelCall::Args args = {
+ subchannel_stream_client_->connected_subchannel_,
+ &pollent_,
+ Slice::FromStaticString("/grpc.health.v1.Health/Watch"),
+ gpr_get_cycle_counter(), // start_time
+ Timestamp::InfFuture(), // deadline
+ arena_.get(),
+ context_,
+ &call_combiner_,
+ };
+ grpc_error_handle error = GRPC_ERROR_NONE;
+ call_ = SubchannelCall::Create(std::move(args), &error).release();
+ // Register after-destruction callback.
+ GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
+ this, grpc_schedule_on_exec_ctx);
+ call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
+ // Check if creation failed.
+ if (error != GRPC_ERROR_NONE ||
+ subchannel_stream_client_->event_handler_ == nullptr) {
+ gpr_log(GPR_ERROR,
+ "SubchannelStreamClient %p CallState %p: error creating "
+ "stream on subchannel (%s); will retry",
+ subchannel_stream_client_.get(), this,
+ grpc_error_std_string(error).c_str());
+ GRPC_ERROR_UNREF(error);
+ CallEndedLocked(/*retry=*/true);
+ return;
+ }
+ // Initialize payload and batch.
+ payload_.context = context_;
+ batch_.payload = &payload_;
+ // on_complete callback takes ref, handled manually.
+ call_->Ref(DEBUG_LOCATION, "on_complete").release();
+ batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
+ grpc_schedule_on_exec_ctx);
+ // Add send_initial_metadata op.
+ send_initial_metadata_.Set(
+ HttpPathMetadata(),
+ subchannel_stream_client_->event_handler_->GetPathLocked());
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ payload_.send_initial_metadata.send_initial_metadata =
+ &send_initial_metadata_;
+ payload_.send_initial_metadata.send_initial_metadata_flags = 0;
+ payload_.send_initial_metadata.peer_string = nullptr;
+ batch_.send_initial_metadata = true;
+ // Add send_message op.
+ grpc_slice request_slice =
+ subchannel_stream_client_->event_handler_->EncodeSendMessageLocked();
+ grpc_slice_buffer slice_buffer;
+ grpc_slice_buffer_init(&slice_buffer);
+ grpc_slice_buffer_add(&slice_buffer, request_slice);
+ send_message_.emplace(&slice_buffer, 0);
+ grpc_slice_buffer_destroy_internal(&slice_buffer);
+ payload_.send_message.send_message.reset(&*send_message_);
+ batch_.send_message = true;
+ // Add send_trailing_metadata op.
+ payload_.send_trailing_metadata.send_trailing_metadata =
+ &send_trailing_metadata_;
+ batch_.send_trailing_metadata = true;
+ // Add recv_initial_metadata op.
+ payload_.recv_initial_metadata.recv_initial_metadata =
+ &recv_initial_metadata_;
+ payload_.recv_initial_metadata.recv_flags = nullptr;
+ payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
+ payload_.recv_initial_metadata.peer_string = nullptr;
+ // recv_initial_metadata_ready callback takes ref, handled manually.
+ call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
+ payload_.recv_initial_metadata.recv_initial_metadata_ready =
+ GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
+ this, grpc_schedule_on_exec_ctx);
+ batch_.recv_initial_metadata = true;
+ // Add recv_message op.
+ payload_.recv_message.recv_message = &recv_message_;
+ payload_.recv_message.call_failed_before_recv_message = nullptr;
+ // recv_message callback takes ref, handled manually.
+ call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
+ payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
+ &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
+ batch_.recv_message = true;
+ // Start batch.
+ StartBatch(&batch_);
+ // Initialize recv_trailing_metadata batch.
+ recv_trailing_metadata_batch_.payload = &payload_;
+ // Add recv_trailing_metadata op.
+ payload_.recv_trailing_metadata.recv_trailing_metadata =
+ &recv_trailing_metadata_;
+ payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
+ // This callback signals the end of the call, so it relies on the
+ // initial ref instead of taking a new ref. When it's invoked, the
+ // initial ref is released.
+ payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
+ GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
+ RecvTrailingMetadataReady, this,
+ grpc_schedule_on_exec_ctx);
+ recv_trailing_metadata_batch_.recv_trailing_metadata = true;
+ // Start recv_trailing_metadata batch.
+ StartBatch(&recv_trailing_metadata_batch_);
+}
+
+void SubchannelStreamClient::CallState::StartBatchInCallCombiner(
+ void* arg, grpc_error_handle /*error*/) {
+ auto* batch = static_cast<grpc_transport_stream_op_batch*>(arg);
+ auto* call = static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
+ call->StartTransportStreamOpBatch(batch);
+}
+
+void SubchannelStreamClient::CallState::StartBatch(
+ grpc_transport_stream_op_batch* batch) {
+ batch->handler_private.extra_arg = call_;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
+ batch, grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
+ GRPC_ERROR_NONE, "start_subchannel_batch");
+}
+
+void SubchannelStreamClient::CallState::AfterCallStackDestruction(
+ void* arg, grpc_error_handle /*error*/) {
+ auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
+ delete self;
+}
+
+void SubchannelStreamClient::CallState::OnCancelComplete(
+ void* arg, grpc_error_handle /*error*/) {
+ auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
+ GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
+ self->call_->Unref(DEBUG_LOCATION, "cancel");
+}
+
+void SubchannelStreamClient::CallState::StartCancel(
+ void* arg, grpc_error_handle /*error*/) {
+ auto* self = static_cast<SubchannelStreamClient::CallState*>(arg);
+ auto* batch = grpc_make_transport_stream_op(
+ GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
+ batch->cancel_stream = true;
+ batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
+ self->call_->StartTransportStreamOpBatch(batch);
+}
+
+void SubchannelStreamClient::CallState::Cancel() {
+ bool expected = false;
+ if (cancelled_.compare_exchange_strong(expected, true,
+ std::memory_order_acq_rel,
+ std::memory_order_acquire)) {
+ call_->Ref(DEBUG_LOCATION, "cancel").release();
+ GRPC_CALL_COMBINER_START(
+ &call_combiner_,
+ GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE, "health_cancel");
+ }
+}
+
+void SubchannelStreamClient::CallState::OnComplete(
+ void* arg, grpc_error_handle /*error*/) {
+  auto* self = static_cast<CallState*>(arg);
+ GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
+ self->send_initial_metadata_.Clear();
+ self->send_trailing_metadata_.Clear();
+ self->call_->Unref(DEBUG_LOCATION, "on_complete");
+}
+
+void SubchannelStreamClient::CallState::RecvInitialMetadataReady(
+ void* arg, grpc_error_handle /*error*/) {
+  auto* self = static_cast<CallState*>(arg);
+ GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
+ self->recv_initial_metadata_.Clear();
+ self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
+}
+
+void SubchannelStreamClient::CallState::DoneReadingRecvMessage(
+ grpc_error_handle error) {
+ recv_message_.reset();
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(error);
+ Cancel();
+ grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
+ call_->Unref(DEBUG_LOCATION, "recv_message_ready");
+ return;
+ }
+ // Concatenate the slices to form a single string.
+  std::unique_ptr<uint8_t> recv_message_deleter;
+ uint8_t* recv_message;
+ if (recv_message_buffer_.count == 1) {
+ recv_message = GRPC_SLICE_START_PTR(recv_message_buffer_.slices[0]);
+ } else {
+ recv_message =
+        static_cast<uint8_t*>(gpr_malloc(recv_message_buffer_.length));
+ recv_message_deleter.reset(recv_message);
+ size_t offset = 0;
+ for (size_t i = 0; i < recv_message_buffer_.count; ++i) {
+ memcpy(recv_message + offset,
+ GRPC_SLICE_START_PTR(recv_message_buffer_.slices[i]),
+ GRPC_SLICE_LENGTH(recv_message_buffer_.slices[i]));
+ offset += GRPC_SLICE_LENGTH(recv_message_buffer_.slices[i]);
+ }
+ }
+ // Report payload.
+ {
+ MutexLock lock(&subchannel_stream_client_->mu_);
+ if (subchannel_stream_client_->event_handler_ != nullptr) {
+ subchannel_stream_client_->event_handler_->RecvMessageReadyLocked(
+ subchannel_stream_client_.get(),
+        reinterpret_cast<char*>(recv_message), recv_message_buffer_.length);
+ }
+ }
+ seen_response_.store(true, std::memory_order_release);
+ grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
+ // Start another recv_message batch.
+ // This re-uses the ref we're holding.
+ // Note: Can't just reuse batch_ here, since we don't know that all
+ // callbacks from the original batch have completed yet.
+ recv_message_batch_.payload = &payload_;
+ payload_.recv_message.recv_message = &recv_message_;
+ payload_.recv_message.call_failed_before_recv_message = nullptr;
+ payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
+ &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
+ recv_message_batch_.recv_message = true;
+ StartBatch(&recv_message_batch_);
+}
+
+grpc_error_handle
+SubchannelStreamClient::CallState::PullSliceFromRecvMessage() {
+ grpc_slice slice;
+ grpc_error_handle error = recv_message_->Pull(&slice);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&recv_message_buffer_, slice);
+ }
+ return error;
+}
+
+void SubchannelStreamClient::CallState::ContinueReadingRecvMessage() {
+ while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
+ grpc_error_handle error = PullSliceFromRecvMessage();
+ if (error != GRPC_ERROR_NONE) {
+ DoneReadingRecvMessage(error);
+ return;
+ }
+ if (recv_message_buffer_.length == recv_message_->length()) {
+ DoneReadingRecvMessage(GRPC_ERROR_NONE);
+ break;
+ }
+ }
+}
+
+void SubchannelStreamClient::CallState::OnByteStreamNext(
+ void* arg, grpc_error_handle error) {
+  auto* self = static_cast<CallState*>(arg);
+ if (error != GRPC_ERROR_NONE) {
+ self->DoneReadingRecvMessage(GRPC_ERROR_REF(error));
+ return;
+ }
+ error = self->PullSliceFromRecvMessage();
+ if (error != GRPC_ERROR_NONE) {
+ self->DoneReadingRecvMessage(error);
+ return;
+ }
+ if (self->recv_message_buffer_.length == self->recv_message_->length()) {
+ self->DoneReadingRecvMessage(GRPC_ERROR_NONE);
+ } else {
+ self->ContinueReadingRecvMessage();
+ }
+}
+
+void SubchannelStreamClient::CallState::RecvMessageReady(
+ void* arg, grpc_error_handle /*error*/) {
+  auto* self = static_cast<CallState*>(arg);
+ GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
+ if (self->recv_message_ == nullptr) {
+ self->call_->Unref(DEBUG_LOCATION, "recv_message_ready");
+ return;
+ }
+ grpc_slice_buffer_init(&self->recv_message_buffer_);
+ GRPC_CLOSURE_INIT(&self->recv_message_ready_, OnByteStreamNext, self,
+ grpc_schedule_on_exec_ctx);
+ self->ContinueReadingRecvMessage();
+ // Ref will continue to be held until we finish draining the byte stream.
+}
+
+void SubchannelStreamClient::CallState::RecvTrailingMetadataReady(
+ void* arg, grpc_error_handle error) {
+  auto* self = static_cast<CallState*>(arg);
+ GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
+ "recv_trailing_metadata_ready");
+ // Get call status.
+ grpc_status_code status =
+ self->recv_trailing_metadata_.get(GrpcStatusMetadata())
+ .value_or(GRPC_STATUS_UNKNOWN);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error_get_status(error, Timestamp::InfFuture(), &status,
+ nullptr /* slice */, nullptr /* http_error */,
+ nullptr /* error_string */);
+ }
+ if (GPR_UNLIKELY(self->subchannel_stream_client_->tracer_ != nullptr)) {
+ gpr_log(GPR_INFO,
+ "%s %p: SubchannelStreamClient CallState %p: health watch failed "
+ "with status %d",
+ self->subchannel_stream_client_->tracer_,
+ self->subchannel_stream_client_.get(), self, status);
+ }
+ // Clean up.
+ self->recv_trailing_metadata_.Clear();
+ // Report call end.
+ MutexLock lock(&self->subchannel_stream_client_->mu_);
+ if (self->subchannel_stream_client_->event_handler_ != nullptr) {
+ self->subchannel_stream_client_->event_handler_
+ ->RecvTrailingMetadataReadyLocked(self->subchannel_stream_client_.get(),
+ status);
+ }
+ // For status UNIMPLEMENTED, give up and assume always healthy.
+ self->CallEndedLocked(/*retry=*/status != GRPC_STATUS_UNIMPLEMENTED);
+}
+
+void SubchannelStreamClient::CallState::CallEndedLocked(bool retry) {
+ // If this CallState is still in use, this call ended because of a failure,
+ // so we need to stop using it and optionally create a new one.
+ // Otherwise, we have deliberately ended this call, and no further action
+ // is required.
+ if (this == subchannel_stream_client_->call_state_.get()) {
+ subchannel_stream_client_->call_state_.reset();
+ if (retry) {
+ GPR_ASSERT(subchannel_stream_client_->event_handler_ != nullptr);
+ if (seen_response_.load(std::memory_order_acquire)) {
+ // If the call fails after we've gotten a successful response, reset
+ // the backoff and restart the call immediately.
+ subchannel_stream_client_->retry_backoff_.Reset();
+ subchannel_stream_client_->StartCallLocked();
+ } else {
+ // If the call failed without receiving any messages, retry later.
+ subchannel_stream_client_->StartRetryTimerLocked();
+ }
+ }
+ }
+ // When the last ref to the call stack goes away, the CallState object
+ // will be automatically destroyed.
+ call_->Unref(DEBUG_LOCATION, "call_ended");
+}
+
+} // namespace grpc_core
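
The concatenation step in `DoneReadingRecvMessage()` above copies nothing when the payload arrived as a single slice and otherwise flattens the slice buffer into one contiguous `gpr_malloc` allocation before handing it to `RecvMessageReadyLocked()`. The sketch below is not part of this change; it shows the same flattening using only the public slice API and a `std::string`, which may be easier to follow than the manual allocation bookkeeping (and gives up the zero-copy single-slice fast path):

```cpp
#include <stddef.h>

#include <string>

#include <grpc/slice.h>
#include <grpc/slice_buffer.h>

// Flatten a grpc_slice_buffer into one contiguous string. This mirrors the
// concatenation loop in DoneReadingRecvMessage() above, trading the
// zero-copy single-slice fast path for simplicity.
std::string FlattenSliceBuffer(const grpc_slice_buffer& buf) {
  std::string out;
  out.reserve(buf.length);
  for (size_t i = 0; i < buf.count; ++i) {
    out.append(
        reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(buf.slices[i])),
        GRPC_SLICE_LENGTH(buf.slices[i]));
  }
  return out;
}
```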
diff --git a/src/core/ext/filters/client_channel/subchannel_stream_client.h b/src/core/ext/filters/client_channel/subchannel_stream_client.h
new file mode 100644
index 00000000000..65277eeecdd
--- /dev/null
+++ b/src/core/ext/filters/client_channel/subchannel_stream_client.h
@@ -0,0 +1,211 @@
+//
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
+
+#include <grpc/support/port_platform.h>
+
+#include <atomic>
+
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/call_combiner.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/resource_quota/arena.h"
+#include "src/core/lib/transport/byte_stream.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/transport.h"
+
+namespace grpc_core {
+
+// Represents a streaming call on a subchannel that should be maintained
+// open at all times.
+// If the call fails with UNIMPLEMENTED, no further attempts are made.
+// If the call fails with any other status (including OK), we retry the
+// call with appropriate backoff.
+// The backoff state is reset when we receive a message on a stream.
+//
+// Currently, this assumes server-side streaming, but it could be extended
+// to support full bidi streaming if there is a need in the future.
+class SubchannelStreamClient
+    : public InternallyRefCounted<SubchannelStreamClient> {
+ public:
+ // Interface implemented by caller. Thread safety is provided for the
+ // implementation; only one method will be called by any thread at any
+ // one time (including destruction).
+ //
+ // The address of the SubchannelStreamClient object is passed to most
+ // methods for logging purposes.
+ class CallEventHandler {
+ public:
+ virtual ~CallEventHandler() = default;
+
+ // Returns the path for the streaming call.
+ virtual Slice GetPathLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
+ // Called when a new call attempt is being started.
+ virtual void OnCallStartLocked(SubchannelStreamClient* client)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
+ // Called when a previous call attempt has failed and the retry
+ // timer is started before the next attempt.
+ virtual void OnRetryTimerStartLocked(SubchannelStreamClient* client)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
+ // Returns the message payload to send from the client.
+ virtual grpc_slice EncodeSendMessageLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
+ // Called whenever a message is received from the server.
+ virtual void RecvMessageReadyLocked(SubchannelStreamClient* client,
+ char* message, size_t size)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
+ // Called when a stream fails.
+ virtual void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
+ grpc_status_code status)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
+ };
+
+ // If tracer is non-null, it enables trace logging, with the specified
+ // string being the first part of the log message.
+ // Does not take ownership of interested_parties; the caller is responsible
+ // for ensuring that it will outlive the SubchannelStreamClient.
+ SubchannelStreamClient(
+      RefCountedPtr<ConnectedSubchannel> connected_subchannel,
+ grpc_pollset_set* interested_parties,
+      std::unique_ptr<CallEventHandler> event_handler, const char* tracer);
+
+ ~SubchannelStreamClient() override;
+
+ void Orphan() override;
+
+ private:
+ // Contains a call to the backend and all the data related to the call.
+ class CallState : public Orphanable {
+ public:
+    CallState(RefCountedPtr<SubchannelStreamClient> client,
+ grpc_pollset_set* interested_parties);
+ ~CallState() override;
+
+ void Orphan() override;
+
+ void StartCallLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_);
+
+ private:
+ void Cancel();
+
+ void StartBatch(grpc_transport_stream_op_batch* batch);
+ static void StartBatchInCallCombiner(void* arg, grpc_error_handle error);
+
+ void CallEndedLocked(bool retry)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&subchannel_stream_client_->mu_);
+
+ static void OnComplete(void* arg, grpc_error_handle error);
+ static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
+ static void RecvMessageReady(void* arg, grpc_error_handle error);
+ static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
+ static void StartCancel(void* arg, grpc_error_handle error);
+ static void OnCancelComplete(void* arg, grpc_error_handle error);
+
+ static void OnByteStreamNext(void* arg, grpc_error_handle error);
+ void ContinueReadingRecvMessage();
+ grpc_error_handle PullSliceFromRecvMessage();
+ void DoneReadingRecvMessage(grpc_error_handle error);
+
+ static void AfterCallStackDestruction(void* arg, grpc_error_handle error);
+
+    RefCountedPtr<SubchannelStreamClient> subchannel_stream_client_;
+ grpc_polling_entity pollent_;
+
+ ScopedArenaPtr arena_;
+ CallCombiner call_combiner_;
+ grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
+
+ // The streaming call to the backend. Always non-null.
+ // Refs are tracked manually; when the last ref is released, the
+ // CallState object will be automatically destroyed.
+ SubchannelCall* call_;
+
+ grpc_transport_stream_op_batch_payload payload_;
+ grpc_transport_stream_op_batch batch_;
+ grpc_transport_stream_op_batch recv_message_batch_;
+ grpc_transport_stream_op_batch recv_trailing_metadata_batch_;
+
+ grpc_closure on_complete_;
+
+ // send_initial_metadata
+ grpc_metadata_batch send_initial_metadata_;
+
+ // send_message
+    absl::optional<SliceBufferByteStream> send_message_;
+
+ // send_trailing_metadata
+ grpc_metadata_batch send_trailing_metadata_;
+
+ // recv_initial_metadata
+ grpc_metadata_batch recv_initial_metadata_;
+ grpc_closure recv_initial_metadata_ready_;
+
+ // recv_message
+    OrphanablePtr<ByteStream> recv_message_;
+ grpc_closure recv_message_ready_;
+ grpc_slice_buffer recv_message_buffer_;
+    std::atomic<bool> seen_response_{false};
+
+ // True if the cancel_stream batch has been started.
+    std::atomic<bool> cancelled_{false};
+
+ // recv_trailing_metadata
+ grpc_metadata_batch recv_trailing_metadata_;
+ grpc_transport_stream_stats collect_stats_;
+ grpc_closure recv_trailing_metadata_ready_;
+
+ // Closure for call stack destruction.
+ grpc_closure after_call_stack_destruction_;
+ };
+
+ void StartCall();
+ void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
+
+ void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
+ static void OnRetryTimer(void* arg, grpc_error_handle error);
+
+  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+ grpc_pollset_set* interested_parties_; // Do not own.
+ const char* tracer_;
+ MemoryAllocator call_allocator_;
+
+ Mutex mu_;
+  std::unique_ptr<CallEventHandler> event_handler_ ABSL_GUARDED_BY(mu_);
+
+ // The data associated with the current health check call. It holds a ref
+ // to this SubchannelStreamClient object.
+  OrphanablePtr<CallState> call_state_ ABSL_GUARDED_BY(mu_);
+
+ // Call retry state.
+ BackOff retry_backoff_ ABSL_GUARDED_BY(mu_);
+ grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_);
+ grpc_closure retry_timer_callback_ ABSL_GUARDED_BY(mu_);
+ bool retry_timer_callback_pending_ ABSL_GUARDED_BY(mu_) = false;
+};
+
+} // namespace grpc_core
+
+#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_STREAM_CLIENT_H
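
The `CallEventHandler` interface above is the full contract between `SubchannelStreamClient` and its user: the client owns the call lifecycle, retry timer, and backoff, while the handler only supplies the method path and request payload and consumes responses. A minimal sketch of a user follows, assuming it is built inside the gRPC source tree; `WatchCallHandler`, `StartWatch`, and the `"/my.package.MyService/Watch"` method are hypothetical names introduced here for illustration and are not part of this change:

```cpp
#include <memory>
#include <utility>

#include "absl/memory/memory.h"

#include "src/core/ext/filters/client_channel/subchannel_stream_client.h"

namespace grpc_core {

// Hypothetical handler: sends an empty request on a hypothetical streaming
// method and reacts to each streamed response. SubchannelStreamClient drives
// the call, retries, and backoff; the handler only supplies/consumes data.
class WatchCallHandler : public SubchannelStreamClient::CallEventHandler {
 public:
  Slice GetPathLocked() override {
    return Slice::FromStaticString("/my.package.MyService/Watch");
  }
  void OnCallStartLocked(SubchannelStreamClient* /*client*/) override {}
  void OnRetryTimerStartLocked(SubchannelStreamClient* /*client*/) override {}
  grpc_slice EncodeSendMessageLocked() override {
    return grpc_empty_slice();  // Placeholder for a serialized request.
  }
  void RecvMessageReadyLocked(SubchannelStreamClient* /*client*/,
                              char* /*message*/, size_t /*size*/) override {
    // Decode and act on the server's streamed response here.
  }
  void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* /*client*/,
                                       grpc_status_code /*status*/) override {
    // Stream ended; the client retries (or gives up on UNIMPLEMENTED).
  }
};

// Hypothetical factory: |interested_parties| must outlive the returned
// client, per the constructor comment in the header above.
OrphanablePtr<SubchannelStreamClient> StartWatch(
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties) {
  return MakeOrphanable<SubchannelStreamClient>(
      std::move(connected_subchannel), interested_parties,
      absl::make_unique<WatchCallHandler>(), /*tracer=*/nullptr);
}

}  // namespace grpc_core
```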
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 130f41daa51..6508877eafc 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -73,6 +73,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/subchannel.cc',
'src/core/ext/filters/client_channel/subchannel_pool_interface.cc',
+ 'src/core/ext/filters/client_channel/subchannel_stream_client.cc',
'src/core/ext/filters/client_idle/client_idle_filter.cc',
'src/core/ext/filters/client_idle/idle_filter_state.cc',
'src/core/ext/filters/deadline/deadline_filter.cc',
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 71749c3a920..954b4dfc637 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1151,6 +1151,8 @@ src/core/ext/filters/client_channel/subchannel.h \
src/core/ext/filters/client_channel/subchannel_interface.h \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.h \
+src/core/ext/filters/client_channel/subchannel_stream_client.cc \
+src/core/ext/filters/client_channel/subchannel_stream_client.h \
src/core/ext/filters/client_idle/client_idle_filter.cc \
src/core/ext/filters/client_idle/idle_filter_state.cc \
src/core/ext/filters/client_idle/idle_filter_state.h \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 7d4b7f074a8..01478efe38a 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -975,6 +975,8 @@ src/core/ext/filters/client_channel/subchannel.h \
src/core/ext/filters/client_channel/subchannel_interface.h \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.h \
+src/core/ext/filters/client_channel/subchannel_stream_client.cc \
+src/core/ext/filters/client_channel/subchannel_stream_client.h \
src/core/ext/filters/client_idle/client_idle_filter.cc \
src/core/ext/filters/client_idle/idle_filter_state.cc \
src/core/ext/filters/client_idle/idle_filter_state.h \