Merge remote-tracking branch 'upstream/master' into reorg_util

pull/36792/head
Mark D. Roth 5 months ago
commit 5866331965
  1. 5
      src/core/BUILD
  2. 2
      src/core/client_channel/client_channel.cc
  3. 3
      src/core/client_channel/client_channel_filter.cc
  4. 6
      src/core/client_channel/subchannel.h
  5. 64
      src/core/ext/transport/chaotic_good/client_transport.cc
  6. 7
      src/core/ext/transport/chaotic_good/server_transport.cc
  7. 58
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  8. 2
      src/core/lib/promise/party.cc
  9. 5
      src/core/lib/transport/call_filters.h
  10. 24
      src/core/lib/transport/call_spine.h
  11. 55
      src/core/lib/transport/call_state.h
  12. 6
      src/core/load_balancing/health_check_client.cc
  13. 29
      src/core/load_balancing/outlier_detection/outlier_detection.cc
  14. 12
      src/core/load_balancing/pick_first/pick_first.cc
  15. 8
      src/core/load_balancing/subchannel_interface.h
  16. 2
      test/core/client_channel/bm_load_balanced_call_destination.cc
  17. 2
      test/core/client_channel/load_balanced_call_destination_test.cc
  18. 7
      test/core/end2end/h2_ssl_cert_test.cc
  19. 2
      test/core/load_balancing/bm_picker.cc
  20. 2
      test/core/load_balancing/lb_policy_test_lib.h
  21. 6
      test/core/transport/call_state_test.cc
  22. 2
      test/cpp/end2end/BUILD

@ -3606,7 +3606,10 @@ grpc_cc_library(
grpc_cc_library(
name = "subchannel_interface",
hdrs = ["load_balancing/subchannel_interface.h"],
external_deps = ["absl/status"],
external_deps = [
"absl/status",
"absl/strings",
],
deps = [
"dual_ref_counted",
"iomgr_fwd",

@ -57,6 +57,7 @@
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_interface_internal.h"
#include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/config/core_configuration.h"
@ -167,6 +168,7 @@ class ClientChannel::SubchannelWrapper
void CancelDataWatcher(DataWatcherInterface* watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
void ThrottleKeepaliveTime(int new_keepalive_time);
std::string address() const override { return subchannel_->address(); }
private:
class WatcherWrapper;

@ -63,6 +63,7 @@
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_interface_internal.h"
#include "src/core/handshaker/proxy_mapper_registry.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
@ -616,6 +617,8 @@ class ClientChannelFilter::SubchannelWrapper final
subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
}
std::string address() const override { return subchannel_->address(); }
private:
// This wrapper provides a bridge between the internal Subchannel API
// and the SubchannelInterface API that we expose to LB policies.

@ -33,6 +33,7 @@
#include "src/core/client_channel/connector.h"
#include "src/core/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/iomgr/call_combiner.h"
@ -215,7 +216,10 @@ class Subchannel final : public DualRefCounted<Subchannel> {
channelz::SubchannelNode* channelz_node();
const grpc_resolved_address& address() const { return key_.address(); }
std::string address() const {
return grpc_sockaddr_to_uri(&key_.address())
.value_or("<unknown address type>");
}
// Starts watching the subchannel's connectivity state.
// The first callback to the watcher will be delivered ~immediately.

@ -250,19 +250,20 @@ void ChaoticGoodClientTransport::AbortWithError() {
}
uint32_t ChaoticGoodClientTransport::MakeStream(CallHandler call_handler) {
ReleasableMutexLock lock(&mu_);
MutexLock lock(&mu_);
const uint32_t stream_id = next_stream_id_++;
const bool on_done_added =
call_handler.OnDone([self = RefAsSubclass<ChaoticGoodClientTransport>(),
stream_id](bool cancelled) {
if (cancelled) {
self->outgoing_frames_.MakeSender().UnbufferedImmediateSend(
CancelFrame{stream_id});
}
MutexLock lock(&self->mu_);
self->stream_map_.erase(stream_id);
});
if (!on_done_added) return 0;
stream_map_.emplace(stream_id, call_handler);
lock.Release();
call_handler.OnDone([self = RefAsSubclass<ChaoticGoodClientTransport>(),
stream_id](bool cancelled) {
if (cancelled) {
self->outgoing_frames_.MakeSender().UnbufferedImmediateSend(
CancelFrame{stream_id});
}
MutexLock lock(&self->mu_);
self->stream_map_.erase(stream_id);
});
return stream_id;
}
@ -322,23 +323,30 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
"outbound_loop", [self = RefAsSubclass<ChaoticGoodClientTransport>(),
call_handler]() mutable {
const uint32_t stream_id = self->MakeStream(call_handler);
return Map(
self->CallOutboundLoop(stream_id, call_handler),
[stream_id, sender = self->outgoing_frames_.MakeSender()](
absl::Status result) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Call " << stream_id << " finished with "
<< result.ToString();
if (!result.ok()) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel";
if (!sender.UnbufferedImmediateSend(CancelFrame{stream_id})) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel failed";
}
}
return result;
});
return If(
stream_id != 0,
[stream_id, call_handler = std::move(call_handler),
self = std::move(self)]() {
return Map(
self->CallOutboundLoop(stream_id, call_handler),
[stream_id, sender = self->outgoing_frames_.MakeSender()](
absl::Status result) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Call " << stream_id
<< " finished with " << result.ToString();
if (!result.ok()) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel";
if (!sender.UnbufferedImmediateSend(
CancelFrame{stream_id})) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Send cancel failed";
}
}
return result;
});
},
[]() { return absl::OkStatus(); });
});
}

@ -439,8 +439,7 @@ absl::Status ChaoticGoodServerTransport::NewStream(
if (stream_id <= last_seen_new_stream_id_) {
return absl::InternalError("Stream id is not increasing");
}
stream_map_.emplace(stream_id, call_initiator);
call_initiator.OnDone(
const bool on_done_added = call_initiator.OnDone(
[self = RefAsSubclass<ChaoticGoodServerTransport>(), stream_id](bool) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << self.get() << " OnDone " << stream_id;
@ -454,6 +453,10 @@ absl::Status ChaoticGoodServerTransport::NewStream(
});
}
});
if (!on_done_added) {
return absl::CancelledError();
}
stream_map_.emplace(stream_id, call_initiator);
return absl::OkStatus();
}

@ -183,24 +183,27 @@ class Chttp2ServerListener : public Server::ListenerInterface {
void Start(OrphanablePtr<grpc_endpoint> endpoint,
const ChannelArgs& args);
void ShutdownLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ActiveConnection::mu_);
// Needed to be able to grab an external ref in
// ActiveConnection::Start()
using InternallyRefCounted<HandshakingState>::Ref;
private:
void OnTimeout() ABSL_LOCKS_EXCLUDED(&connection_->mu_);
void OnTimeout() ABSL_LOCKS_EXCLUDED(&ActiveConnection::mu_);
static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
void OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result);
RefCountedPtr<ActiveConnection> const connection_;
grpc_pollset* const accepting_pollset_;
AcceptorPtr acceptor_;
RefCountedPtr<HandshakeManager> handshake_mgr_
ABSL_GUARDED_BY(&connection_->mu_);
ABSL_GUARDED_BY(&ActiveConnection::mu_);
// State for enforcing handshake timeout on receiving HTTP/2 settings.
Timestamp const deadline_;
absl::optional<EventEngine::TaskHandle> timer_handle_
ABSL_GUARDED_BY(&connection_->mu_);
grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
ABSL_GUARDED_BY(&ActiveConnection::mu_);
grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&ActiveConnection::mu_);
grpc_pollset_set* const interested_parties_;
};
@ -400,9 +403,7 @@ Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
{
MutexLock lock(&connection_->mu_);
if (handshake_mgr_ != nullptr) {
handshake_mgr_->Shutdown(GRPC_ERROR_CREATE("Listener stopped serving."));
}
ShutdownLocked(absl::UnavailableError("Listener stopped serving."));
}
Unref();
}
@ -422,6 +423,13 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
});
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::ShutdownLocked(
absl::Status status) {
if (handshake_mgr_ != nullptr) {
handshake_mgr_->Shutdown(std::move(status));
}
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout() {
grpc_chttp2_transport* transport = nullptr;
{
@ -584,20 +592,28 @@ void Chttp2ServerListener::ActiveConnection::SendGoAway() {
grpc_chttp2_transport* transport = nullptr;
{
MutexLock lock(&mu_);
if (transport_ != nullptr && !shutdown_) {
transport = transport_.get();
drain_grace_timer_handle_ = event_engine_->RunAfter(
std::max(Duration::Zero(),
listener_->args_
.GetDurationFromIntMillis(
GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS)
.value_or(Duration::Minutes(10))),
[self = Ref(DEBUG_LOCATION, "drain_grace_timer")]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnDrainGraceTimeExpiry();
self.reset(DEBUG_LOCATION, "drain_grace_timer");
});
if (!shutdown_) {
// Send a GOAWAY if the transport exists
if (transport_ != nullptr) {
transport = transport_.get();
drain_grace_timer_handle_ = event_engine_->RunAfter(
std::max(Duration::Zero(),
listener_->args_
.GetDurationFromIntMillis(
GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS)
.value_or(Duration::Minutes(10))),
[self = Ref(DEBUG_LOCATION, "drain_grace_timer")]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnDrainGraceTimeExpiry();
self.reset(DEBUG_LOCATION, "drain_grace_timer");
});
}
// Shutdown the handshaker if it's still in progress.
if (handshaking_state_ != nullptr) {
handshaking_state_->ShutdownLocked(
absl::UnavailableError("Connection going away"));
}
shutdown_ = true;
}
}

@ -322,7 +322,7 @@ void Party::RunPartyAndUnref(uint64_t prev_state) {
(prev_state & (kRefMask | keep_allocated_mask)) - kOneRef,
std::memory_order_acq_rel, std::memory_order_acquire)) {
LogStateChange("Run:End", prev_state,
prev_state & (kRefMask | kAllocatedMask) - kOneRef);
(prev_state & (kRefMask | keep_allocated_mask)) - kOneRef);
if ((prev_state & kRefMask) == kOneRef) {
// We're done with the party.
PartyIsOver();

@ -1485,7 +1485,6 @@ class CallFilters {
std::move(value));
}
}
call_state_.FinishPullServerTrailingMetadata();
return value;
});
}
@ -1497,6 +1496,10 @@ class CallFilters {
GRPC_MUST_USE_RESULT auto WasCancelled() {
return [this]() { return call_state_.PollWasCancelled(); };
}
// Returns true if server trailing metadata has been pulled
bool WasServerTrailingMetadataPulled() const {
return call_state_.WasServerTrailingMetadataPulled();
}
// Client & server: fill in final_info with the final status of the call.
void Finalize(const grpc_call_final_info* final_info);

@ -54,17 +54,23 @@ class CallSpine final : public Party {
CallFilters& call_filters() { return call_filters_; }
// Add a callback to be called when server trailing metadata is received.
void OnDone(absl::AnyInvocable<void(bool)> fn) {
// Add a callback to be called when server trailing metadata is received and
// return true.
// If CallOnDone has already been invoked, does nothing and returns false.
GRPC_MUST_USE_RESULT bool OnDone(absl::AnyInvocable<void(bool)> fn) {
if (call_filters().WasServerTrailingMetadataPulled()) {
return false;
}
if (on_done_ == nullptr) {
on_done_ = std::move(fn);
return;
return true;
}
on_done_ = [first = std::move(fn),
next = std::move(on_done_)](bool cancelled) mutable {
first(cancelled);
next(cancelled);
};
return true;
}
void CallOnDone(bool cancelled) {
if (on_done_ != nullptr) std::exchange(on_done_, nullptr)(cancelled);
@ -232,8 +238,8 @@ class CallInitiator {
spine_->PushServerTrailingMetadata(std::move(status));
}
void OnDone(absl::AnyInvocable<void(bool)> fn) {
spine_->OnDone(std::move(fn));
GRPC_MUST_USE_RESULT bool OnDone(absl::AnyInvocable<void(bool)> fn) {
return spine_->OnDone(std::move(fn));
}
template <typename PromiseFactory>
@ -281,8 +287,8 @@ class CallHandler {
spine_->PushServerTrailingMetadata(std::move(status));
}
void OnDone(absl::AnyInvocable<void(bool)> fn) {
spine_->OnDone(std::move(fn));
GRPC_MUST_USE_RESULT bool OnDone(absl::AnyInvocable<void(bool)> fn) {
return spine_->OnDone(std::move(fn));
}
template <typename Promise>
@ -336,8 +342,8 @@ class UnstartedCallHandler {
spine_->PushServerTrailingMetadata(std::move(status));
}
void OnDone(absl::AnyInvocable<void(bool)> fn) {
spine_->OnDone(std::move(fn));
GRPC_MUST_USE_RESULT bool OnDone(absl::AnyInvocable<void(bool)> fn) {
return spine_->OnDone(std::move(fn));
}
template <typename Promise>

@ -52,7 +52,7 @@ class CallState {
Poll<ValueOrFailure<bool>> PollPullServerToClientMessageAvailable();
void FinishPullServerToClientMessage();
Poll<Empty> PollServerTrailingMetadataAvailable();
void FinishPullServerTrailingMetadata();
bool WasServerTrailingMetadataPulled() const;
Poll<bool> PollWasCancelled();
// Debug
std::string DebugString() const;
@ -147,8 +147,6 @@ class CallState {
kReading,
// Main call loop: processing one message
kProcessingServerToClientMessage,
// Processing server trailing metadata
kProcessingServerTrailingMetadata,
kTerminated,
};
static const char* ServerToClientPullStateString(
@ -172,8 +170,6 @@ class CallState {
return "Reading";
case ServerToClientPullState::kProcessingServerToClientMessage:
return "ProcessingServerToClientMessage";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
return "ProcessingServerTrailingMetadata";
case ServerToClientPullState::kTerminated:
return "Terminated";
}
@ -294,7 +290,6 @@ GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void CallState::Start() {
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "Start called twice";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
case ServerToClientPullState::kTerminated:
break;
}
@ -644,7 +639,6 @@ CallState::PollPullServerInitialMetadataAvailable() {
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerInitialMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return false;
@ -703,7 +697,6 @@ CallState::FinishPullServerInitialMetadata() {
case ServerToClientPullState::kIdle:
case ServerToClientPullState::kReading:
case ServerToClientPullState::kProcessingServerToClientMessage:
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "Out of order FinishPullServerInitialMetadata";
case ServerToClientPullState::kTerminated:
return;
@ -766,9 +759,6 @@ CallState::PollPullServerToClientMessageAvailable() {
case ServerToClientPullState::kProcessingServerToClientMessage:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing a message";
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollPullServerToClientMessageAvailable called while "
"processing trailing metadata";
case ServerToClientPullState::kTerminated:
return Failure{};
}
@ -826,9 +816,6 @@ CallState::FinishPullServerToClientMessage() {
server_to_client_pull_state_ = ServerToClientPullState::kIdle;
server_to_client_pull_waiter_.Wake();
break;
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "FinishPullServerToClientMessage called while processing "
"trailing metadata";
case ServerToClientPullState::kTerminated:
break;
}
@ -875,10 +862,7 @@ CallState::PollServerTrailingMetadataAvailable() {
case ServerToClientPushState::kFinished:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
break; // Ready for processing
}
ABSL_FALLTHROUGH_INTENDED;
case ServerToClientPushState::kPushedServerInitialMetadata:
@ -894,26 +878,14 @@ CallState::PollServerTrailingMetadataAvailable() {
case ServerToClientPullState::kIdle:
if (server_trailing_metadata_state_ !=
ServerTrailingMetadataState::kNotPushed) {
server_to_client_pull_state_ =
ServerToClientPullState::kProcessingServerTrailingMetadata;
server_to_client_pull_waiter_.Wake();
return Empty{};
break; // Ready for processing
}
return server_trailing_metadata_waiter_.pending();
case ServerToClientPullState::kProcessingServerTrailingMetadata:
LOG(FATAL) << "PollServerTrailingMetadataAvailable called twice";
case ServerToClientPullState::kTerminated:
return Empty{};
break;
}
Crash("Unreachable");
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void
CallState::FinishPullServerTrailingMetadata() {
GRPC_TRACE_LOG(call_state, INFO)
<< "[call_state] FinishPullServerTrailingMetadata: "
<< GRPC_DUMP_ARGS(this, server_trailing_metadata_state_,
server_trailing_metadata_waiter_.DebugString());
server_to_client_pull_state_ = ServerToClientPullState::kTerminated;
server_to_client_pull_waiter_.Wake();
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
LOG(FATAL) << "FinishPullServerTrailingMetadata called before "
@ -931,6 +903,21 @@ CallState::FinishPullServerTrailingMetadata() {
case ServerTrailingMetadataState::kPulledCancel:
LOG(FATAL) << "FinishPullServerTrailingMetadata called twice";
}
return Empty{};
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline bool
CallState::WasServerTrailingMetadataPulled() const {
switch (server_trailing_metadata_state_) {
case ServerTrailingMetadataState::kNotPushed:
case ServerTrailingMetadataState::kPushed:
case ServerTrailingMetadataState::kPushedCancel:
return false;
case ServerTrailingMetadataState::kPulled:
case ServerTrailingMetadataState::kPulledCancel:
return true;
}
GPR_UNREACHABLE_CODE(Crash("unreachable"));
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline Poll<bool>

@ -166,11 +166,9 @@ void HealthProducer::HealthChecker::OnHealthWatchStatusChange(
// Prepend the subchannel's address to the status if needed.
absl::Status use_status;
if (!status.ok()) {
std::string address_str =
grpc_sockaddr_to_uri(&producer_->subchannel_->address())
.value_or("<unknown address type>");
use_status = absl::Status(
status.code(), absl::StrCat(address_str, ": ", status.message()));
status.code(), absl::StrCat(producer_->subchannel_->address(), ": ",
status.message()));
}
work_serializer_->Schedule(
[self = Ref(), state, status = std::move(use_status)]() mutable {

@ -159,11 +159,14 @@ class OutlierDetectionLb final : public LoadBalancingPolicy {
class WatcherWrapper final
: public SubchannelInterface::ConnectivityStateWatcherInterface {
public:
WatcherWrapper(std::shared_ptr<
WatcherWrapper(WeakRefCountedPtr<SubchannelWrapper> subchannel_wrapper,
std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>
health_watcher,
bool ejected)
: watcher_(std::move(health_watcher)), ejected_(ejected) {}
: subchannel_wrapper_(std::move(subchannel_wrapper)),
watcher_(std::move(health_watcher)),
ejected_(ejected) {}
void Eject() {
ejected_ = true;
@ -171,7 +174,8 @@ class OutlierDetectionLb final : public LoadBalancingPolicy {
watcher_->OnConnectivityStateChange(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError(
"subchannel ejected by outlier detection"));
absl::StrCat(subchannel_wrapper_->address(),
": subchannel ejected by outlier detection")));
}
}
@ -192,7 +196,8 @@ class OutlierDetectionLb final : public LoadBalancingPolicy {
if (ejected_) {
new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
status = absl::UnavailableError(
"subchannel ejected by outlier detection");
absl::StrCat(subchannel_wrapper_->address(),
": subchannel ejected by outlier detection"));
}
watcher_->OnConnectivityStateChange(new_state, status);
}
@ -203,6 +208,7 @@ class OutlierDetectionLb final : public LoadBalancingPolicy {
}
private:
WeakRefCountedPtr<SubchannelWrapper> subchannel_wrapper_;
std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_;
absl::optional<grpc_connectivity_state> last_seen_state_;
@ -463,7 +469,8 @@ void OutlierDetectionLb::SubchannelWrapper::AddDataWatcher(
if (w->type() == HealthProducer::Type()) {
auto* health_watcher = static_cast<HealthWatcher*>(watcher.get());
auto watcher_wrapper = std::make_shared<WatcherWrapper>(
health_watcher->TakeWatcher(), ejected_);
WeakRefAsSubclass<SubchannelWrapper>(), health_watcher->TakeWatcher(),
ejected_);
watcher_wrapper_ = watcher_wrapper.get();
health_watcher->SetWatcher(std::move(watcher_wrapper));
}
@ -534,8 +541,8 @@ OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb,
: picker_(std::move(picker)), counting_enabled_(counting_enabled) {
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << outlier_detection_lb
<< "] constructed new picker " << this << " and counting "
<< "is " << (counting_enabled ? "enabled" : "disabled");
<< "] constructed new picker " << this << " and counting " << "is "
<< (counting_enabled ? "enabled" : "disabled");
}
LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
@ -904,8 +911,8 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.success_rate_ejection->minimum_hosts) {
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << parent_.get()
<< "] running success rate algorithm: "
<< "stdev_factor=" << config.success_rate_ejection->stdev_factor
<< "] running success rate algorithm: " << "stdev_factor="
<< config.success_rate_ejection->stdev_factor
<< ", enforcement_percentage="
<< config.success_rate_ejection->enforcement_percentage;
// calculate ejection threshold: (mean - stdev *
@ -957,8 +964,8 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.failure_percentage_ejection->minimum_hosts) {
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << parent_.get()
<< "] running failure percentage algorithm: "
<< "threshold=" << config.failure_percentage_ejection->threshold
<< "] running failure percentage algorithm: " << "threshold="
<< config.failure_percentage_ejection->threshold
<< ", enforcement_percentage="
<< config.failure_percentage_ejection->enforcement_percentage;
for (auto& candidate : failure_percentage_ejection_candidates) {

@ -648,7 +648,8 @@ void PickFirst::HealthWatcher::OnConnectivityStateChange(
case GRPC_CHANNEL_TRANSIENT_FAILURE:
policy_->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(absl::UnavailableError(
absl::StrCat("health watch: ", status.message()))));
break;
case GRPC_CHANNEL_SHUTDOWN:
Crash("health watcher reported state SHUTDOWN");
@ -1552,7 +1553,8 @@ void OldPickFirst::HealthWatcher::OnConnectivityStateChange(
case GRPC_CHANNEL_TRANSIENT_FAILURE:
policy_->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
MakeRefCounted<TransientFailurePicker>(absl::UnavailableError(
absl::StrCat("health watch: ", status.message()))));
break;
case GRPC_CHANNEL_SHUTDOWN:
Crash("health watcher reported state SHUTDOWN");
@ -1644,9 +1646,9 @@ void OldPickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
// If there is a pending update, switch to the pending update.
if (p->latest_pending_subchannel_list_ != nullptr) {
GRPC_TRACE_LOG(pick_first, INFO)
<< "Pick First " << p << " promoting pending subchannel "
<< "list " << p->latest_pending_subchannel_list_.get()
<< " to replace " << p->subchannel_list_.get();
<< "Pick First " << p << " promoting pending subchannel list "
<< p->latest_pending_subchannel_list_.get() << " to replace "
<< p->subchannel_list_.get();
p->UnsetSelectedSubchannel();
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.

@ -21,6 +21,7 @@
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/port_platform.h>
@ -102,6 +103,9 @@ class SubchannelInterface : public DualRefCounted<SubchannelInterface> {
// make this API public.
virtual void CancelDataWatcher(DataWatcherInterface* watcher) = 0;
// Return the address in URI format.
virtual std::string address() const = 0;
protected:
void Orphaned() override {}
};
@ -136,6 +140,10 @@ class DelegatingSubchannel : public SubchannelInterface {
wrapped_subchannel_->CancelDataWatcher(watcher);
}
std::string address() const override {
return wrapped_subchannel_->address();
}
private:
RefCountedPtr<SubchannelInterface> wrapped_subchannel_;
};

@ -80,6 +80,8 @@ class LoadBalancedCallDestinationTraits {
return call_destination_;
}
std::string address() const override { return "test"; }
private:
const RefCountedPtr<UnstartedCallDestination> call_destination_;
};

@ -118,6 +118,8 @@ class LoadBalancedCallDestinationTest : public YodelTest {
return call_destination_;
}
std::string address() const override { return "test"; }
private:
const RefCountedPtr<UnstartedCallDestination> call_destination_;
};

@ -42,6 +42,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/util/tmpfile.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/data/ssl_test_data.h"
@ -196,7 +197,7 @@ static CoreTestConfigWrapper configs[] = {
static void simple_request_body(grpc_core::CoreTestFixture* f,
test_result expected_result) {
grpc_call* c;
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30);
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
grpc_core::CqVerifier cqv(cq);
grpc_op ops[6];
@ -226,13 +227,13 @@ static void simple_request_body(grpc_core::CoreTestFixture* f,
CHECK_EQ(error, GRPC_CALL_OK);
cqv.Expect(grpc_core::CqVerifier::tag(1), expected_result == SUCCESS);
cqv.Verify();
cqv.Verify(grpc_core::Duration::Seconds(60));
grpc_call_unref(c);
grpc_channel_destroy(client);
grpc_server_shutdown_and_notify(server, cq, nullptr);
cqv.Expect(nullptr, true);
cqv.Verify();
cqv.Verify(grpc_core::Duration::Seconds(60));
grpc_server_destroy(server);
grpc_completion_queue_shutdown(cq);
CHECK(grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),

@ -124,6 +124,8 @@ class BenchmarkHelper : public std::enable_shared_from_this<BenchmarkHelper> {
void CancelDataWatcher(DataWatcherInterface* watcher) override {}
std::string address() const override { return "test"; }
private:
void AddConnectivityWatcherInternal(
std::shared_ptr<ConnectivityStateWatcherInterface> watcher) {

@ -121,6 +121,8 @@ class LoadBalancingPolicyTest : public ::testing::Test {
SubchannelState* state() const { return state_; }
std::string address() const override { return state_->address_; }
private:
// Converts between
// SubchannelInterface::ConnectivityStateWatcherInterface and

@ -245,7 +245,6 @@ TEST(CallStateTest, ReceiveTrailersOnly) {
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) {
@ -256,7 +255,6 @@ TEST(CallStateTest, ReceiveTrailersOnlySkipsInitialMetadataOnUnstartedCalls) {
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
state.FinishPullServerTrailingMetadata();
}
TEST(CallStateTest, RecallNoCancellation) {
@ -268,8 +266,6 @@ TEST(CallStateTest, RecallNoCancellation) {
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(false));
}
@ -282,8 +278,6 @@ TEST(CallStateTest, RecallCancellation) {
EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady(false));
state.FinishPullServerInitialMetadata();
EXPECT_THAT(state.PollServerTrailingMetadataAvailable(), IsReady());
EXPECT_THAT(state.PollWasCancelled(), IsPending());
EXPECT_WAKEUP(activity, state.FinishPullServerTrailingMetadata());
EXPECT_THAT(state.PollWasCancelled(), IsReady(true));
}

@ -373,7 +373,7 @@ grpc_cc_test(
name = "end2end_test",
size = "large",
flaky = True, # TODO(b/151704375)
shard_count = 10,
shard_count = 30,
tags = [
"cpp_end2end_test",
"no_test_ios",

Loading…
Cancel
Save