Watch the LB channel using the right initial conn. state

pull/11965/head
David Garcia Quintas 8 years ago
parent 1d27c66d8e
commit 6a7935e14e
  1. 3
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
  2. 15
      test/cpp/end2end/grpclb_end2end_test.cc

@ -1772,7 +1772,8 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
if (!glb_policy->watching_lb_channel) { if (!glb_policy->watching_lb_channel) {
// Watch the LB channel connectivity for connection. // Watch the LB channel connectivity for connection.
glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT; glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
glb_policy->lb_channel, true /* try to connect */);
grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(glb_policy->lb_channel)); grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);

@ -172,11 +172,12 @@ class BalancerServiceImpl : public BalancerService {
shutdown_(false) {} shutdown_(false) {}
Status BalanceLoad(ServerContext* context, Stream* stream) override { Status BalanceLoad(ServerContext* context, Stream* stream) override {
gpr_log(GPR_INFO, "LB: BalanceLoad"); gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
LoadBalanceRequest request; LoadBalanceRequest request;
stream->Read(&request); stream->Read(&request);
IncreaseRequestCount(); IncreaseRequestCount();
gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str()); gpr_log(GPR_INFO, "LB[%p]: recv msg '%s'", this,
request.DebugString().c_str());
if (client_load_reporting_interval_seconds_ > 0) { if (client_load_reporting_interval_seconds_ > 0) {
LoadBalanceResponse initial_response; LoadBalanceResponse initial_response;
@ -207,7 +208,7 @@ class BalancerServiceImpl : public BalancerService {
if (client_load_reporting_interval_seconds_ > 0) { if (client_load_reporting_interval_seconds_ > 0) {
request.Clear(); request.Clear();
stream->Read(&request); stream->Read(&request);
gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'", gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this,
request.DebugString().c_str()); request.DebugString().c_str());
GPR_ASSERT(request.has_client_stats()); GPR_ASSERT(request.has_client_stats());
// We need to acquire the lock here in order to prevent the notify_one // We need to acquire the lock here in order to prevent the notify_one
@ -231,7 +232,7 @@ class BalancerServiceImpl : public BalancerService {
load_report_cond_.notify_one(); load_report_cond_.notify_one();
} }
done: done:
gpr_log(GPR_INFO, "LB: done"); gpr_log(GPR_INFO, "LB[%p]: done", this);
return Status::OK; return Status::OK;
} }
@ -246,7 +247,7 @@ class BalancerServiceImpl : public BalancerService {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
const bool prev = !shutdown_; const bool prev = !shutdown_;
shutdown_ = true; shutdown_ = true;
gpr_log(GPR_INFO, "LB: shut down"); gpr_log(GPR_INFO, "LB[%p]: shut down", this);
return prev; return prev;
} }
@ -283,13 +284,13 @@ class BalancerServiceImpl : public BalancerService {
private: private:
void SendResponse(Stream* stream, const LoadBalanceResponse& response, void SendResponse(Stream* stream, const LoadBalanceResponse& response,
int delay_ms) { int delay_ms) {
gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms); gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
if (delay_ms > 0) { if (delay_ms > 0) {
gpr_sleep_until( gpr_sleep_until(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(delay_ms, GPR_TIMESPAN))); gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
} }
gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'", gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
response.DebugString().c_str()); response.DebugString().c_str());
IncreaseResponseCount(); IncreaseResponseCount();
stream->Write(response); stream->Write(response);

Loading…
Cancel
Save