Set user data vtable when creating grpc_lb_addresses.

pull/8462/head
Mark D. Roth 8 years ago
parent 98abfd3d64
commit 16883a37ef
  1. 55
      src/core/ext/client_config/lb_policy_factory.c
  2. 23
      src/core/ext/client_config/lb_policy_factory.h
  3. 2
      src/core/ext/client_config/resolver_result.c
  4. 27
      src/core/ext/lb_policy/grpclb/grpclb.c
  5. 4
      src/core/ext/resolver/dns/native/dns_resolver.c
  6. 10
      src/core/ext/resolver/sockaddr/sockaddr_resolver.c
  7. 10
      test/core/end2end/fake_resolver.c

@ -38,19 +38,20 @@
#include "src/core/ext/client_config/lb_policy_factory.h"
grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) {
grpc_lb_addresses* grpc_lb_addresses_create(
size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable) {
grpc_lb_addresses* addresses = gpr_malloc(sizeof(grpc_lb_addresses));
addresses->num_addresses = num_addresses;
addresses->user_data_vtable = user_data_vtable;
const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses;
addresses->addresses = gpr_malloc(addresses_size);
memset(addresses->addresses, 0, addresses_size);
return addresses;
}
grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses,
void* (*user_data_copy)(void*)) {
grpc_lb_addresses* new_addresses =
grpc_lb_addresses_create(addresses->num_addresses);
grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) {
grpc_lb_addresses* new_addresses = grpc_lb_addresses_create(
addresses->num_addresses, addresses->user_data_vtable);
memcpy(new_addresses->addresses, addresses->addresses,
sizeof(grpc_lb_address) * addresses->num_addresses);
for (size_t i = 0; i < addresses->num_addresses; ++i) {
@ -58,9 +59,10 @@ grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses,
new_addresses->addresses[i].balancer_name =
gpr_strdup(new_addresses->addresses[i].balancer_name);
}
if (user_data_copy != NULL) {
if (new_addresses->addresses[i].user_data != NULL) {
new_addresses->addresses[i].user_data =
user_data_copy(new_addresses->addresses[i].user_data);
addresses->user_data_vtable->copy(
new_addresses->addresses[i].user_data);
}
}
return new_addresses;
@ -71,6 +73,7 @@ void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
bool is_balancer, char* balancer_name,
void* user_data) {
GPR_ASSERT(index < addresses->num_addresses);
if (user_data != NULL) GPR_ASSERT(addresses->user_data_vtable != NULL);
grpc_lb_address* target = &addresses->addresses[index];
memcpy(target->address.addr, address, address_len);
target->address.len = address_len;
@ -79,12 +82,42 @@ void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
target->user_data = user_data;
}
void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses,
void (*user_data_destroy)(void*)) {
int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1,
const grpc_lb_addresses* addresses2) {
if (addresses1->num_addresses > addresses2->num_addresses) return 1;
if (addresses1->num_addresses < addresses2->num_addresses) return -1;
if (addresses1->user_data_vtable > addresses2->user_data_vtable) return 1;
if (addresses1->user_data_vtable < addresses2->user_data_vtable) return -1;
for (size_t i = 0; i < addresses1->num_addresses; ++i) {
const grpc_lb_address* target1 = &addresses1->addresses[i];
const grpc_lb_address* target2 = &addresses2->addresses[i];
if (target1->address.len > target2->address.len) return 1;
if (target1->address.len < target2->address.len) return -1;
int retval = memcmp(
target1->address.addr, target2->address.addr, target1->address.len);
if (retval != 0) return retval;
if (target1->is_balancer > target2->is_balancer) return 1;
if (target1->is_balancer < target2->is_balancer) return -1;
const char* balancer_name1 = target1->balancer_name != NULL
? target1->balancer_name : "";
const char* balancer_name2 = target2->balancer_name != NULL
? target2->balancer_name : "";
retval = strcmp(balancer_name1, balancer_name2);
if (retval != 0) return retval;
if (addresses1->user_data_vtable != NULL) {
retval = addresses1->user_data_vtable->cmp(target1->user_data,
target2->user_data);
if (retval != 0) return retval;
}
}
return 0;
}
void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) {
for (size_t i = 0; i < addresses->num_addresses; ++i) {
gpr_free(addresses->addresses[i].balancer_name);
if (user_data_destroy != NULL) {
user_data_destroy(addresses->addresses[i].user_data);
if (addresses->addresses[i].user_data != NULL) {
addresses->user_data_vtable->destroy(addresses->addresses[i].user_data);
}
}
gpr_free(addresses->addresses);

@ -59,19 +59,27 @@ typedef struct grpc_lb_address {
void *user_data;
} grpc_lb_address;
typedef struct grpc_lb_user_data_vtable {
void* (*copy)(void*);
void (*destroy)(void*);
int (*cmp)(void*, void*);
} grpc_lb_user_data_vtable;
typedef struct grpc_lb_addresses {
size_t num_addresses;
grpc_lb_address *addresses;
const grpc_lb_user_data_vtable *user_data_vtable;
} grpc_lb_addresses;
/** Returns a grpc_addresses struct with enough space for
* \a num_addresses addresses. */
grpc_lb_addresses *grpc_lb_addresses_create(size_t num_addresses);
\a num_addresses addresses. The \a user_data_vtable argument may be
NULL if no user data will be added. */
grpc_lb_addresses *grpc_lb_addresses_create(
size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable);
/** Creates a copy of \a addresses. If \a user_data_copy is not NULL,
* it will be invoked to copy the \a user_data field of each address. */
grpc_lb_addresses *grpc_lb_addresses_copy(grpc_lb_addresses *addresses,
void *(*user_data_copy)(void *));
grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses);
/** Sets the value of the address at index \a index of \a addresses.
* \a address is a socket address of length \a address_len.
@ -81,10 +89,13 @@ void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
bool is_balancer, char *balancer_name,
void *user_data);
/** Compares \a addresses1 and \a addresses2. */
int grpc_lb_addresses_cmp(const grpc_lb_addresses *addresses1,
const grpc_lb_addresses *addresses2);
/** Destroys \a addresses. If \a user_data_destroy is not NULL, it will
* be invoked to destroy the \a user_data field of each address. */
void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses,
void (*user_data_destroy)(void *));
void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses);
/** Arguments passed to LB policies. */
/* TODO(roth, ctiller): Consider replacing this struct with

@ -67,7 +67,7 @@ void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
grpc_resolver_result* result) {
if (gpr_unref(&result->refs)) {
gpr_free(result->server_name);
grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */);
grpc_lb_addresses_destroy(result->addresses);
gpr_free(result->lb_policy_name);
grpc_channel_args_destroy(result->channel_args);
gpr_free(result);

