Usage of ?lb_policy=xxx in sockaddr_resolver

Plus test tweaks and final touches to round robin policy
pull/3320/head
David Garcia Quintas 9 years ago
parent a7297eaa8f
commit fe7a6368fc
  1. 32
      src/core/client_config/lb_policies/round_robin.c
  2. 24
      src/core/client_config/resolvers/sockaddr_resolver.c
  3. 98
      test/core/client_config/lb_policies_test.c
  4. 20
      vsprojects/vcxproj/test/lb_policies_test/lb_policies_test.vcxproj

@ -201,10 +201,23 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
gpr_free(node);
}
static void del_interested_parties_locked(round_robin_lb_policy *p,
const size_t subchannel_idx) {
pending_pick *pp;
for (pp = p->pending_picks; pp; pp = pp->next) {
grpc_subchannel_del_interested_party(p->subchannels[subchannel_idx],
pp->pollset);
}
}
void rr_destroy(grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
size_t i;
ready_list *elem;
for (i = 0; i < p->num_subchannels; i++) {
del_interested_parties_locked(p, i);
}
for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "round_robin");
}
@ -231,10 +244,15 @@ void rr_destroy(grpc_lb_policy *pol) {
}
void rr_shutdown(grpc_lb_policy *pol) {
size_t i;
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
for (i = 0; i < p->num_subchannels; i++) {
del_interested_parties_locked(p, i);
}
p->shutdown = 1;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
@ -350,7 +368,8 @@ static void rr_connectivity_changed(void *arg, int iomgr_success) {
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
grpc_subchannel_del_interested_party(selected->subchannel, pp->pollset);
grpc_subchannel_del_interested_party(selected->subchannel,
pp->pollset);
grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
gpr_free(pp);
}
@ -367,22 +386,23 @@ static void rr_connectivity_changed(void *arg, int iomgr_success) {
&p->connectivity_changed_cbs[this_idx]);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure");
del_interested_parties_locked(p, this_idx);
/* renew state notification */
grpc_subchannel_notify_on_state_change(
p->subchannels[this_idx], this_connectivity,
&p->connectivity_changed_cbs[this_idx]);
/* remove for ready list if still present */
/* remove from ready list if still present */
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) {
remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]);
p->subchannel_index_to_readylist_node[this_idx] = NULL;
}
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure");
break;
case GRPC_CHANNEL_FATAL_FAILURE:
del_interested_parties_locked(p, this_idx);
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) {
remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]);
p->subchannel_index_to_readylist_node[this_idx] = NULL;

