Merge github.com:grpc/grpc into optionalize_roundrobin

pull/6000/head
Craig Tiller 9 years ago
commit 6771361fad
  1. 41
      src/core/ext/lb_policy/pick_first/pick_first.c
  2. 59
      src/core/ext/lb_policy/round_robin/round_robin.c
  3. 14
      src/core/lib/channel/channel_args.h
  4. 5
      src/core/lib/client_config/lb_policy_factory.c
  5. 15
      src/core/lib/client_config/lb_policy_factory.h
  6. 4
      src/core/lib/client_config/lb_policy_registry.c
  7. 3
      src/core/lib/client_config/lb_policy_registry.h
  8. 23
      src/core/lib/client_config/resolvers/dns_resolver.c
  9. 52
      src/core/lib/client_config/resolvers/sockaddr_resolver.c
  10. 26
      src/core/lib/client_config/resolvers/zookeeper_resolver.c
  11. 2
      src/core/lib/iomgr/pollset_set_windows.c
  12. 15
      src/ruby/ext/grpc/rb_byte_buffer.c
  13. 14
      src/ruby/ext/grpc/rb_call.c
  14. 8
      test/core/client_config/lb_policies_test.c
  15. 174
      test/cpp/end2end/thread_stress_test.cc
  16. 22
      test/distrib/node/run_distrib_test.sh
  17. 2
      tools/run_tests/build_artifact_node.sh
  18. 13
      tools/run_tests/build_artifact_python.sh
  19. 2
      tools/run_tests/build_node.sh
  20. 2
      tools/run_tests/build_package_node.sh
  21. 2
      tools/run_tests/pre_build_node.sh
  22. 2
      tools/run_tests/run_node.sh
  23. 38
      tools/run_tests/run_tests.py
  24. 2
      vsprojects/build_vs2010.bat
  25. 2
      vsprojects/build_vs2013.bat
  26. 2
      vsprojects/build_vs2015.bat

@ -391,19 +391,42 @@ static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
if (args->num_subchannels == 0) return NULL;
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->subchannel_factory != NULL);
if (args->addresses->naddrs == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
p->subchannels =
gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
p->num_subchannels = args->num_subchannels;
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"pick_first");
memcpy(p->subchannels, args->subchannels,
sizeof(grpc_subchannel *) * args->num_subchannels);
gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs);
memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
for (size_t i = 0; i < args->addresses->naddrs; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
exec_ctx, args->subchannel_factory, &sc_args);
if (subchannel != NULL) {
p->subchannels[subchannel_idx++] = subchannel;
}
}
if (subchannel_idx == 0) {
gpr_free(p->subchannels);
gpr_free(p);
return NULL;
}
p->num_subchannels = subchannel_idx;
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
gpr_mu_init(&p->mu);
return &p->base;

@ -498,30 +498,47 @@ static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
size_t i;
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->subchannel_factory != NULL);
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
GPR_ASSERT(args->num_subchannels > 0);
memset(p, 0, sizeof(*p));
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
p->num_subchannels = args->num_subchannels;
p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels);
memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
gpr_mu_init(&p->mu);
for (i = 0; i < args->num_subchannels; i++) {
subchannel_data *sd = gpr_malloc(sizeof(*sd));
memset(sd, 0, sizeof(*sd));
p->subchannels[i] = sd;
sd->policy = p;
sd->index = i;
sd->subchannel = args->subchannels[i];
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
p->subchannels =
gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
for (size_t i = 0; i < args->addresses->naddrs; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
exec_ctx, args->subchannel_factory, &sc_args);
if (subchannel != NULL) {
subchannel_data *sd = gpr_malloc(sizeof(*sd));
memset(sd, 0, sizeof(*sd));
p->subchannels[subchannel_idx] = sd;
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
}
}
if (subchannel_idx == 0) {
gpr_free(p->subchannels);
gpr_free(p);
return NULL;
}
p->num_subchannels = subchannel_idx;
/* The (dummy node) root of the ready list */
p->ready_list.subchannel = NULL;
@ -529,6 +546,10 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
p->ready_list.next = NULL;
p->ready_list_last_pick = &p->ready_list;
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
gpr_mu_init(&p->mu);
return &p->base;
}