@ -338,6 +338,21 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
return true;
}
/* vtable for LB tokens in grpc_lb_addresses. */
static void* lb_token_copy(void *token) {
return token == NULL ? NULL : GRPC_MDELEM_REF(token);
}
static void lb_token_destroy(void *token) {
if (token != NULL) GRPC_MDELEM_UNREF(token);
}
static int lb_token_cmp(void* token1, void* token2) {
if (token1 > token2) return 1;
if (token1 < token2) return -1;
return 0;
}
static const grpc_lb_user_data_vtable lb_token_vtable = {
lb_token_copy, lb_token_destroy, lb_token_cmp};
/* Returns addresses extracted from \a serverlist. */
static grpc_lb_addresses *process_serverlist(
const grpc_grpclb_serverlist *serverlist) {
@ -349,7 +364,8 @@ static grpc_lb_addresses *process_serverlist(
}
if (num_valid == 0) return NULL;
grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid);
grpc_lb_addresses *lb_addresses =
grpc_lb_addresses_create(num_valid, &lb_token_vtable);
/* second pass: actually populate the addresses and LB tokens (aka user data
* to the outside world) to be read by the RR policy during its creation.
@ -410,11 +426,6 @@ static grpc_lb_addresses *process_serverlist(
return lb_addresses;
}
/* A plugin for grpc_lb_addresses_destroy that unrefs the LB token metadata. */
static void lb_token_destroy(void *token) {
if (token != NULL) GRPC_MDELEM_UNREF(token);
}
/* perform a pick over \a rr_policy. Given that a pick can return immediately
* (ignoring its completion callback) we need to perform the cleanups this
* callback would be otherwise resposible for */
@ -465,7 +476,7 @@ static grpc_lb_policy *create_rr_locked(
if (glb_policy->addresses != NULL) {
/* dispose of the previous version */
grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
grpc_lb_addresses_destroy(glb_policy->addresses);
}
glb_policy->addresses = args.addresses;
@ -662,7 +673,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
gpr_mu_destroy(&glb_policy->mu);
grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
grpc_lb_addresses_destroy(glb_policy->addresses);
gpr_free(glb_policy);
}

