diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 8dc7ed01e28..95fe372c1b6 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -391,19 +391,42 @@ static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} -static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, +static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - if (args->num_subchannels == 0) return NULL; + GPR_ASSERT(args->addresses != NULL); + GPR_ASSERT(args->subchannel_factory != NULL); + + if (args->addresses->naddrs == 0) return NULL; + pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); + p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); - p->num_subchannels = args->num_subchannels; - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "pick_first"); - memcpy(p->subchannels, args->subchannels, - sizeof(grpc_subchannel *) * args->num_subchannels); + gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + grpc_subchannel_args sc_args; + size_t subchannel_idx = 0; + for (size_t i = 0; i < args->addresses->naddrs; i++) { + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); + sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + + grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( + exec_ctx, args->subchannel_factory, &sc_args); + + if (subchannel != NULL) { + p->subchannels[subchannel_idx++] = subchannel; + } + } + if (subchannel_idx == 0) { + gpr_free(p->subchannels); + gpr_free(p); + return NULL; + } + p->num_subchannels = subchannel_idx; + + grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); gpr_mu_init(&p->mu); return &p->base; diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index b1996922bf9..eb6cccdca9f 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -498,30 +498,47 @@ static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} -static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, +static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - size_t i; + GPR_ASSERT(args->addresses != NULL); + GPR_ASSERT(args->subchannel_factory != NULL); + round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(args->num_subchannels > 0); memset(p, 0, sizeof(*p)); - grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); - p->num_subchannels = args->num_subchannels; - p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels); - memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels); - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "round_robin"); - gpr_mu_init(&p->mu); - for (i = 0; i < args->num_subchannels; i++) { - subchannel_data *sd = gpr_malloc(sizeof(*sd)); - memset(sd, 0, sizeof(*sd)); - p->subchannels[i] = sd; - sd->policy = p; - sd->index = i; - sd->subchannel = args->subchannels[i]; - grpc_closure_init(&sd->connectivity_changed_closure, - rr_connectivity_changed, sd); + p->subchannels = + gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + + grpc_subchannel_args sc_args; + size_t subchannel_idx = 0; + for (size_t i = 0; i < args->addresses->naddrs; i++) { + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); + sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + + grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( + exec_ctx, args->subchannel_factory, &sc_args); + + if (subchannel != NULL) { + subchannel_data *sd = gpr_malloc(sizeof(*sd)); + memset(sd, 0, sizeof(*sd)); + p->subchannels[subchannel_idx] = sd; + sd->policy = p; + sd->index = subchannel_idx; + sd->subchannel = subchannel; + ++subchannel_idx; + grpc_closure_init(&sd->connectivity_changed_closure, + rr_connectivity_changed, sd); + } + } + if (subchannel_idx == 0) { + gpr_free(p->subchannels); + gpr_free(p); + return NULL; } + p->num_subchannels = subchannel_idx; /* The (dummy node) root of the ready list */ p->ready_list.subchannel = NULL; @@ -529,6 +546,10 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, p->ready_list.next = NULL; p->ready_list_last_pick = &p->ready_list; + grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, + "round_robin"); + gpr_mu_init(&p->mu); return &p->base; } diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 67d287ec6b1..1ea202c5431 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -37,23 +37,23 @@ #include #include -/* Copy some arguments */ +/** Copy the arguments in \a src into a new instance */ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); -/* Copy some arguments, stably sorting keys */ -grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a); +/** Copy the arguments in \a src into a new instance, stably sorting keys */ +grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *src); -/** Copy some arguments and add the to_add parameter in the end. - If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */ +/** Copy the arguments in \a src and append \a to_add. If \a to_add is NULL, it + * is equivalent to calling \a grpc_channel_args_copy. */ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add, size_t num_to_add); -/** Copy args from a then args from b into a new channel args */ +/** Concatenate args from \a a and \a b into a new instance */ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, const grpc_channel_args *b); -/** Destroy arguments created by grpc_channel_args_copy */ +/** Destroy arguments created by \a grpc_channel_args_copy */ void grpc_channel_args_destroy(grpc_channel_args *a); /** Reads census_enabled settings from channel args. Returns 1 if census_enabled diff --git a/src/core/lib/client_config/lb_policy_factory.c b/src/core/lib/client_config/lb_policy_factory.c index 2ca6f42f899..ede1d624afb 100644 --- a/src/core/lib/client_config/lb_policy_factory.c +++ b/src/core/lib/client_config/lb_policy_factory.c @@ -42,7 +42,8 @@ void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory) { } grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy( - grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) { + grpc_exec_ctx* exec_ctx, grpc_lb_policy_factory* factory, + grpc_lb_policy_args* args) { if (factory == NULL) return NULL; - return factory->vtable->create_lb_policy(factory, args); + return factory->vtable->create_lb_policy(exec_ctx, factory, args); } diff --git a/src/core/lib/client_config/lb_policy_factory.h b/src/core/lib/client_config/lb_policy_factory.h index 36eaf178d93..9a93a8ca3f3 100644 --- a/src/core/lib/client_config/lb_policy_factory.h +++ b/src/core/lib/client_config/lb_policy_factory.h @@ -35,7 +35,10 @@ #define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H #include "src/core/lib/client_config/lb_policy.h" -#include "src/core/lib/client_config/subchannel.h" +#include "src/core/lib/client_config/subchannel_factory.h" +#include "src/core/lib/iomgr/resolve_address.h" + +#include "src/core/lib/iomgr/exec_ctx.h" typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; @@ -47,8 +50,8 @@ struct grpc_lb_policy_factory { }; typedef struct grpc_lb_policy_args { - grpc_subchannel **subchannels; - size_t num_subchannels; + grpc_resolved_addresses *addresses; + grpc_subchannel_factory *subchannel_factory; } grpc_lb_policy_args; struct grpc_lb_policy_factory_vtable { @@ -56,7 +59,8 @@ struct grpc_lb_policy_factory_vtable { void (*unref)(grpc_lb_policy_factory *factory); /** Implementation of grpc_lb_policy_factory_create_lb_policy */ - grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory, + grpc_lb_policy *(*create_lb_policy)(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args); /** Name for the LB policy this factory implements */ @@ -68,6 +72,7 @@ void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory); /** Create a lb_policy instance. */ grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( - grpc_lb_policy_factory *factory, grpc_lb_policy_args *args); + grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args); #endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H */ diff --git a/src/core/lib/client_config/lb_policy_registry.c b/src/core/lib/client_config/lb_policy_registry.c index 82f70ed8c0b..d1dc502b9ab 100644 --- a/src/core/lib/client_config/lb_policy_registry.c +++ b/src/core/lib/client_config/lb_policy_registry.c @@ -74,10 +74,10 @@ static grpc_lb_policy_factory *lookup_factory(const char *name) { return NULL; } -grpc_lb_policy *grpc_lb_policy_create(const char *name, +grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name, grpc_lb_policy_args *args) { grpc_lb_policy_factory *factory = lookup_factory(name); grpc_lb_policy *lb_policy = - grpc_lb_policy_factory_create_lb_policy(factory, args); + grpc_lb_policy_factory_create_lb_policy(exec_ctx, factory, args); return lb_policy; } diff --git a/src/core/lib/client_config/lb_policy_registry.h b/src/core/lib/client_config/lb_policy_registry.h index da3a5d5e797..1ecf7fe39f6 100644 --- a/src/core/lib/client_config/lb_policy_registry.h +++ b/src/core/lib/client_config/lb_policy_registry.h @@ -35,6 +35,7 @@ #define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H #include "src/core/lib/client_config/lb_policy_factory.h" +#include "src/core/lib/iomgr/exec_ctx.h" /** Initialize the registry and set \a default_factory as the factory to be * returned when no name is provided in a lookup */ @@ -48,7 +49,7 @@ void grpc_register_lb_policy(grpc_lb_policy_factory *factory); * * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init * will be returned. */ -grpc_lb_policy *grpc_lb_policy_create(const char *name, +grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name, grpc_lb_policy_args *args); #endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */ diff --git a/src/core/lib/client_config/resolvers/dns_resolver.c b/src/core/lib/client_config/resolvers/dns_resolver.c index ab445730adb..62bccdd0459 100644 --- a/src/core/lib/client_config/resolvers/dns_resolver.c +++ b/src/core/lib/client_config/resolvers/dns_resolver.c @@ -162,38 +162,23 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { dns_resolver *r = arg; grpc_client_config *config = NULL; - grpc_subchannel **subchannels; - grpc_subchannel_args args; grpc_lb_policy *lb_policy; - size_t i; gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); r->resolving = 0; if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); - size_t naddrs = 0; - for (i = 0; i < addresses->naddrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)(addresses->addrs[i].addr); - args.addr_len = (size_t)addresses->addrs[i].len; - grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( - exec_ctx, r->subchannel_factory, &args); - if (subchannel != NULL) { - subchannels[naddrs++] = subchannel; - } - } memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = naddrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); + lb_policy_args.addresses = addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); if (lb_policy != NULL) { grpc_client_config_set_lb_policy(config, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); } grpc_resolved_addresses_destroy(addresses); - gpr_free(subchannels); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); diff --git a/src/core/lib/client_config/resolvers/sockaddr_resolver.c b/src/core/lib/client_config/resolvers/sockaddr_resolver.c index 66cddc3ed90..c787bd57d68 100644 --- a/src/core/lib/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/lib/client_config/resolvers/sockaddr_resolver.c @@ -58,11 +58,7 @@ typedef struct { char *lb_policy_name; /** the addresses that we've 'resolved' */ - struct sockaddr_storage *addrs; - /** the corresponding length of the addresses */ - size_t *addrs_len; - /** how many elements in \a addrs */ - size_t num_addrs; + grpc_resolved_addresses *addresses; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -125,28 +121,14 @@ static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r) { - grpc_client_config *cfg; - grpc_lb_policy *lb_policy; - grpc_lb_policy_args lb_policy_args; - grpc_subchannel **subchannels; - grpc_subchannel_args args; - if (r->next_completion != NULL && !r->published) { - size_t i; - cfg = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs); - for (i = 0; i < r->num_addrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)&r->addrs[i]; - args.addr_len = r->addrs_len[i]; - subchannels[i] = grpc_subchannel_factory_create_subchannel( - exec_ctx, r->subchannel_factory, &args); - } + grpc_client_config *cfg = grpc_client_config_create(); + grpc_lb_policy_args lb_policy_args; memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = r->num_addrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - gpr_free(subchannels); + lb_policy_args.addresses = r->addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + grpc_lb_policy *lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(cfg, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; @@ -160,8 +142,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory); - gpr_free(r->addrs); - gpr_free(r->addrs_len); + grpc_resolved_addresses_destroy(r->addresses); gpr_free(r->lb_policy_name); gpr_free(r); } @@ -269,7 +250,6 @@ static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( grpc_resolver_args *args, const char *default_lb_policy_name, int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { - size_t i; int errors_found = 0; /* GPR_FALSE */ sockaddr_resolver *r; gpr_slice path_slice; @@ -309,15 +289,18 @@ static grpc_resolver *sockaddr_create( gpr_slice_buffer_init(&path_parts); gpr_slice_split(path_slice, ",", &path_parts); - r->num_addrs = path_parts.count; - r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs); - r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs); + r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->addresses->naddrs = path_parts.count; + r->addresses->addrs = + gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs); - for (i = 0; i < r->num_addrs; i++) { + for (size_t i = 0; i < r->addresses->naddrs; i++) { grpc_uri ith_uri = *args->uri; char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); ith_uri.path = part_str; - if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) { + if (!parse(&ith_uri, + (struct sockaddr_storage *)(&r->addresses->addrs[i].addr), + &r->addresses->addrs[i].len)) { errors_found = 1; /* GPR_TRUE */ } gpr_free(part_str); @@ -328,8 +311,7 @@ static grpc_resolver *sockaddr_create( gpr_slice_unref(path_slice); if (errors_found) { gpr_free(r->lb_policy_name); - gpr_free(r->addrs); - gpr_free(r->addrs_len); + grpc_resolved_addresses_destroy(r->addresses); gpr_free(r); return NULL; } diff --git a/src/core/lib/client_config/resolvers/zookeeper_resolver.c b/src/core/lib/client_config/resolvers/zookeeper_resolver.c index 3bb0bbdf5cd..404dfcd4234 100644 --- a/src/core/lib/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/lib/client_config/resolvers/zookeeper_resolver.c @@ -184,28 +184,22 @@ static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { zookeeper_resolver *r = arg; grpc_client_config *config = NULL; - grpc_subchannel **subchannels; - grpc_subchannel_args args; grpc_lb_policy *lb_policy; - size_t i; + if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); - for (i = 0; i < addresses->naddrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)(addresses->addrs[i].addr); - args.addr_len = addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel( - exec_ctx, r->subchannel_factory, &args); + + lb_policy_args.addresses = addresses; + lb_policy_args.subchannel_factory = r->subchannel_factory; + lb_policy = + grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); + + if (lb_policy != NULL) { + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); } - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = addresses->naddrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); - gpr_free(subchannels); } gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving == 1); diff --git a/src/core/lib/iomgr/pollset_set_windows.c b/src/core/lib/iomgr/pollset_set_windows.c index 0b14e446ae6..720fc331edc 100644 --- a/src/core/lib/iomgr/pollset_set_windows.c +++ b/src/core/lib/iomgr/pollset_set_windows.c @@ -37,7 +37,7 @@ #include "src/core/lib/iomgr/pollset_set_windows.h" -grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; } +grpc_pollset_set* grpc_pollset_set_create(void) { return NULL; } void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c index db7cac363a3..9b617e13d3c 100644 --- a/src/ruby/ext/grpc/rb_byte_buffer.c +++ b/src/ruby/ext/grpc/rb_byte_buffer.c @@ -50,21 +50,18 @@ grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) { } VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) { - size_t length = 0; - char *string = NULL; - size_t offset = 0; + VALUE rb_string; grpc_byte_buffer_reader reader; gpr_slice next; if (buffer == NULL) { return Qnil; - } - length = grpc_byte_buffer_length(buffer); - string = xmalloc(length + 1); + rb_string = rb_str_buf_new(grpc_byte_buffer_length(buffer)); grpc_byte_buffer_reader_init(&reader, buffer); while (grpc_byte_buffer_reader_next(&reader, &next) != 0) { - memcpy(string + offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next)); - offset += GPR_SLICE_LENGTH(next); + rb_str_cat(rb_string, (const char *) GPR_SLICE_START_PTR(next), + GPR_SLICE_LENGTH(next)); + gpr_slice_unref(next); } - return rb_str_new(string, length); + return rb_string; } diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index cd0aa6aaf20..b0829efdc78 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -551,13 +551,26 @@ static void grpc_run_batch_stack_init(run_batch_stack *st, /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly * cleaned up */ static void grpc_run_batch_stack_cleanup(run_batch_stack *st) { + size_t i = 0; + grpc_metadata_array_destroy(&st->send_metadata); grpc_metadata_array_destroy(&st->send_trailing_metadata); grpc_metadata_array_destroy(&st->recv_metadata); grpc_metadata_array_destroy(&st->recv_trailing_metadata); + if (st->recv_status_details != NULL) { gpr_free(st->recv_status_details); } + + if (st->recv_message != NULL) { + grpc_byte_buffer_destroy(st->recv_message); + } + + for (i = 0; i < st->op_num; i++) { + if (st->ops[i].op == GRPC_OP_SEND_MESSAGE) { + grpc_byte_buffer_destroy(st->ops[i].data.send_message); + } + } } /* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from @@ -643,7 +656,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { break; case GRPC_OP_SEND_MESSAGE: rb_struct_aset(result, sym_send_message, Qtrue); - grpc_byte_buffer_destroy(st->ops[i].data.send_message); break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: rb_struct_aset(result, sym_send_close, Qtrue); diff --git a/test/core/client_config/lb_policies_test.c b/test/core/client_config/lb_policies_test.c index fcb8630cabb..ac42891f946 100644 --- a/test/core/client_config/lb_policies_test.c +++ b/test/core/client_config/lb_policies_test.c @@ -872,6 +872,7 @@ static void verify_rebirth_round_robin(const servers_fixture *f, } int main(int argc, char **argv) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; test_spec *spec; size_t i; const size_t NUM_ITERS = 10; @@ -881,9 +882,9 @@ int main(int argc, char **argv) { grpc_init(); grpc_tracer_set_enabled("round_robin", 1); - GPR_ASSERT(grpc_lb_policy_create("this-lb-policy-does-not-exist", NULL) == - NULL); - GPR_ASSERT(grpc_lb_policy_create(NULL, NULL) == NULL); + GPR_ASSERT(grpc_lb_policy_create(&exec_ctx, "this-lb-policy-does-not-exist", + NULL) == NULL); + GPR_ASSERT(grpc_lb_policy_create(&exec_ctx, NULL, NULL) == NULL); spec = test_spec_create(NUM_ITERS, NUM_SERVERS); /* everything is fine, all servers stay up the whole time and life's peachy */ @@ -935,6 +936,7 @@ int main(int argc, char **argv) { test_pending_calls(4); test_ping(); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); return 0; } diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index 8760b8d28e3..3f75a0c92eb 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -58,6 +58,7 @@ using std::chrono::system_clock; const int kNumThreads = 100; // Number of threads const int kNumAsyncSendThreads = 2; const int kNumAsyncReceiveThreads = 50; +const int kNumAsyncServerThreads = 50; const int kNumRpcs = 1000; // Number of RPCs per thread namespace grpc { @@ -174,23 +175,12 @@ class TestServiceImplDupPkg } }; +template class CommonStressTest { public: CommonStressTest() : kMaxMessageSize_(8192) {} - void SetUp() { - int port = grpc_pick_unused_port_or_die(); - server_address_ << "localhost:" << port; - // Setup server - ServerBuilder builder; - builder.AddListeningPort(server_address_.str(), - InsecureServerCredentials()); - builder.RegisterService(&service_); - builder.SetMaxMessageSize( - kMaxMessageSize_); // For testing max message size. - builder.RegisterService(&dup_pkg_service_); - server_ = builder.BuildAndStart(); - } - void TearDown() { server_->Shutdown(); } + virtual void SetUp() = 0; + virtual void TearDown() = 0; void ResetStub() { std::shared_ptr channel = CreateChannel(server_address_.str(), InsecureChannelCredentials()); @@ -198,15 +188,137 @@ class CommonStressTest { } grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); } + protected: + void SetUpStart(ServerBuilder* builder, Service* service) { + int port = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port; + // Setup server + builder->AddListeningPort(server_address_.str(), + InsecureServerCredentials()); + builder->RegisterService(service); + builder->SetMaxMessageSize( + kMaxMessageSize_); // For testing max message size. + builder->RegisterService(&dup_pkg_service_); + } + void SetUpEnd(ServerBuilder* builder) { server_ = builder->BuildAndStart(); } + void TearDownStart() { server_->Shutdown(); } + void TearDownEnd() {} + private: std::unique_ptr stub_; std::unique_ptr server_; std::ostringstream server_address_; const int kMaxMessageSize_; - TestServiceImpl service_; TestServiceImplDupPkg dup_pkg_service_; }; +class CommonStressTestSyncServer : public CommonStressTest { + public: + void SetUp() GRPC_OVERRIDE { + ServerBuilder builder; + SetUpStart(&builder, &service_); + SetUpEnd(&builder); + } + void TearDown() GRPC_OVERRIDE { + TearDownStart(); + TearDownEnd(); + } + + private: + TestServiceImpl service_; +}; + +class CommonStressTestAsyncServer + : public CommonStressTest<::grpc::testing::EchoTestService::AsyncService> { + public: + void SetUp() GRPC_OVERRIDE { + shutting_down_ = false; + ServerBuilder builder; + SetUpStart(&builder, &service_); + cq_ = builder.AddCompletionQueue(); + SetUpEnd(&builder); + contexts_ = new Context[kNumAsyncServerThreads * 100]; + for (int i = 0; i < kNumAsyncServerThreads * 100; i++) { + RefreshContext(i); + } + for (int i = 0; i < kNumAsyncServerThreads; i++) { + server_threads_.push_back( + new std::thread(&CommonStressTestAsyncServer::ProcessRpcs, this)); + } + } + void TearDown() GRPC_OVERRIDE { + { + unique_lock l(mu_); + TearDownStart(); + shutting_down_ = true; + cq_->Shutdown(); + } + + for (int i = 0; i < kNumAsyncServerThreads; i++) { + server_threads_[i]->join(); + delete server_threads_[i]; + } + + void* ignored_tag; + bool ignored_ok; + while (cq_->Next(&ignored_tag, &ignored_ok)) + ; + TearDownEnd(); + delete[] contexts_; + } + + private: + void ProcessRpcs() { + void* tag; + bool ok; + while (cq_->Next(&tag, &ok)) { + if (ok) { + int i = static_cast(reinterpret_cast(tag)); + switch (contexts_[i].state) { + case Context::READY: { + contexts_[i].state = Context::DONE; + EchoResponse send_response; + send_response.set_message(contexts_[i].recv_request.message()); + contexts_[i].response_writer->Finish(send_response, Status::OK, + tag); + break; + } + case Context::DONE: + RefreshContext(i); + break; + } + } + } + } + void RefreshContext(int i) { + unique_lock l(mu_); + if (!shutting_down_) { + contexts_[i].state = Context::READY; + contexts_[i].srv_ctx.reset(new ServerContext); + contexts_[i].response_writer.reset( + new grpc::ServerAsyncResponseWriter( + contexts_[i].srv_ctx.get())); + service_.RequestEcho(contexts_[i].srv_ctx.get(), + &contexts_[i].recv_request, + contexts_[i].response_writer.get(), cq_.get(), + cq_.get(), (void*)(intptr_t)i); + } + } + struct Context { + std::unique_ptr srv_ctx; + std::unique_ptr> + response_writer; + EchoRequest recv_request; + enum { READY, DONE } state; + } * contexts_; + ::grpc::testing::EchoTestService::AsyncService service_; + std::unique_ptr cq_; + bool shutting_down_; + mutex mu_; + std::vector server_threads_; +}; + +template class End2endTest : public ::testing::Test { protected: End2endTest() {} @@ -214,7 +326,7 @@ class End2endTest : public ::testing::Test { void TearDown() GRPC_OVERRIDE { common_.TearDown(); } void ResetStub() { common_.ResetStub(); } - CommonStressTest common_; + Common common_; }; static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { @@ -230,11 +342,16 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) { } } -TEST_F(End2endTest, ThreadStress) { - common_.ResetStub(); +typedef ::testing::Types + CommonTypes; +TYPED_TEST_CASE(End2endTest, CommonTypes); +TYPED_TEST(End2endTest, ThreadStress) { + this->common_.ResetStub(); std::vector threads; for (int i = 0; i < kNumThreads; ++i) { - threads.push_back(new std::thread(SendRpc, common_.GetStub(), kNumRpcs)); + threads.push_back( + new std::thread(SendRpc, this->common_.GetStub(), kNumRpcs)); } for (int i = 0; i < kNumThreads; ++i) { threads[i]->join(); @@ -242,6 +359,7 @@ TEST_F(End2endTest, ThreadStress) { } } +template class AsyncClientEnd2endTest : public ::testing::Test { protected: AsyncClientEnd2endTest() : rpcs_outstanding_(0) {} @@ -309,31 +427,33 @@ class AsyncClientEnd2endTest : public ::testing::Test { } } - CommonStressTest common_; + Common common_; CompletionQueue cq_; mutex mu_; condition_variable cv_; int rpcs_outstanding_; }; -TEST_F(AsyncClientEnd2endTest, ThreadStress) { - common_.ResetStub(); +TYPED_TEST_CASE(AsyncClientEnd2endTest, CommonTypes); +TYPED_TEST(AsyncClientEnd2endTest, ThreadStress) { + this->common_.ResetStub(); std::vector send_threads, completion_threads; for (int i = 0; i < kNumAsyncReceiveThreads; ++i) { completion_threads.push_back(new std::thread( - &AsyncClientEnd2endTest_ThreadStress_Test::AsyncCompleteRpc, this)); + &AsyncClientEnd2endTest_ThreadStress_Test::AsyncCompleteRpc, + this)); } for (int i = 0; i < kNumAsyncSendThreads; ++i) { - send_threads.push_back( - new std::thread(&AsyncClientEnd2endTest_ThreadStress_Test::AsyncSendRpc, - this, kNumRpcs)); + send_threads.push_back(new std::thread( + &AsyncClientEnd2endTest_ThreadStress_Test::AsyncSendRpc, + this, kNumRpcs)); } for (int i = 0; i < kNumAsyncSendThreads; ++i) { send_threads[i]->join(); delete send_threads[i]; } - Wait(); + this->Wait(); for (int i = 0; i < kNumAsyncReceiveThreads; ++i) { completion_threads[i]->join(); delete completion_threads[i]; diff --git a/test/distrib/node/run_distrib_test.sh b/test/distrib/node/run_distrib_test.sh index 9b8f15771b8..13a42fcb0a2 100755 --- a/test/distrib/node/run_distrib_test.sh +++ b/test/distrib/node/run_distrib_test.sh @@ -28,23 +28,31 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +function finish() { + rv=$? + kill $STATIC_PID || true + curl "localhost:32767/drop/$STATIC_PORT" || true + exit $rv +} + +trap finish EXIT + NODE_VERSION=$1 source ~/.nvm/nvm.sh -set -ex cd $(dirname $0) nvm install $NODE_VERSION +set -ex npm install -g node-static -# Kill off existing static servers -kill -9 $(ps aux | grep '[n]ode .*static' | awk '{print $2}') || true - STATIC_SERVER=127.0.0.1 -STATIC_PORT=8080 +# If port_server is running, get port from that. Otherwise, assume we're in +# docker and use 8080 +STATIC_PORT=$(curl 'localhost:32767/get' || echo '8080') -# Serves the input_artifacts directory statically at localhost:8080 +# Serves the input_artifacts directory statically at localhost: static "$EXTERNAL_GIT_ROOT/input_artifacts" -a $STATIC_SERVER -p $STATIC_PORT & STATIC_PID=$! @@ -52,6 +60,4 @@ STATIC_URL="http://$STATIC_SERVER:$STATIC_PORT/" npm install --unsafe-perm $STATIC_URL/grpc.tgz --grpc_node_binary_host_mirror=$STATIC_URL -kill -9 $STATIC_PID - ./distrib_test.js diff --git a/tools/run_tests/build_artifact_node.sh b/tools/run_tests/build_artifact_node.sh index 6aa48245383..ef3476a0381 100755 --- a/tools/run_tests/build_artifact_node.sh +++ b/tools/run_tests/build_artifact_node.sh @@ -30,9 +30,9 @@ NODE_TARGET_ARCH=$1 source ~/.nvm/nvm.sh -set -ex nvm use 4 +set -ex cd $(dirname $0)/../.. diff --git a/tools/run_tests/build_artifact_python.sh b/tools/run_tests/build_artifact_python.sh index 7ba04d75463..1f23f9fade8 100755 --- a/tools/run_tests/build_artifact_python.sh +++ b/tools/run_tests/build_artifact_python.sh @@ -35,15 +35,18 @@ cd $(dirname $0)/../.. if [ "$SKIP_PIP_INSTALL" == "" ] then pip install --upgrade six - pip install --upgrade setuptools + # There's a bug in newer versions of setuptools (see + # https://bitbucket.org/pypa/setuptools/issues/503/pkg_resources_vendorpackagingrequirementsi) + pip install --upgrade 'setuptools==18' pip install -rrequirements.txt fi +export GRPC_PYTHON_USE_CUSTOM_BDIST=0 +export GRPC_PYTHON_BUILD_WITH_CYTHON=1 + # Build the source distribution first because MANIFEST.in cannot override # exclusion of built shared objects among package resources (for some # inexplicable reason). -GRPC_PYTHON_USE_CUSTOM_BDIST=0 \ -GRPC_PYTHON_BUILD_WITH_CYTHON=1 \ ${SETARCH_CMD} python setup.py \ sdist @@ -51,15 +54,11 @@ ${SETARCH_CMD} python setup.py \ # and thus ought to be run in a shell command separate of others. Further, it # trashes the actual bdist_wheel output, so it should be run first so that # bdist_wheel may be run unmolested. -GRPC_PYTHON_USE_CUSTOM_BDIST=0 \ -GRPC_PYTHON_BUILD_WITH_CYTHON=1 \ ${SETARCH_CMD} python setup.py \ build_tagged_ext # Wheel has a bug where directories don't get excluded. # https://bitbucket.org/pypa/wheel/issues/99/cannot-exclude-directory -GRPC_PYTHON_USE_CUSTOM_BDIST=0 \ -GRPC_PYTHON_BUILD_WITH_CYTHON=1 \ ${SETARCH_CMD} python setup.py \ bdist_wheel diff --git a/tools/run_tests/build_node.sh b/tools/run_tests/build_node.sh index 9c4af071856..cbe0e31d2eb 100755 --- a/tools/run_tests/build_node.sh +++ b/tools/run_tests/build_node.sh @@ -31,9 +31,9 @@ NODE_VERSION=$1 source ~/.nvm/nvm.sh -set -ex nvm use $NODE_VERSION +set -ex CONFIG=${CONFIG:-opt} diff --git a/tools/run_tests/build_package_node.sh b/tools/run_tests/build_package_node.sh index aca90a37500..540c8263117 100755 --- a/tools/run_tests/build_package_node.sh +++ b/tools/run_tests/build_package_node.sh @@ -29,9 +29,9 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. source ~/.nvm/nvm.sh -set -ex nvm use 4 +set -ex cd $(dirname $0)/../.. diff --git a/tools/run_tests/pre_build_node.sh b/tools/run_tests/pre_build_node.sh index 11f46d60fc2..1f55df0b7bc 100755 --- a/tools/run_tests/pre_build_node.sh +++ b/tools/run_tests/pre_build_node.sh @@ -31,9 +31,9 @@ NODE_VERSION=$1 source ~/.nvm/nvm.sh -set -ex nvm use $NODE_VERSION +set -ex export GRPC_CONFIG=${CONFIG:-opt} diff --git a/tools/run_tests/run_node.sh b/tools/run_tests/run_node.sh index d33890068dd..b94dc3ec628 100755 --- a/tools/run_tests/run_node.sh +++ b/tools/run_tests/run_node.sh @@ -30,9 +30,9 @@ NODE_VERSION=$1 source ~/.nvm/nvm.sh -set -ex nvm use $NODE_VERSION +set -ex CONFIG=${CONFIG:-opt} diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index a9438045aa6..6b84daea544 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -120,7 +120,12 @@ def get_c_tests(travis, test_lang) : def _check_compiler(compiler, supported_compilers): if compiler not in supported_compilers: - raise Exception('Compiler %s not supported.' % compiler) + raise Exception('Compiler %s not supported (on this platform).' % compiler) + + +def _check_arch(arch, supported_archs): + if arch not in supported_archs: + raise Exception('Architecture %s not supported.' % arch) def _is_use_docker_child(): @@ -464,7 +469,19 @@ class CSharpLanguage(object): def configure(self, config, args): self.config = config self.args = args - _check_compiler(self.args.compiler, ['default']) + if self.platform == 'windows': + # Explicitly choosing between x86 and x64 arch doesn't work yet + _check_arch(self.args.arch, ['default']) + self._make_options = [_windows_toolset_option(self.args.compiler), + _windows_arch_option(self.args.arch)] + else: + _check_compiler(self.args.compiler, ['default']) + if self.platform == 'mac': + # On Mac, official distribution of mono is 32bit. + self._make_options = ['EMBED_OPENSSL=true', 'EMBED_ZLIB=true', + 'CFLAGS=-arch i386', 'LDFLAGS=-arch i386'] + else: + self._make_options = ['EMBED_OPENSSL=true', 'EMBED_ZLIB=true'] def test_specs(self): with open('src/csharp/tests.json') as f: @@ -511,23 +528,16 @@ class CSharpLanguage(object): return [['tools/run_tests/pre_build_csharp.sh']] def make_targets(self): - # For Windows, this target doesn't really build anything, - # everything is build by buildall script later. - if self.platform == 'windows': - return [] - else: - return ['grpc_csharp_ext'] + return ['grpc_csharp_ext'] def make_options(self): - if self.platform == 'mac': - # On Mac, official distribution of mono is 32bit. - return ['CFLAGS=-arch i386', 'LDFLAGS=-arch i386'] - else: - return [] + return self._make_options; def build_steps(self): if self.platform == 'windows': - return [['src\\csharp\\buildall.bat']] + return [[_windows_build_bat(self.args.compiler), + 'src/csharp/Grpc.sln', + '/p:Configuration=%s' % _MSBUILD_CONFIG[self.config.build_config]]] else: return [['tools/run_tests/build_csharp.sh']] diff --git a/vsprojects/build_vs2010.bat b/vsprojects/build_vs2010.bat index 64b0ed5d3f5..1bc3c86a92f 100644 --- a/vsprojects/build_vs2010.bat +++ b/vsprojects/build_vs2010.bat @@ -1,5 +1,5 @@ @rem Convenience wrapper that runs specified gRPC target using msbuild -@rem Usage: build.bat TARGET_NAME +@rem Usage: build_vs2010.bat TARGET_NAME setlocal @rem Set VS variables (uses Visual Studio 2010) diff --git a/vsprojects/build_vs2013.bat b/vsprojects/build_vs2013.bat index be3caa9298c..82c0a3ad824 100644 --- a/vsprojects/build_vs2013.bat +++ b/vsprojects/build_vs2013.bat @@ -1,5 +1,5 @@ @rem Convenience wrapper that runs specified gRPC target using msbuild -@rem Usage: build.bat TARGET_NAME +@rem Usage: build_vs2013.bat TARGET_NAME setlocal @rem Set VS variables (uses Visual Studio 2013) diff --git a/vsprojects/build_vs2015.bat b/vsprojects/build_vs2015.bat index 50485a30f30..c6e1b433a3e 100644 --- a/vsprojects/build_vs2015.bat +++ b/vsprojects/build_vs2015.bat @@ -1,5 +1,5 @@ @rem Convenience wrapper that runs specified gRPC target using msbuild -@rem Usage: build.bat TARGET_NAME +@rem Usage: build_vs2015.bat TARGET_NAME setlocal @rem Set VS variables (uses Visual Studio 2015)