@ -37,23 +37,23 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
/* Copy some arguments */
/** Copy the arguments in \a src into a new instance */
grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
/* Copy some arguments, stably sorting keys */
grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a);
/** Copy the arguments in \a src into a new instance, stably sorting keys */
grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *src);
/** Copy some arguments and add the to_add parameter in the end.
If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */
/** Copy the arguments in \a src and append \a to_add. If \a to_add is NULL, it
* is equivalent to calling \a grpc_channel_args_copy. */
grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
const grpc_arg *to_add,
size_t num_to_add);
/** Copy args from a then args from b into a new channel args */
/** Concatenate args from \a a and \a b into a new instance */
grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
const grpc_channel_args *b);
/** Destroy arguments created by grpc_channel_args_copy */
/** Destroy arguments created by \a grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args *a);
/** Reads census_enabled settings from channel args. Returns 1 if census_enabled

@ -42,7 +42,8 @@ void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory) {
}
grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy(
grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) {
grpc_exec_ctx* exec_ctx, grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
if (factory == NULL) return NULL;
return factory->vtable->create_lb_policy(factory, args);
return factory->vtable->create_lb_policy(exec_ctx, factory, args);
}

@ -35,7 +35,10 @@
#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H
#include "src/core/lib/client_config/lb_policy.h"
#include "src/core/lib/client_config/subchannel.h"
#include "src/core/lib/client_config/subchannel_factory.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/exec_ctx.h"
typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
@ -47,8 +50,8 @@ struct grpc_lb_policy_factory {
};
typedef struct grpc_lb_policy_args {
grpc_subchannel **subchannels;
size_t num_subchannels;
grpc_resolved_addresses *addresses;
grpc_subchannel_factory *subchannel_factory;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
@ -56,7 +59,8 @@ struct grpc_lb_policy_factory_vtable {
void (*unref)(grpc_lb_policy_factory *factory);
/** Implementation of grpc_lb_policy_factory_create_lb_policy */
grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory,
grpc_lb_policy *(*create_lb_policy)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args);
/** Name for the LB policy this factory implements */
@ -68,6 +72,7 @@ void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory);
/** Create a lb_policy instance. */
grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy(
grpc_lb_policy_factory *factory, grpc_lb_policy_args *args);
grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args);
#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H */

@ -74,10 +74,10 @@ static grpc_lb_policy_factory *lookup_factory(const char *name) {
return NULL;
}
grpc_lb_policy *grpc_lb_policy_create(const char *name,
grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name,
grpc_lb_policy_args *args) {
grpc_lb_policy_factory *factory = lookup_factory(name);
grpc_lb_policy *lb_policy =
grpc_lb_policy_factory_create_lb_policy(factory, args);
grpc_lb_policy_factory_create_lb_policy(exec_ctx, factory, args);
return lb_policy;
}

@ -35,6 +35,7 @@
#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H
#include "src/core/lib/client_config/lb_policy_factory.h"
#include "src/core/lib/iomgr/exec_ctx.h"
/** Initialize the registry and set \a default_factory as the factory to be
* returned when no name is provided in a lookup */
@ -48,7 +49,7 @@ void grpc_register_lb_policy(grpc_lb_policy_factory *factory);
*
* If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init
* will be returned. */
grpc_lb_policy *grpc_lb_policy_create(const char *name,
grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name,
grpc_lb_policy_args *args);
#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */

@ -162,38 +162,23 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolved_addresses *addresses) {
dns_resolver *r = arg;
grpc_client_config *config = NULL;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
size_t i;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = 0;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs);
size_t naddrs = 0;
for (i = 0; i < addresses->naddrs; i++) {
memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *)(addresses->addrs[i].addr);
args.addr_len = (size_t)addresses->addrs[i].len;
grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
exec_ctx, r->subchannel_factory, &args);
if (subchannel != NULL) {
subchannels[naddrs++] = subchannel;
}
}
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.subchannels = subchannels;
lb_policy_args.num_subchannels = naddrs;
lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
lb_policy_args.addresses = addresses;
lb_policy_args.subchannel_factory = r->subchannel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
grpc_client_config_set_lb_policy(config, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
}
grpc_resolved_addresses_destroy(addresses);
gpr_free(subchannels);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);

@ -58,11 +58,7 @@ typedef struct {
char *lb_policy_name;
/** the addresses that we've 'resolved' */
struct sockaddr_storage *addrs;
/** the corresponding length of the addresses */
size_t *addrs_len;
/** how many elements in \a addrs */
size_t num_addrs;
grpc_resolved_addresses *addresses;
/** mutex guarding the rest of the state */
gpr_mu mu;
@ -125,28 +121,14 @@ static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
sockaddr_resolver *r) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
grpc_lb_policy_args lb_policy_args;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
if (r->next_completion != NULL && !r->published) {
size_t i;
cfg = grpc_client_config_create();
subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs);
for (i = 0; i < r->num_addrs; i++) {
memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *)&r->addrs[i];
args.addr_len = r->addrs_len[i];
subchannels[i] = grpc_subchannel_factory_create_subchannel(
exec_ctx, r->subchannel_factory, &args);
}
grpc_client_config *cfg = grpc_client_config_create();
grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.subchannels = subchannels;
lb_policy_args.num_subchannels = r->num_addrs;
lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
gpr_free(subchannels);
lb_policy_args.addresses = r->addresses;
lb_policy_args.subchannel_factory = r->subchannel_factory;
grpc_lb_policy *lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(cfg, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = 1;
@ -160,8 +142,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
gpr_free(r->addrs);
gpr_free(r->addrs_len);
grpc_resolved_addresses_destroy(r->addresses);
gpr_free(r->lb_policy_name);
gpr_free(r);
}
@ -269,7 +250,6 @@ static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_resolver_args *args, const char *default_lb_policy_name,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
size_t i;
int errors_found = 0; /* GPR_FALSE */
sockaddr_resolver *r;
gpr_slice path_slice;
@ -309,15 +289,18 @@ static grpc_resolver *sockaddr_create(
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
r->num_addrs = path_parts.count;
r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs);
r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs);
r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
r->addresses->naddrs = path_parts.count;
r->addresses->addrs =
gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs);
for (i = 0; i < r->num_addrs; i++) {
for (size_t i = 0; i < r->addresses->naddrs; i++) {
grpc_uri ith_uri = *args->uri;
char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
ith_uri.path = part_str;
if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) {
if (!parse(&ith_uri,
(struct sockaddr_storage *)(&r->addresses->addrs[i].addr),
&r->addresses->addrs[i].len)) {
errors_found = 1; /* GPR_TRUE */
}
gpr_free(part_str);
@ -328,8 +311,7 @@ static grpc_resolver *sockaddr_create(
gpr_slice_unref(path_slice);
if (errors_found) {
gpr_free(r->lb_policy_name);
gpr_free(r->addrs);
gpr_free(r->addrs_len);
grpc_resolved_addresses_destroy(r->addresses);
gpr_free(r);
return NULL;
}

@ -184,28 +184,22 @@ static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolved_addresses *addresses) {
zookeeper_resolver *r = arg;
grpc_client_config *config = NULL;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
size_t i;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs);
for (i = 0; i < addresses->naddrs; i++) {
memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *)(addresses->addrs[i].addr);
args.addr_len = addresses->addrs[i].len;
subchannels[i] = grpc_subchannel_factory_create_subchannel(
exec_ctx, r->subchannel_factory, &args);
lb_policy_args.addresses = addresses;
lb_policy_args.subchannel_factory = r->subchannel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
grpc_client_config_set_lb_policy(config, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
}
lb_policy_args.subchannels = subchannels;
lb_policy_args.num_subchannels = addresses->naddrs;
lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(config, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
grpc_resolved_addresses_destroy(addresses);
gpr_free(subchannels);
}
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving == 1);

@ -37,7 +37,7 @@
#include "src/core/lib/iomgr/pollset_set_windows.h"
grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; }
grpc_pollset_set* grpc_pollset_set_create(void) { return NULL; }
void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}

@ -50,21 +50,18 @@ grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) {
}
VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) {
size_t length = 0;
char *string = NULL;
size_t offset = 0;
VALUE rb_string;
grpc_byte_buffer_reader reader;
gpr_slice next;
if (buffer == NULL) {
return Qnil;
}
length = grpc_byte_buffer_length(buffer);
string = xmalloc(length + 1);
rb_string = rb_str_buf_new(grpc_byte_buffer_length(buffer));
grpc_byte_buffer_reader_init(&reader, buffer);
while (grpc_byte_buffer_reader_next(&reader, &next) != 0) {
memcpy(string + offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next));
offset += GPR_SLICE_LENGTH(next);
rb_str_cat(rb_string, (const char *) GPR_SLICE_START_PTR(next),
GPR_SLICE_LENGTH(next));
gpr_slice_unref(next);
}
return rb_str_new(string, length);
return rb_string;
}

