diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index fdb18f687f3..ebce801b37c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -491,11 +491,8 @@ static grpc_lb_addresses *process_serverlist_locked( for (size_t i = 0; i < serverlist->num_servers; ++i) { if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; } - if (num_valid == 0) return NULL; - grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid, &lb_token_vtable); - /* second pass: actually populate the addresses and LB tokens (aka user data * to the outside world) to be read by the RR policy during its creation. * Given that the validity tests are very cheap, they are performed again @@ -503,14 +500,12 @@ static grpc_lb_addresses *process_serverlist_locked( * incurr in an allocation due to the arbitrary number of server */ size_t addr_idx = 0; for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { - GPR_ASSERT(addr_idx < num_valid); const grpc_grpclb_server *server = serverlist->servers[sl_idx]; if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; - + GPR_ASSERT(addr_idx < num_valid); /* address processing */ grpc_resolved_address addr; parse_server(server, &addr); - /* lb token processing */ void *user_data; if (server->has_load_balance_token) { @@ -596,7 +591,7 @@ static void update_lb_connectivity_status_locked( grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy); } grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state, - GRPC_ERROR_REF(rr_state_error), + rr_state_error, "update_lb_connectivity_status_locked"); } @@ -678,11 +673,12 @@ static bool pick_from_internal_rr_locked( static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { + grpc_lb_addresses *addresses = + process_serverlist_locked(exec_ctx, glb_policy->serverlist); + GPR_ASSERT(addresses != NULL); grpc_lb_policy_args *args = gpr_zalloc(sizeof(*args)); args->client_channel_factory = glb_policy->cc_factory; args->combiner = glb_policy->base.combiner; - grpc_lb_addresses *addresses = - process_serverlist_locked(exec_ctx, glb_policy->serverlist); // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; @@ -727,7 +723,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, /* Connectivity state is a function of the RR policy updated/created */ update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state, rr_state_error); - /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ @@ -761,8 +756,8 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, pp->wrapped_on_complete_arg.client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", - (intptr_t)glb_policy->rr_policy); + gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p", + (void *)glb_policy->rr_policy); } pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args, true /* force_async */, pp->target, @@ -788,10 +783,9 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->serverlist != NULL && glb_policy->serverlist->num_servers > 0); - if (glb_policy->shutting_down) return; - grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); + GPR_ASSERT(args != NULL); if (glb_policy->rr_policy != NULL) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)", @@ -826,8 +820,8 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, unref_needed = true; gpr_free(rr_connectivity); } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */ - update_lb_connectivity_status_locked(exec_ctx, glb_policy, - rr_connectivity->state, error); + update_lb_connectivity_status_locked( + exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error)); /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ grpc_lb_policy_notify_on_state_change_locked( exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, @@ -1089,6 +1083,16 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } +// Cancel a specific pending pick. +// +// A grpclb pick progresses as follows: +// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be +// handed over to the RR policy (in create_rr_locked()). From that point +// onwards, it'll be RR's responsibility. For cancellations, that implies the +// pick needs also be cancelled by the RR instance. +// - Otherwise, without an RR instance, picks stay pending at this policy's +// level (grpclb), inside the glb_policy->pending_picks list. To cancel these, +// we invoke the completion closure and set *target to NULL right here. static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { @@ -1108,9 +1112,23 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } + if (glb_policy->rr_policy != NULL) { + grpc_lb_policy_cancel_pick_locked(exec_ctx, glb_policy->rr_policy, target, + GRPC_ERROR_REF(error)); + } GRPC_ERROR_UNREF(error); } +// Cancel all pending picks. +// +// A grpclb pick progresses as follows: +// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be +// handed over to the RR policy (in create_rr_locked()). From that point +// onwards, it'll be RR's responsibility. For cancellations, that implies the +// pick needs also be cancelled by the RR instance. +// - Otherwise, without an RR instance, picks stay pending at this policy's +// level (grpclb), inside the glb_policy->pending_picks list. To cancel these, +// we invoke the completion closure and set *target to NULL right here. static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, @@ -1132,6 +1150,11 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, } pp = next; } + if (glb_policy->rr_policy != NULL) { + grpc_lb_policy_cancel_picks_locked( + exec_ctx, glb_policy->rr_policy, initial_metadata_flags_mask, + initial_metadata_flags_eq, GRPC_ERROR_REF(error)); + } GRPC_ERROR_UNREF(error); } @@ -1463,7 +1486,8 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op++; /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, + "lb_on_sent_initial_request_locked"); call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_sent_initial_request); @@ -1480,8 +1504,9 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->reserved = NULL; op++; /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref - * count goes to zero) to be unref'd in lb_on_server_status_received */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); + * count goes to zero) to be unref'd in lb_on_server_status_received_locked */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, + "lb_on_server_status_received_locked"); call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_server_status_received); @@ -1493,8 +1518,9 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; - /* take another weak ref to be unref'd in lb_on_response_received */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received"); + /* take another weak ref to be unref'd/reused in + * lb_on_response_received_locked */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked"); call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); @@ -1511,13 +1537,12 @@ static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, do_send_client_load_report_locked(exec_ctx, glb_policy); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "lb_on_response_received_locked"); + "lb_on_sent_initial_request_locked"); } static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; - grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; @@ -1548,7 +1573,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, } /* take a weak ref (won't prevent calling of \a glb_shutdown() if the * strong ref count goes to zero) to be unref'd in - * send_client_load_report() */ + * send_client_load_report_locked() */ glb_policy->client_load_report_timer_pending = true; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); schedule_next_client_load_report(exec_ctx, glb_policy); @@ -1576,7 +1601,6 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(ipport); } } - /* update serverlist */ if (serverlist->num_servers > 0) { if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, @@ -1611,9 +1635,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); } } - grpc_slice_unref_internal(exec_ctx, response_slice); - if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; @@ -1621,7 +1643,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, op->flags = 0; op->reserved = NULL; op++; - /* reuse the "lb_on_response_received" weak ref taken in + /* reuse the "lb_on_response_received_locked" weak ref taken in * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), @@ -1629,10 +1651,10 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(GRPC_CALL_OK == call_error); } } else { /* empty payload: call cancelled. */ - /* dispose of the "lb_on_response_received" weak ref taken in + /* dispose of the "lb_on_response_received_locked" weak ref taken in * query_for_backends_locked() and reused in every reception loop */ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "lb_on_response_received_empty_payload"); + "lb_on_response_received_locked_empty_payload"); } } @@ -1699,7 +1721,7 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, &glb_policy->lb_on_call_retry, now); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "lb_on_server_status_received"); + "lb_on_server_status_received_locked"); } static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 341763a4d71..bc40165cfb8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -584,10 +584,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, // Dispose of outdated subchannel lists. if (sd->subchannel_list != p->subchannel_list && sd->subchannel_list != p->latest_pending_subchannel_list) { - // sd belongs to an outdated subchannel_list: get rid of it. - rr_subchannel_list_shutdown_and_unref(exec_ctx, sd->subchannel_list, - "sl_outdated"); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_outdated"); + char *reason = NULL; + if (sd->subchannel_list->shutting_down) { + reason = "sl_outdated_straggler"; + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, reason); + } else { + reason = "sl_outdated"; + rr_subchannel_list_shutdown_and_unref(exec_ctx, sd->subchannel_list, + reason); + } + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, reason); return; } // Now that we're inside the combiner, copy the pending connectivity @@ -753,6 +759,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, for (size_t i = 0; i < addresses->num_addresses; i++) { if (!addresses->addresses[i].is_balancer) ++num_addrs; } + rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs); if (num_addrs == 0) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, @@ -761,18 +768,16 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, if (p->subchannel_list != NULL) { rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, "sl_shutdown_empty_update"); - p->subchannel_list = NULL; } + p->subchannel_list = subchannel_list; // empty list return; } size_t subchannel_index = 0; - rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs); if (p->latest_pending_subchannel_list != NULL && p->started_picking) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down latest pending subchannel list %p, about " - "to be " - "replaced by newer latest %p", + "to be replaced by newer latest %p", (void *)p, (void *)p->latest_pending_subchannel_list, (void *)subchannel_list); } @@ -876,10 +881,10 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args *args) { GPR_ASSERT(args->client_channel_factory != NULL); round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); - rr_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); + rr_update_locked(exec_ctx, &p->base, args); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void *)p, (unsigned long)p->subchannel_list->num_subchannels); diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 86cce2d30d4..1f3255d18d8 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -73,8 +73,8 @@ extern "C" { using std::chrono::system_clock; -using grpc::lb::v1::LoadBalanceResponse; using grpc::lb::v1::LoadBalanceRequest; +using grpc::lb::v1::LoadBalanceResponse; using grpc::lb::v1::LoadBalancer; namespace grpc { @@ -647,7 +647,6 @@ class UpdatesTest : public GrpclbEnd2endTest { TEST_F(UpdatesTest, UpdateBalancers) { const std::vector first_backend{GetBackendPorts()[0]}; const std::vector second_backend{GetBackendPorts()[1]}; - ScheduleResponseForBalancer( 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); ScheduleResponseForBalancer( @@ -934,6 +933,45 @@ TEST_F(SingleBalancerTest, Drop) { EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); } +TEST_F(SingleBalancerTest, DropAllFirst) { + // All registered addresses are marked as "drop". + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 0); + const auto& statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); + } +} + +TEST_F(SingleBalancerTest, DropAll) { + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0); + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 1000); + + // First call succeeds. + auto statuses_and_responses = SendRpc(kMessage_, 1); + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); + } + // But eventually, the update with only dropped servers is processed and calls + // fail. + do { + statuses_and_responses = SendRpc(kMessage_, 1); + ASSERT_EQ(statuses_and_responses.size(), 1UL); + } while (statuses_and_responses[0].first.ok()); + const Status& status = statuses_and_responses[0].first; + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); +} + class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest { public: SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {}