Add experimental API for resetting connection backoff.

pull/16225/head
Mark D. Roth 6 years ago
parent 3b60506f1a
commit f7e72560b6
  1. 1
      grpc.def
  2. 5
      include/grpc/grpc.h
  3. 9
      include/grpcpp/channel.h
  4. 11
      src/core/ext/filters/client_channel/client_channel.cc
  5. 5
      src/core/ext/filters/client_channel/lb_policy.h
  6. 10
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  7. 8
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  8. 8
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  9. 27
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  10. 8
      src/core/ext/filters/client_channel/resolver.h
  11. 9
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  12. 9
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  13. 18
      src/core/ext/filters/client_channel/subchannel.cc
  14. 7
      src/core/ext/filters/client_channel/subchannel.h
  15. 11
      src/core/lib/surface/channel.cc
  16. 2
      src/core/lib/transport/transport.h
  17. 8
      src/cpp/client/channel_cc.cc
  18. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  19. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  20. 1
      test/core/surface/public_headers_must_be_c89.c
  21. 30
      test/cpp/end2end/client_lb_end2end_test.cc

@ -42,6 +42,7 @@ EXPORTS
grpc_census_call_get_context
grpc_channel_get_target
grpc_channel_get_info
grpc_channel_reset_connect_backoff
grpc_insecure_channel_create
grpc_lame_client_channel_create
grpc_channel_destroy