@ -551,13 +551,26 @@ static void grpc_run_batch_stack_init(run_batch_stack *st,
/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
* cleaned up */
static void grpc_run_batch_stack_cleanup(run_batch_stack *st) {
size_t i = 0;
grpc_metadata_array_destroy(&st->send_metadata);
grpc_metadata_array_destroy(&st->send_trailing_metadata);
grpc_metadata_array_destroy(&st->recv_metadata);
grpc_metadata_array_destroy(&st->recv_trailing_metadata);
if (st->recv_status_details != NULL) {
gpr_free(st->recv_status_details);
}
if (st->recv_message != NULL) {
grpc_byte_buffer_destroy(st->recv_message);
}
for (i = 0; i < st->op_num; i++) {
if (st->ops[i].op == GRPC_OP_SEND_MESSAGE) {
grpc_byte_buffer_destroy(st->ops[i].data.send_message);
}
}
}
/* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
@ -643,7 +656,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
break;
case GRPC_OP_SEND_MESSAGE:
rb_struct_aset(result, sym_send_message, Qtrue);
grpc_byte_buffer_destroy(st->ops[i].data.send_message);
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
rb_struct_aset(result, sym_send_close, Qtrue);

@ -872,6 +872,7 @@ static void verify_rebirth_round_robin(const servers_fixture *f,
}
int main(int argc, char **argv) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
test_spec *spec;
size_t i;
const size_t NUM_ITERS = 10;
@ -881,9 +882,9 @@ int main(int argc, char **argv) {
grpc_init();
grpc_tracer_set_enabled("round_robin", 1);
GPR_ASSERT(grpc_lb_policy_create("this-lb-policy-does-not-exist", NULL) ==
NULL);
GPR_ASSERT(grpc_lb_policy_create(NULL, NULL) == NULL);
GPR_ASSERT(grpc_lb_policy_create(&exec_ctx, "this-lb-policy-does-not-exist",
NULL) == NULL);
GPR_ASSERT(grpc_lb_policy_create(&exec_ctx, NULL, NULL) == NULL);
spec = test_spec_create(NUM_ITERS, NUM_SERVERS);
/* everything is fine, all servers stay up the whole time and life's peachy */
@ -935,6 +936,7 @@ int main(int argc, char **argv) {
test_pending_calls(4);
test_ping();
grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return 0;
}

@ -58,6 +58,7 @@ using std::chrono::system_clock;
const int kNumThreads = 100; // Number of threads
const int kNumAsyncSendThreads = 2;
const int kNumAsyncReceiveThreads = 50;
const int kNumAsyncServerThreads = 50;
const int kNumRpcs = 1000; // Number of RPCs per thread
namespace grpc {
@ -174,23 +175,12 @@ class TestServiceImplDupPkg
}
};
template <class Service>
class CommonStressTest {
public:
CommonStressTest() : kMaxMessageSize_(8192) {}
void SetUp() {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(),
InsecureServerCredentials());
builder.RegisterService(&service_);
builder.SetMaxMessageSize(
kMaxMessageSize_); // For testing max message size.
builder.RegisterService(&dup_pkg_service_);
server_ = builder.BuildAndStart();
}
void TearDown() { server_->Shutdown(); }
virtual void SetUp() = 0;
virtual void TearDown() = 0;
void ResetStub() {
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureChannelCredentials());
@ -198,15 +188,137 @@ class CommonStressTest {
}
grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
builder->AddListeningPort(server_address_.str(),
InsecureServerCredentials());
builder->RegisterService(service);
builder->SetMaxMessageSize(
kMaxMessageSize_); // For testing max message size.
builder->RegisterService(&dup_pkg_service_);
}
void SetUpEnd(ServerBuilder* builder) { server_ = builder->BuildAndStart(); }
void TearDownStart() { server_->Shutdown(); }
void TearDownEnd() {}
private:
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
std::ostringstream server_address_;
const int kMaxMessageSize_;
TestServiceImpl service_;
TestServiceImplDupPkg dup_pkg_service_;
};
class CommonStressTestSyncServer : public CommonStressTest<TestServiceImpl> {
public:
void SetUp() GRPC_OVERRIDE {
ServerBuilder builder;
SetUpStart(&builder, &service_);
SetUpEnd(&builder);
}
void TearDown() GRPC_OVERRIDE {
TearDownStart();
TearDownEnd();
}
private:
TestServiceImpl service_;
};
class CommonStressTestAsyncServer
: public CommonStressTest<::grpc::testing::EchoTestService::AsyncService> {
public:
void SetUp() GRPC_OVERRIDE {
shutting_down_ = false;
ServerBuilder builder;
SetUpStart(&builder, &service_);
cq_ = builder.AddCompletionQueue();
SetUpEnd(&builder);
contexts_ = new Context[kNumAsyncServerThreads * 100];
for (int i = 0; i < kNumAsyncServerThreads * 100; i++) {
RefreshContext(i);
}
for (int i = 0; i < kNumAsyncServerThreads; i++) {
server_threads_.push_back(
new std::thread(&CommonStressTestAsyncServer::ProcessRpcs, this));
}
}
void TearDown() GRPC_OVERRIDE {
{
unique_lock<mutex> l(mu_);
TearDownStart();
shutting_down_ = true;
cq_->Shutdown();
}
for (int i = 0; i < kNumAsyncServerThreads; i++) {
server_threads_[i]->join();
delete server_threads_[i];
}
void* ignored_tag;
bool ignored_ok;
while (cq_->Next(&ignored_tag, &ignored_ok))
;
TearDownEnd();
delete[] contexts_;
}
private:
void ProcessRpcs() {
void* tag;
bool ok;
while (cq_->Next(&tag, &ok)) {
if (ok) {
int i = static_cast<int>(reinterpret_cast<intptr_t>(tag));
switch (contexts_[i].state) {
case Context::READY: {
contexts_[i].state = Context::DONE;
EchoResponse send_response;
send_response.set_message(contexts_[i].recv_request.message());
contexts_[i].response_writer->Finish(send_response, Status::OK,
tag);
break;
}
case Context::DONE:
RefreshContext(i);
break;
}
}
}
}
void RefreshContext(int i) {
unique_lock<mutex> l(mu_);
if (!shutting_down_) {
contexts_[i].state = Context::READY;
contexts_[i].srv_ctx.reset(new ServerContext);
contexts_[i].response_writer.reset(
new grpc::ServerAsyncResponseWriter<EchoResponse>(
contexts_[i].srv_ctx.get()));
service_.RequestEcho(contexts_[i].srv_ctx.get(),
&contexts_[i].recv_request,
contexts_[i].response_writer.get(), cq_.get(),
cq_.get(), (void*)(intptr_t)i);
}
}
struct Context {
std::unique_ptr<ServerContext> srv_ctx;
std::unique_ptr<grpc::ServerAsyncResponseWriter<EchoResponse>>
response_writer;
EchoRequest recv_request;
enum { READY, DONE } state;
} * contexts_;
::grpc::testing::EchoTestService::AsyncService service_;
std::unique_ptr<ServerCompletionQueue> cq_;
bool shutting_down_;
mutex mu_;
std::vector<std::thread*> server_threads_;
};
template <class Common>
class End2endTest : public ::testing::Test {
protected:
End2endTest() {}
@ -214,7 +326,7 @@ class End2endTest : public ::testing::Test {
void TearDown() GRPC_OVERRIDE { common_.TearDown(); }
void ResetStub() { common_.ResetStub(); }
CommonStressTest common_;
Common common_;
};
static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
@ -230,11 +342,16 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
}
}
TEST_F(End2endTest, ThreadStress) {
common_.ResetStub();
typedef ::testing::Types<CommonStressTestSyncServer,
CommonStressTestAsyncServer>
CommonTypes;
TYPED_TEST_CASE(End2endTest, CommonTypes);
TYPED_TEST(End2endTest, ThreadStress) {
this->common_.ResetStub();
std::vector<std::thread*> threads;
for (int i = 0; i < kNumThreads; ++i) {
threads.push_back(new std::thread(SendRpc, common_.GetStub(), kNumRpcs));
threads.push_back(
new std::thread(SendRpc, this->common_.GetStub(), kNumRpcs));
}
for (int i = 0; i < kNumThreads; ++i) {
threads[i]->join();
@ -242,6 +359,7 @@ TEST_F(End2endTest, ThreadStress) {
}
}
template <class Common>
class AsyncClientEnd2endTest : public ::testing::Test {
protected:
AsyncClientEnd2endTest() : rpcs_outstanding_(0) {}
@ -309,31 +427,33 @@ class AsyncClientEnd2endTest : public ::testing::Test {
}
}
CommonStressTest common_;
Common common_;
CompletionQueue cq_;
mutex mu_;
condition_variable cv_;
int rpcs_outstanding_;
};
TEST_F(AsyncClientEnd2endTest, ThreadStress) {
common_.ResetStub();
TYPED_TEST_CASE(AsyncClientEnd2endTest, CommonTypes);
TYPED_TEST(AsyncClientEnd2endTest, ThreadStress) {
this->common_.ResetStub();
std::vector<std::thread *> send_threads, completion_threads;
for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
completion_threads.push_back(new std::thread(
&AsyncClientEnd2endTest_ThreadStress_Test::AsyncCompleteRpc, this));
&AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>::AsyncCompleteRpc,
this));
}
for (int i = 0; i < kNumAsyncSendThreads; ++i) {
send_threads.push_back(
new std::thread(&AsyncClientEnd2endTest_ThreadStress_Test::AsyncSendRpc,
this, kNumRpcs));
send_threads.push_back(new std::thread(
&AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>::AsyncSendRpc,
this, kNumRpcs));
}
for (int i = 0; i < kNumAsyncSendThreads; ++i) {
send_threads[i]->join();
delete send_threads[i];
}
Wait();
this->Wait();
for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
completion_threads[i]->join();
delete completion_threads[i];