@ -170,8 +170,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(r->resolving);
r->resolving = false;
if (r->addresses != NULL) {
grpc_lb_addresses *addresses =
grpc_lb_addresses_create(r->addresses->naddrs);
grpc_lb_addresses *addresses = grpc_lb_addresses_create(
r->addresses->naddrs, NULL /* user_data_vtable */);
for (size_t i = 0; i < r->addresses->naddrs; ++i) {
grpc_lb_addresses_set_address(
addresses, i, &r->addresses->addrs[i].addr,

@ -121,8 +121,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->next_completion != NULL && !r->published) {
r->published = true;
*r->target_result = grpc_resolver_result_create(
r->target_name,
grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
r->target_name, grpc_lb_addresses_copy(r->addresses),
NULL /* lb_policy_name */, grpc_channel_args_copy(r->channel_args));
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
@ -133,7 +132,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
gpr_free(r->target_name);
grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
grpc_lb_addresses_destroy(r->addresses);
grpc_channel_args_destroy(r->channel_args);
gpr_free(r);
}
@ -178,7 +177,8 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
gpr_slice_buffer path_parts;
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
grpc_lb_addresses *addresses = grpc_lb_addresses_create(path_parts.count);
grpc_lb_addresses *addresses = grpc_lb_addresses_create(
path_parts.count, NULL /* user_data_vtable */);
bool errors_found = false;
for (size_t i = 0; i < addresses->num_addresses; i++) {
grpc_uri ith_uri = *args->uri;
@ -196,7 +196,7 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
gpr_slice_buffer_destroy(&path_parts);
gpr_slice_unref(path_slice);
if (errors_found) {
grpc_lb_addresses_destroy(addresses, NULL /* user_data_destroy */);
grpc_lb_addresses_destroy(addresses);
return NULL;
}
/* Instantiate resolver. */

@ -78,7 +78,7 @@ static void fake_resolver_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) {
fake_resolver* r = (fake_resolver*)gr;
gpr_mu_destroy(&r->mu);
gpr_free(r->target_name);
grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
grpc_lb_addresses_destroy(r->addresses);
gpr_free(r->lb_policy_name);
grpc_method_config_table_unref(r->method_config_table);
gpr_free(r);
@ -107,8 +107,7 @@ static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx,
lb_policy_args = grpc_channel_args_copy_and_add(NULL /* src */, &arg, 1);
}
*r->target_result = grpc_resolver_result_create(
r->target_name,
grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
r->target_name, grpc_lb_addresses_copy(r->addresses),
r->lb_policy_name, lb_policy_args);
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
@ -168,7 +167,8 @@ static grpc_resolver* fake_resolver_create(grpc_resolver_factory* factory,
gpr_slice_buffer path_parts;
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
grpc_lb_addresses* addresses = grpc_lb_addresses_create(path_parts.count);
grpc_lb_addresses* addresses = grpc_lb_addresses_create(
path_parts.count, NULL /* user_data_vtable */);
bool errors_found = false;
for (size_t i = 0; i < addresses->num_addresses; i++) {
grpc_uri ith_uri = *args->uri;
@ -187,7 +187,7 @@ static grpc_resolver* fake_resolver_create(grpc_resolver_factory* factory,
gpr_slice_buffer_destroy(&path_parts);
gpr_slice_unref(path_slice);
if (errors_found) {
grpc_lb_addresses_destroy(addresses, NULL /* user_data_destroy */);
grpc_lb_addresses_destroy(addresses);
return NULL;
}
// Construct method config table.

Loading…
Cancel
Save