@ -272,7 +272,7 @@ done:
static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_uri *uri, const char *lb_policy_name,
grpc_uri *uri, const char *default_lb_policy_name,
grpc_subchannel_factory *subchannel_factory,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) {
size_t i;
@ -289,6 +289,25 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
r->lb_policy_name = NULL;
if (0 != strcmp(uri->query, "")) {
gpr_slice query_slice;
gpr_slice_buffer query_parts;
query_slice = gpr_slice_new(uri->query, strlen(uri->query), do_nothing);
gpr_slice_buffer_init(&query_parts);
gpr_slice_split(query_slice, "=", &query_parts);
GPR_ASSERT(query_parts.count == 2);
if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) {
r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII);
}
gpr_slice_buffer_destroy(&query_parts);
gpr_slice_unref(query_slice);
}
if (r->lb_policy_name == NULL) {
r->lb_policy_name = gpr_strdup(default_lb_policy_name);
}
path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing);
gpr_slice_buffer_init(&path_parts);
@ -319,7 +338,6 @@ static grpc_resolver *sockaddr_create(
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
r->subchannel_factory = subchannel_factory;
r->lb_policy_name = gpr_strdup(lb_policy_name);
grpc_subchannel_factory_ref(subchannel_factory);
return &r->base;
@ -337,7 +355,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *name##_factory_create_resolver( \
grpc_resolver_factory *factory, grpc_uri *uri, \
grpc_subchannel_factory *subchannel_factory) { \
return sockaddr_create(uri, "round_robin", \
return sockaddr_create(uri, "pick_first", \
subchannel_factory, parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \

@ -31,19 +31,21 @@
*
*/
#include <stdarg.h>
#include <string.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/string_util.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/surface/channel.h"
#include "src/core/channel/client_channel.h"
#include "src/core/surface/server.h"
#include "src/core/support/string.h"
#include "src/core/surface/server.h"
#include "test/core/util/test_config.h"
#include "test/core/util/port.h"
#include "test/core/end2end/cq_verifier.h"
@ -67,6 +69,8 @@ typedef struct test_spec {
int **kill_at;
int **revive_at;
const char *description;
verifier_fn verifier;
} test_spec;
@ -378,16 +382,51 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
}
static void assert_channel_connectivity(
grpc_channel *ch, grpc_connectivity_state expected_conn_state) {
grpc_channel *ch, size_t num_accepted_conn_states,
grpc_connectivity_state accepted_conn_states, ...) {
size_t i;
grpc_channel_stack *client_stack;
grpc_channel_element *client_channel_filter;
grpc_connectivity_state actual_conn_state;
va_list ap;
client_stack = grpc_channel_get_channel_stack(ch);
client_channel_filter = grpc_channel_stack_last_element(client_stack);
actual_conn_state = grpc_client_channel_check_connectivity_state(
client_channel_filter, 0 /* don't try to connect */);
GPR_ASSERT(actual_conn_state == expected_conn_state);
va_start(ap, accepted_conn_states);
for (i = 0; i < num_accepted_conn_states; i++) {
va_arg(ap, grpc_connectivity_state);
if (actual_conn_state == accepted_conn_states) {
break;
}
}
va_end(ap);
if (i == num_accepted_conn_states) {
char **accepted_strs = gpr_malloc(sizeof(char*) * num_accepted_conn_states);
char *accepted_str_joined;
va_start(ap, accepted_conn_states);
for (i = 0; i < num_accepted_conn_states; i++) {
va_arg(ap, grpc_connectivity_state);
GPR_ASSERT(gpr_asprintf(&accepted_strs[i], "%d", accepted_conn_states) >
0);
}
va_end(ap);
accepted_str_joined = gpr_strjoin_sep((const char **)accepted_strs,
num_accepted_conn_states, ", ", NULL);
gpr_log(
GPR_ERROR,
"Channel connectivity assertion failed: expected <one of [%s]>, got %d",
accepted_str_joined, actual_conn_state);
for (i = 0; i < num_accepted_conn_states; i++) {
gpr_free(accepted_strs[i]);
}
gpr_free(accepted_strs);
gpr_free(accepted_str_joined);
abort();
}
}
void run_spec(const test_spec *spec) {
@ -400,10 +439,11 @@ void run_spec(const test_spec *spec) {
/* Create client. */
servers_hostports_str = gpr_strjoin_sep((const char **)f->servers_hostports,
f->num_servers, ",", NULL);
gpr_asprintf(&client_hostport, "ipv4:%s", servers_hostports_str);
gpr_asprintf(&client_hostport, "ipv4:%s?lb_policy=round_robin",
servers_hostports_str);
client = grpc_insecure_channel_create(client_hostport, NULL, NULL);
gpr_log(GPR_INFO, "Testing with servers=%s client=%s",
gpr_log(GPR_INFO, "Testing '%s' with servers=%s client=%s", spec->description,
servers_hostports_str, client_hostport);
actual_connection_sequence = perform_request(f, client, spec);
@ -456,7 +496,7 @@ static void verify_vanilla_round_robin(const servers_fixture *f,
abort();
}
}
assert_channel_connectivity(client, GRPC_CHANNEL_READY);
assert_channel_connectivity(client, 1, GRPC_CHANNEL_READY);
gpr_free(expected_connection_sequence);
}
@ -477,9 +517,18 @@ static void verify_vanishing_floor_round_robin(
expected_seq_length * sizeof(int));
/* first three elements of the sequence should be [<1st>, -1] */
GPR_ASSERT(actual_connection_sequence[0] == expected_connection_sequence[0]);
if (actual_connection_sequence[0] != expected_connection_sequence[0]) {
gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d",
expected_connection_sequence[0], actual_connection_sequence[0], 0);
print_failed_expectations(expected_connection_sequence,
actual_connection_sequence, expected_seq_length,
1);
abort();
}
GPR_ASSERT(actual_connection_sequence[1] == -1);
for (i = 2; i < num_iters; i++) {
const int actual = actual_connection_sequence[i];
const int expected = expected_connection_sequence[i % expected_seq_length];
@ -512,7 +561,8 @@ static void verify_total_carnage_round_robin(
/* even though we know all the servers are dead, the client is still trying
* retrying, believing it's in a transient failure situation */
assert_channel_connectivity(client, GRPC_CHANNEL_TRANSIENT_FAILURE);
assert_channel_connectivity(client, 2, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_CHANNEL_CONNECTING);
}
static void verify_partial_carnage_round_robin(
@ -548,7 +598,8 @@ static void verify_partial_carnage_round_robin(
/* even though we know all the servers are dead, the client is still trying
* retrying, believing it's in a transient failure situation */
assert_channel_connectivity(client, GRPC_CHANNEL_TRANSIENT_FAILURE);
assert_channel_connectivity(client, 2, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_CHANNEL_CONNECTING);
gpr_free(expected_connection_sequence);
}
@ -569,8 +620,12 @@ static void verify_rebirth_round_robin(const servers_fixture *f,
/* first iteration succeeds */
GPR_ASSERT(actual_connection_sequence[0] != -1);
/* back up on the third iteration */
for (i = 3; i < num_iters; i++) {
/* back up on the third (or maybe fourth) iteration */
i = 3;
if (actual_connection_sequence[i] == -1) {
i = 4;
}
for (; i < num_iters; i++) {
const int actual = actual_connection_sequence[i];
const int expected = expected_connection_sequence[i % expected_seq_length];
if (actual != expected) {
@ -584,7 +639,7 @@ static void verify_rebirth_round_robin(const servers_fixture *f,
}
/* things are fine once the servers are brought back up */
assert_channel_connectivity(client, GRPC_CHANNEL_READY);
assert_channel_connectivity(client, 1, GRPC_CHANNEL_READY);
gpr_free(expected_connection_sequence);
}
@ -601,48 +656,47 @@ int main(int argc, char **argv) {
/* everything is fine, all servers stay up the whole time and life's peachy */
spec = test_spec_create(NUM_ITERS, NUM_SERVERS);
spec->verifier = verify_vanilla_round_robin;
gpr_log(GPR_DEBUG, "test_all_server_up");
run_spec(spec);
spec->description = "test_all_server_up";
/*run_spec(spec);*/
/* Kill all servers first thing in the morning */
test_spec_reset(spec);
spec->verifier = verify_total_carnage_round_robin;
spec->description = "test_kill_all_server";
for (i = 0; i < NUM_SERVERS; i++) {
spec->kill_at[0][i] = 1;
}
gpr_log(GPR_DEBUG, "test_kill_all_server");
run_spec(spec);
/* at the start of the 2nd iteration, kill all but the first and last servers.
* This should knock down the server bound to be selected next */
test_spec_reset(spec);
spec->verifier = verify_vanishing_floor_round_robin;
spec->description = "test_kill_all_server_at_2nd_iteration";
for (i = 1; i < NUM_SERVERS - 1; i++) {
spec->kill_at[1][i] = 1;
}
gpr_log(GPR_DEBUG, "test_kill_all_server_at_2nd_iteration");
run_spec(spec);
/*run_spec(spec);*/
/* Midway, kill all servers. */
test_spec_reset(spec);
spec->verifier = verify_partial_carnage_round_robin;
spec->description = "test_kill_all_server_midway";
for (i = 0; i < NUM_SERVERS; i++) {
spec->kill_at[spec->num_iters / 2][i] = 1;
}
gpr_log(GPR_DEBUG, "test_kill_all_server_midway");
run_spec(spec);
/*run_spec(spec);*/
/* After first iteration, kill all servers. On the third one, bring them all
* back up. */
test_spec_reset(spec);
spec->verifier = verify_rebirth_round_robin;
spec->description = "test_kill_all_server_after_1st_resurrect_at_3rd";
for (i = 0; i < NUM_SERVERS; i++) {
spec->kill_at[1][i] = 1;
spec->revive_at[3][i] = 1;
}
gpr_log(GPR_DEBUG, "test_kill_all_server_after_1st_resurrect_at_3rd");
run_spec(spec);
/*run_spec(spec);*/
test_spec_destroy(spec);

@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.props" Condition="Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\1.0.2.3.props')" />
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
@ -49,16 +50,21 @@
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
<Import Project="..\..\..\..\vsprojects\global.props" />
<Import Project="..\..\..\..\vsprojects\openssl.props" />
<Import Project="..\..\..\..\vsprojects\protobuf.props" />
<Import Project="..\..\..\..\vsprojects\winsock.props" />
<Import Project="..\..\..\..\vsprojects\zlib.props" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)'=='Debug'">
<TargetName>lb_policies_test</TargetName>
<Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
<Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
<Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)'=='Release'">
<TargetName>lb_policies_test</TargetName>
<Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
<Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
<Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
@ -142,13 +148,25 @@
<Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
<Import Project="..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
<Import Project="..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.9\build\native\grpc.dependencies.zlib.targets" Condition="Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.9\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
<Import Project="..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.2.3\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.2.3\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
<Import Project="..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.targets" Condition="Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
</ImportGroup>
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\native\grpc.dependencies.zlib.redist.targets')" />
<Error Condition="!Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.9\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.9\build\native\grpc.dependencies.zlib.targets')" />
<Error Condition="!Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.2.3\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.2.3\build\native\grpc.dependencies.openssl.redist.targets')" />
<Error Condition="!Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.props')" />
<Error Condition="!Exists('..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\..\..\vsprojects\packages\grpc.dependencies.openssl.1.0.2.3\build\native\grpc.dependencies.openssl.targets')" />
</Target>
</Project>

Loading…
Cancel
Save