@ -28,23 +28,31 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
function finish() {
rv=$?
kill $STATIC_PID || true
curl "localhost:32767/drop/$STATIC_PORT" || true
exit $rv
}
trap finish EXIT
NODE_VERSION=$1
source ~/.nvm/nvm.sh
set -ex
cd $(dirname $0)
nvm install $NODE_VERSION
set -ex
npm install -g node-static
# Kill off existing static servers
kill -9 $(ps aux | grep '[n]ode .*static' | awk '{print $2}') || true
STATIC_SERVER=127.0.0.1
STATIC_PORT=8080
# If port_server is running, get port from that. Otherwise, assume we're in
# docker and use 8080
STATIC_PORT=$(curl 'localhost:32767/get' || echo '8080')
# Serves the input_artifacts directory statically at localhost:8080
# Serves the input_artifacts directory statically at localhost:
static "$EXTERNAL_GIT_ROOT/input_artifacts" -a $STATIC_SERVER -p $STATIC_PORT &
STATIC_PID=$!
@ -52,6 +60,4 @@ STATIC_URL="http://$STATIC_SERVER:$STATIC_PORT/"
npm install --unsafe-perm $STATIC_URL/grpc.tgz --grpc_node_binary_host_mirror=$STATIC_URL
kill -9 $STATIC_PID
./distrib_test.js