@ -269,6 +269,11 @@ GRPCAPI char* grpc_channel_get_target(grpc_channel* channel);
GRPCAPI void grpc_channel_get_info(grpc_channel* channel,
const grpc_channel_info* channel_info);
/** EXPERIMENTAL. Resets the channel's connect backoff.
TODO(roth): When we see whether this proves useful, either promote
to non-experimental or remove it. */
GRPCAPI void grpc_channel_reset_connect_backoff(grpc_channel* channel);
/** Create a client channel to 'target'. Additional channel level configuration
MAY be provided by grpc_channel_args, though the expectation is that most
clients will want to simply pass NULL. The user data in 'args' need only

@ -30,6 +30,14 @@
struct grpc_channel;
namespace grpc {
namespace experimental {
/// Resets the channel's connection backoff.
/// TODO(roth): Once we see whether this proves useful, either create a gRFC
/// and change this to be a method of the Channel class, or remove it.
void ChannelResetConnectionBackoff(Channel* channel);
} // namespace experimental
/// Channels represent a connection to an endpoint. Created by \a CreateChannel.
class Channel final : public ChannelInterface,
public internal::CallHook,
@ -52,6 +60,7 @@ class Channel final : public ChannelInterface,
private:
template <class InputMessage, class OutputMessage>
friend class internal::BlockingUnaryCallImpl;
friend void experimental::ChannelResetConnectionBackoff(Channel* channel);
friend std::shared_ptr<Channel> CreateChannelInternal(
const grpc::string& host, grpc_channel* c_channel);
Channel(const grpc::string& host, grpc_channel* c_channel);

@ -622,6 +622,17 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
if (op->reset_connect_backoff) {
if (chand->resolver != nullptr) {
chand->resolver->ResetBackoffLocked();
chand->resolver->RequestReresolutionLocked();
}
if (chand->lb_policy != nullptr) {
chand->lb_policy->ResetBackoffLocked();
}
}
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);

@ -144,7 +144,10 @@ class LoadBalancingPolicy
/// consider whether this method is still needed.
virtual void ExitIdleLocked() GRPC_ABSTRACT;
/// populates child_subchannels and child_channels with the uuids of this
/// Resets connection backoff.
virtual void ResetBackoffLocked() GRPC_ABSTRACT;
/// Populates child_subchannels and child_channels with the uuids of this
/// LB policy's referenced children. This is not invoked from the
/// client_channel's combiner. The implementation is responsible for
/// providing its own synchronization.

@ -134,6 +134,7 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) override;
@ -1214,6 +1215,15 @@ void GrpcLb::ExitIdleLocked() {
}
}
void GrpcLb::ResetBackoffLocked() {
if (lb_channel_ != nullptr) {
grpc_channel_reset_connect_backoff(lb_channel_);
}
if (rr_policy_ != nullptr) {
rr_policy_->ResetBackoffLocked();
}
}
bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) {
PendingPick* pp = PendingPickCreate(pick);
bool pick_done = false;

@ -57,6 +57,7 @@ class PickFirst : public LoadBalancingPolicy {
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* ignored) override;
@ -259,6 +260,13 @@ void PickFirst::ExitIdleLocked() {
}
}
void PickFirst::ResetBackoffLocked() {
subchannel_list_->ResetBackoffLocked();
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ResetBackoffLocked();
}
}
bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
// If we have a selected subchannel already, return synchronously.
if (selected_ != nullptr) {

@ -68,6 +68,7 @@ class RoundRobin : public LoadBalancingPolicy {
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* ignored) override;
@ -333,6 +334,13 @@ void RoundRobin::ExitIdleLocked() {
}
}
void RoundRobin::ResetBackoffLocked() {
subchannel_list_->ResetBackoffLocked();
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ResetBackoffLocked();
}
}
bool RoundRobin::DoPickLocked(PickState* pick) {
const size_t next_ready_index =
subchannel_list_->GetNextReadySubchannelIndexLocked();

@ -107,6 +107,11 @@ class SubchannelData {
// being unreffed.
virtual void UnrefSubchannelLocked(const char* reason);
// Resets the connection backoff.
// TODO(roth): This method should go away when we move the backoff
// code out of the subchannel and into the LB policies.
void ResetBackoffLocked();
// Starts watching the connectivity state of the subchannel.
// ProcessConnectivityChangeLocked() will be called when the
// connectivity state changes.
@ -206,6 +211,11 @@ class SubchannelList
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
// Resets connection backoff of all subchannels.
// TODO(roth): We will probably need to rethink this as part of moving
// the backoff code out of subchannels and into LB policies.
void ResetBackoffLocked();
// Note: Caller must ensure that this is invoked inside of the combiner.
void Orphan() override {
ShutdownLocked();
@ -298,6 +308,14 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
}
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType,
SubchannelDataType>::ResetBackoffLocked() {
if (subchannel_ != nullptr) {
grpc_subchannel_reset_backoff(subchannel_);
}
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType,
SubchannelDataType>::StartConnectivityWatchLocked() {
@ -544,6 +562,15 @@ void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
}
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelList<SubchannelListType,
SubchannelDataType>::ResetBackoffLocked() {
for (size_t i = 0; i < subchannels_.size(); i++) {
SubchannelDataType* sd = &subchannels_[i];
sd->ResetBackoffLocked();
}
}
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */

@ -94,6 +94,14 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> {
/// throw away unselected subchannels.
virtual void RequestReresolutionLocked() GRPC_ABSTRACT;
/// Resets the re-resolution backoff, if any.
/// This needs to be implemented only by pull-based implementations;
/// for push-based implementations, it will be a no-op.
/// TODO(roth): Pull the backoff code out of resolver and into
/// client_channel, so that it can be shared across resolver
/// implementations. At that point, this method can go away.
virtual void ResetBackoffLocked() {}
void Orphan() override {
// Invoke ShutdownAndUnrefLocked() inside of the combiner.
GRPC_CLOSURE_SCHED(

@ -66,6 +66,8 @@ class AresDnsResolver : public Resolver {
void RequestReresolutionLocked() override;
void ResetBackoffLocked() override;
void ShutdownLocked() override;
private:
@ -187,6 +189,13 @@ void AresDnsResolver::RequestReresolutionLocked() {
}
}
void AresDnsResolver::ResetBackoffLocked() {
if (have_next_resolution_timer_) {
grpc_timer_cancel(&next_resolution_timer_);
}
backoff_.Reset();
}
void AresDnsResolver::ShutdownLocked() {
if (have_next_resolution_timer_) {
grpc_timer_cancel(&next_resolution_timer_);

@ -58,6 +58,8 @@ class NativeDnsResolver : public Resolver {
void RequestReresolutionLocked() override;
void ResetBackoffLocked() override;
void ShutdownLocked() override;
private:
@ -158,6 +160,13 @@ void NativeDnsResolver::RequestReresolutionLocked() {
}
}
void NativeDnsResolver::ResetBackoffLocked() {
if (have_next_resolution_timer_) {
grpc_timer_cancel(&next_resolution_timer_);
}
backoff_.Reset();
}
void NativeDnsResolver::ShutdownLocked() {
if (have_next_resolution_timer_) {
grpc_timer_cancel(&next_resolution_timer_);

@ -132,6 +132,8 @@ struct grpc_subchannel {
bool have_alarm;
/** have we started the backoff loop */
bool backoff_begun;
// reset_backoff() was called while alarm was pending
bool deferred_reset_backoff;
/** our alarm */
grpc_timer alarm;
@ -438,6 +440,9 @@ static void on_alarm(void* arg, grpc_error* error) {
if (c->disconnected) {
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
&error, 1);
} else if (c->deferred_reset_backoff) {
c->deferred_reset_backoff = false;
error = GRPC_ERROR_NONE;
} else {
GRPC_ERROR_REF(error);
}
@ -675,6 +680,19 @@ static void on_subchannel_connected(void* arg, grpc_error* error) {
grpc_channel_args_destroy(delete_channel_args);
}
void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) {
gpr_mu_lock(&subchannel->mu);
if (subchannel->have_alarm) {
subchannel->deferred_reset_backoff = true;
grpc_timer_cancel(&subchannel->alarm);
} else {
subchannel->backoff_begun = false;
subchannel->backoff->Reset();
maybe_start_connecting_locked(subchannel);
}
gpr_mu_unlock(&subchannel->mu);
}
/*
* grpc_subchannel_call implementation
*/

@ -145,6 +145,13 @@ grpc_subchannel_get_connected_subchannel(grpc_subchannel* c);
const grpc_subchannel_key* grpc_subchannel_get_key(
const grpc_subchannel* subchannel);
// Resets the connection backoff of the subchannel.
// TODO(roth): Move connection backoff out of subchannels and up into LB
// policy code (probably by adding a SubchannelGroup between
// SubchannelList and SubchannelData), at which point this method can
// go away.
void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel);
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_subchannel_call* subchannel_call,
grpc_transport_stream_op_batch* op);

@ -281,6 +281,17 @@ void grpc_channel_get_info(grpc_channel* channel,
elem->filter->get_channel_info(elem, channel_info);
}
void grpc_channel_reset_connect_backoff(grpc_channel* channel) {
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_channel_reset_connect_backoff(channel=%p)", 1,
(channel));
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->reset_connect_backoff = true;
grpc_channel_element* elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
elem->filter->start_transport_op(elem, op);
}
static grpc_call* grpc_channel_create_call_internal(
grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,

@ -282,6 +282,8 @@ typedef struct grpc_transport_op {
/** Called when the ping ack is received */
grpc_closure* on_ack;
} send_ping;
// If true, will reset the channel's connection backoff.
bool reset_connect_backoff;
/***************************************************************************
* remaining fields are initialized and used at the discretion of the

@ -84,6 +84,14 @@ grpc::string Channel::GetServiceConfigJSON() const {
&channel_info.service_config_json);
}
namespace experimental {
void ChannelResetConnectionBackoff(Channel* channel) {
grpc_channel_reset_connect_backoff(channel->c_channel_);
}
} // namespace experimental
internal::Call Channel::CreateCall(const internal::RpcMethod& method,
ClientContext* context,
CompletionQueue* cq) {

@ -65,6 +65,7 @@ grpc_census_call_set_context_type grpc_census_call_set_context_import;
grpc_census_call_get_context_type grpc_census_call_get_context_import;
grpc_channel_get_target_type grpc_channel_get_target_import;
grpc_channel_get_info_type grpc_channel_get_info_import;
grpc_channel_reset_connect_backoff_type grpc_channel_reset_connect_backoff_import;
grpc_insecure_channel_create_type grpc_insecure_channel_create_import;
grpc_lame_client_channel_create_type grpc_lame_client_channel_create_import;
grpc_channel_destroy_type grpc_channel_destroy_import;
@ -315,6 +316,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_census_call_get_context_import = (grpc_census_call_get_context_type) GetProcAddress(library, "grpc_census_call_get_context");
grpc_channel_get_target_import = (grpc_channel_get_target_type) GetProcAddress(library, "grpc_channel_get_target");
grpc_channel_get_info_import = (grpc_channel_get_info_type) GetProcAddress(library, "grpc_channel_get_info");
grpc_channel_reset_connect_backoff_import = (grpc_channel_reset_connect_backoff_type) GetProcAddress(library, "grpc_channel_reset_connect_backoff");
grpc_insecure_channel_create_import = (grpc_insecure_channel_create_type) GetProcAddress(library, "grpc_insecure_channel_create");
grpc_lame_client_channel_create_import = (grpc_lame_client_channel_create_type) GetProcAddress(library, "grpc_lame_client_channel_create");
grpc_channel_destroy_import = (grpc_channel_destroy_type) GetProcAddress(library, "grpc_channel_destroy");

@ -170,6 +170,9 @@ extern grpc_channel_get_target_type grpc_channel_get_target_import;
typedef void(*grpc_channel_get_info_type)(grpc_channel* channel, const grpc_channel_info* channel_info);
extern grpc_channel_get_info_type grpc_channel_get_info_import;
#define grpc_channel_get_info grpc_channel_get_info_import
typedef void(*grpc_channel_reset_connect_backoff_type)(grpc_channel* channel);
extern grpc_channel_reset_connect_backoff_type grpc_channel_reset_connect_backoff_import;
#define grpc_channel_reset_connect_backoff grpc_channel_reset_connect_backoff_import
typedef grpc_channel*(*grpc_insecure_channel_create_type)(const char* target, const grpc_channel_args* args, void* reserved);
extern grpc_insecure_channel_create_type grpc_insecure_channel_create_import;
#define grpc_insecure_channel_create grpc_insecure_channel_create_import

@ -104,6 +104,7 @@ int main(int argc, char **argv) {
printf("%lx", (unsigned long) grpc_census_call_get_context);
printf("%lx", (unsigned long) grpc_channel_get_target);
printf("%lx", (unsigned long) grpc_channel_get_info);
printf("%lx", (unsigned long) grpc_channel_reset_connect_backoff);
printf("%lx", (unsigned long) grpc_insecure_channel_create);
printf("%lx", (unsigned long) grpc_lame_client_channel_create);
printf("%lx", (unsigned long) grpc_channel_destroy);

@ -408,6 +408,36 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) {
gpr_atm_rel_store(&g_connection_delay_ms, 0);
}
TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) {
ChannelArguments args;
constexpr int kInitialBackOffMs = 1000;
args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs);
const std::vector<int> ports = {grpc_pick_unused_port_or_die()};
auto channel = BuildChannel("pick_first", args);
auto stub = BuildStub(channel);
SetNextResolution(ports);
// The channel won't become connected (there's no server).
EXPECT_FALSE(
channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
// Bring up a server on the chosen port.
StartServers(1, ports);
const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC);
// Wait for connect, but not long enough. This proves that we're
// being throttled by initial backoff.
EXPECT_FALSE(
channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
// Reset connection backoff.
experimental::ChannelResetConnectionBackoff(channel.get());
// Wait for connect. Should happen ~immediately.
EXPECT_TRUE(
channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10)));
const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0));
gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms);
// We should have waited less than kInitialBackOffMs.
EXPECT_LT(waited_ms, kInitialBackOffMs);
}
TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
// Start servers and send one RPC per server.
const int kNumServers = 3;

Loading…
Cancel
Save