@ -30,9 +30,9 @@
NODE_TARGET_ARCH=$1
source ~/.nvm/nvm.sh
set -ex
nvm use 4
set -ex
cd $(dirname $0)/../..

@ -35,15 +35,18 @@ cd $(dirname $0)/../..
if [ "$SKIP_PIP_INSTALL" == "" ]
then
pip install --upgrade six
pip install --upgrade setuptools
# There's a bug in newer versions of setuptools (see
# https://bitbucket.org/pypa/setuptools/issues/503/pkg_resources_vendorpackagingrequirementsi)
pip install --upgrade 'setuptools==18'
pip install -rrequirements.txt
fi
export GRPC_PYTHON_USE_CUSTOM_BDIST=0
export GRPC_PYTHON_BUILD_WITH_CYTHON=1
# Build the source distribution first because MANIFEST.in cannot override
# exclusion of built shared objects among package resources (for some
# inexplicable reason).
GRPC_PYTHON_USE_CUSTOM_BDIST=0 \
GRPC_PYTHON_BUILD_WITH_CYTHON=1 \
${SETARCH_CMD} python setup.py \
sdist
@ -51,15 +54,11 @@ ${SETARCH_CMD} python setup.py \
# and thus ought to be run in a shell command separate of others. Further, it
# trashes the actual bdist_wheel output, so it should be run first so that
# bdist_wheel may be run unmolested.
GRPC_PYTHON_USE_CUSTOM_BDIST=0 \
GRPC_PYTHON_BUILD_WITH_CYTHON=1 \
${SETARCH_CMD} python setup.py \
build_tagged_ext
# Wheel has a bug where directories don't get excluded.
# https://bitbucket.org/pypa/wheel/issues/99/cannot-exclude-directory
GRPC_PYTHON_USE_CUSTOM_BDIST=0 \
GRPC_PYTHON_BUILD_WITH_CYTHON=1 \
${SETARCH_CMD} python setup.py \
bdist_wheel

@ -31,9 +31,9 @@
NODE_VERSION=$1
source ~/.nvm/nvm.sh
set -ex
nvm use $NODE_VERSION
set -ex
CONFIG=${CONFIG:-opt}

@ -29,9 +29,9 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
source ~/.nvm/nvm.sh
set -ex
nvm use 4
set -ex
cd $(dirname $0)/../..

@ -31,9 +31,9 @@
NODE_VERSION=$1
source ~/.nvm/nvm.sh
set -ex
nvm use $NODE_VERSION
set -ex
export GRPC_CONFIG=${CONFIG:-opt}

@ -30,9 +30,9 @@
NODE_VERSION=$1
source ~/.nvm/nvm.sh
set -ex
nvm use $NODE_VERSION
set -ex
CONFIG=${CONFIG:-opt}

@ -120,7 +120,12 @@ def get_c_tests(travis, test_lang) :
def _check_compiler(compiler, supported_compilers):
if compiler not in supported_compilers:
raise Exception('Compiler %s not supported.' % compiler)
raise Exception('Compiler %s not supported (on this platform).' % compiler)
def _check_arch(arch, supported_archs):
if arch not in supported_archs:
raise Exception('Architecture %s not supported.' % arch)
def _is_use_docker_child():
@ -464,7 +469,19 @@ class CSharpLanguage(object):
def configure(self, config, args):
self.config = config
self.args = args
_check_compiler(self.args.compiler, ['default'])
if self.platform == 'windows':
# Explicitly choosing between x86 and x64 arch doesn't work yet
_check_arch(self.args.arch, ['default'])
self._make_options = [_windows_toolset_option(self.args.compiler),
_windows_arch_option(self.args.arch)]
else:
_check_compiler(self.args.compiler, ['default'])
if self.platform == 'mac':
# On Mac, official distribution of mono is 32bit.
self._make_options = ['EMBED_OPENSSL=true', 'EMBED_ZLIB=true',
'CFLAGS=-arch i386', 'LDFLAGS=-arch i386']
else:
self._make_options = ['EMBED_OPENSSL=true', 'EMBED_ZLIB=true']
def test_specs(self):
with open('src/csharp/tests.json') as f:
@ -511,23 +528,16 @@ class CSharpLanguage(object):
return [['tools/run_tests/pre_build_csharp.sh']]
def make_targets(self):
# For Windows, this target doesn't really build anything,
# everything is build by buildall script later.
if self.platform == 'windows':
return []
else:
return ['grpc_csharp_ext']
return ['grpc_csharp_ext']
def make_options(self):
if self.platform == 'mac':
# On Mac, official distribution of mono is 32bit.
return ['CFLAGS=-arch i386', 'LDFLAGS=-arch i386']
else:
return []
return self._make_options;
def build_steps(self):
if self.platform == 'windows':
return [['src\\csharp\\buildall.bat']]
return [[_windows_build_bat(self.args.compiler),
'src/csharp/Grpc.sln',
'/p:Configuration=%s' % _MSBUILD_CONFIG[self.config.build_config]]]
else:
return [['tools/run_tests/build_csharp.sh']]

@ -1,5 +1,5 @@
@rem Convenience wrapper that runs specified gRPC target using msbuild
@rem Usage: build.bat TARGET_NAME
@rem Usage: build_vs2010.bat TARGET_NAME
setlocal
@rem Set VS variables (uses Visual Studio 2010)

@ -1,5 +1,5 @@
@rem Convenience wrapper that runs specified gRPC target using msbuild
@rem Usage: build.bat TARGET_NAME
@rem Usage: build_vs2013.bat TARGET_NAME
setlocal
@rem Set VS variables (uses Visual Studio 2013)

@ -1,5 +1,5 @@
@rem Convenience wrapper that runs specified gRPC target using msbuild
@rem Usage: build.bat TARGET_NAME
@rem Usage: build_vs2015.bat TARGET_NAME
setlocal
@rem Set VS variables (uses Visual Studio 2015)

Loading…
Cancel
Save