Encode method config table in channel args instead of in resolver result.

pull/8303/head
Mark D. Roth 8 years ago
parent 2e8920873d
commit 046cf76469
  1. 65
      src/core/ext/client_config/client_channel.c
  2. 71
      src/core/ext/client_config/method_config.c
  3. 5
      src/core/ext/client_config/method_config.h
  4. 11
      src/core/ext/client_config/resolver_result.c
  5. 6
      src/core/ext/client_config/resolver_result.h
  6. 5
      src/core/ext/lb_policy/grpclb/grpclb.c
  7. 1
      src/core/ext/lb_policy/pick_first/pick_first.c
  8. 1
      src/core/ext/lb_policy/round_robin/round_robin.c
  9. 2
      src/core/ext/resolver/dns/native/dns_resolver.c
  10. 2
      src/core/ext/resolver/sockaddr/sockaddr_resolver.c
  11. 10
      src/core/lib/channel/channel_args.c
  12. 6
      src/core/lib/channel/channel_args.h

@ -43,6 +43,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/method_config.h"
#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
@ -70,15 +71,14 @@ typedef struct client_channel_channel_data {
/** client channel factory */
grpc_client_channel_factory *client_channel_factory;
/** mutex protecting client configuration, including all
variables below in this data structure */
/** mutex protecting all variables below in this data structure */
gpr_mu mu;
/** currently active load balancer - guarded by mu */
/** currently active load balancer */
grpc_lb_policy *lb_policy;
/** incoming resolver result - set by resolver.next(), guarded by mu */
grpc_resolver_result *incoming_resolver_result;
/** current resolver result */
grpc_resolver_result *current_resolver_result;
/** method config table */
grpc_method_config_table *method_config_table;
/** incoming resolver result - set by resolver.next() */
grpc_resolver_result *resolver_result;
/** a list of closures that are all waiting for config to come in */
grpc_closure_list waiting_for_config_closures;
/** resolver callback */
@ -176,23 +176,23 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_method_config_table *method_config_table = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
bool exit_idle = false;
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
if (chand->incoming_resolver_result != NULL) {
if (chand->resolver_result != NULL) {
grpc_lb_policy_args lb_policy_args;
lb_policy_args.server_name =
grpc_resolver_result_get_server_name(chand->incoming_resolver_result);
grpc_resolver_result_get_server_name(chand->resolver_result);
lb_policy_args.addresses =
grpc_resolver_result_get_addresses(chand->incoming_resolver_result);
lb_policy_args.additional_args = grpc_resolver_result_get_lb_policy_args(
chand->incoming_resolver_result);
grpc_resolver_result_get_addresses(chand->resolver_result);
lb_policy_args.additional_args =
grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy = grpc_lb_policy_create(
exec_ctx,
grpc_resolver_result_get_lb_policy_name(
chand->incoming_resolver_result),
grpc_resolver_result_get_lb_policy_name(chand->resolver_result),
&lb_policy_args);
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "config_change");
@ -200,11 +200,15 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
if (chand->current_resolver_result != NULL) {
grpc_resolver_result_unref(exec_ctx, chand->current_resolver_result);
const grpc_arg *channel_arg = grpc_channel_args_find(
lb_policy_args.additional_args, GRPC_ARG_SERVICE_CONFIG);
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
method_config_table = grpc_method_config_table_ref(
(grpc_method_config_table *)channel_arg->value.pointer.p);
}
chand->current_resolver_result = chand->incoming_resolver_result;
chand->incoming_resolver_result = NULL;
grpc_resolver_result_unref(exec_ctx, chand->resolver_result);
chand->resolver_result = NULL;
}
if (lb_policy != NULL) {
@ -215,6 +219,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&chand->mu);
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (chand->method_config_table != NULL) {
grpc_method_config_table_unref(chand->method_config_table);
}
chand->method_config_table = method_config_table;
if (lb_policy != NULL) {
grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
NULL);
@ -238,8 +246,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, chand->resolver,
&chand->incoming_resolver_result,
grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
&chand->on_resolver_result_changed);
gpr_mu_unlock(&chand->mu);
} else {
@ -376,8 +383,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
if (chand->current_resolver_result != NULL) {
grpc_resolver_result_unref(exec_ctx, chand->current_resolver_result);
if (chand->method_config_table != NULL) {
grpc_method_config_table_unref(chand->method_config_table);
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(chand->interested_parties);
@ -512,11 +519,9 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
/* Get method config. */
// FIXME: need to actually use the config data!
// FIXME: think about refcounting vs. atomicity here
grpc_method_config_table* table = grpc_resolver_result_get_method_configs(
chand->current_resolver_result);
if (table != NULL) {
if (chand->method_config_table != NULL) {
calld->method_config = grpc_method_config_table_get_method_config(
table, calld->path);
chand->method_config_table, calld->path);
}
/* Create call on subchannel. */
grpc_subchannel_call *subchannel_call = NULL;
@ -626,8 +631,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, chand->resolver,
&chand->incoming_resolver_result,
grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
&chand->on_resolver_result_changed);
}
if (chand->resolver != NULL) {
@ -836,7 +840,7 @@ void grpc_client_channel_finish_initialization(
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = true;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, resolver, &chand->incoming_resolver_result,
grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
&chand->on_resolver_result_changed);
}
chand->client_channel_factory = client_channel_factory;
@ -858,8 +862,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
if (!chand->started_resolving && chand->resolver != NULL) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
chand->started_resolving = true;
grpc_resolver_next(exec_ctx, chand->resolver,
&chand->incoming_resolver_result,
grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
&chand->on_resolver_result_changed);
}
}

@ -33,9 +33,11 @@
#include <string.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/lib/transport/metadata.h"
@ -212,3 +214,72 @@ grpc_method_config* grpc_method_config_table_get_method_config(
}
return grpc_method_config_ref(method_config);
}
static void* copy_arg(void* p) {
return grpc_method_config_table_ref(p);
}
static void destroy_arg(void* p) {
grpc_method_config_table_unref(p);
}
static int cmp_arg(void* p1, void* p2) {
grpc_method_config_table* t1 = p1;
grpc_method_config_table* t2 = p2;
for (size_t i = 0; i < GPR_ARRAY_SIZE(t1->entries); ++i) {
grpc_method_config_table_entry* e1 = &t1->entries[i];
grpc_method_config_table_entry* e2 = &t2->entries[i];
// Compare paths by hash value.
if (e1->path->hash < e2->path->hash) return -1;
if (e1->path->hash > e2->path->hash) return 1;
// Compare wait_for_ready.
const bool wait_for_ready1 =
e1->method_config->wait_for_ready == NULL
? false : *e1->method_config->wait_for_ready;
const bool wait_for_ready2 =
e2->method_config->wait_for_ready == NULL
? false : *e2->method_config->wait_for_ready;
if (wait_for_ready1 < wait_for_ready2) return -1;
if (wait_for_ready1 > wait_for_ready2) return 1;
// Compare timeout.
const gpr_timespec timeout1 =
e1->method_config->timeout == NULL
? gpr_inf_past(GPR_CLOCK_MONOTONIC) : *e1->method_config->timeout;
const gpr_timespec timeout2 =
e2->method_config->timeout == NULL
? gpr_inf_past(GPR_CLOCK_MONOTONIC) : *e2->method_config->timeout;
const int timeout_result = gpr_time_cmp(timeout1, timeout2);
if (timeout_result != 0) return timeout_result;
// Compare max_request_message_bytes.
const int32_t max_request_message_bytes1 =
e1->method_config->max_request_message_bytes == NULL
? -1 : *e1->method_config->max_request_message_bytes;
const int32_t max_request_message_bytes2 =
e2->method_config->max_request_message_bytes == NULL
? -1 : *e2->method_config->max_request_message_bytes;
if (max_request_message_bytes1 < max_request_message_bytes2) return -1;
if (max_request_message_bytes1 > max_request_message_bytes2) return 1;
// Compare max_response_message_bytes.
const int32_t max_response_message_bytes1 =
e1->method_config->max_response_message_bytes == NULL
? -1 : *e1->method_config->max_response_message_bytes;
const int32_t max_response_message_bytes2 =
e2->method_config->max_response_message_bytes == NULL
? -1 : *e2->method_config->max_response_message_bytes;
if (max_response_message_bytes1 < max_response_message_bytes2) return -1;
if (max_response_message_bytes1 > max_response_message_bytes2) return 1;
}
return 0;
}
static grpc_arg_pointer_vtable arg_vtable = {copy_arg, destroy_arg, cmp_arg};
grpc_arg grpc_method_config_table_create_channel_arg(
grpc_method_config_table* table) {
grpc_arg arg;
arg.type = GRPC_ARG_POINTER;
arg.key = GRPC_ARG_SERVICE_CONFIG;
arg.value.pointer.p = grpc_method_config_table_ref(table);
arg.value.pointer.vtable = &arg_vtable;
return arg;
}

@ -35,6 +35,7 @@
#include <stdbool.h>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/transport/metadata.h"
@ -82,4 +83,8 @@ void grpc_method_config_table_add_method_config(
grpc_method_config* grpc_method_config_table_get_method_config(
grpc_method_config_table* table, grpc_mdstr* path);
/// Returns a channel arg containing \a table.
grpc_arg grpc_method_config_table_create_channel_arg(
grpc_method_config_table* table);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_METHOD_CONFIG_H */

@ -49,13 +49,11 @@ struct grpc_resolver_result {
grpc_lb_addresses* addresses;
char* lb_policy_name;
grpc_channel_args* lb_policy_args;
grpc_method_config_table* method_configs;
};
grpc_resolver_result* grpc_resolver_result_create(
const char* server_name, grpc_lb_addresses* addresses,
const char* lb_policy_name, grpc_channel_args* lb_policy_args,
grpc_method_config_table* method_configs) {
const char* lb_policy_name, grpc_channel_args* lb_policy_args) {
grpc_resolver_result* result = gpr_malloc(sizeof(*result));
memset(result, 0, sizeof(*result));
gpr_ref_init(&result->refs, 1);
@ -63,7 +61,6 @@ grpc_resolver_result* grpc_resolver_result_create(
result->addresses = addresses;
result->lb_policy_name = gpr_strdup(lb_policy_name);
result->lb_policy_args = lb_policy_args;
result->method_configs = grpc_method_config_table_ref(method_configs);
return result;
}
@ -78,7 +75,6 @@ void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */);
gpr_free(result->lb_policy_name);
grpc_channel_args_destroy(result->lb_policy_args);
grpc_method_config_table_unref(result->method_configs);
gpr_free(result);
}
}
@ -101,8 +97,3 @@ grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
grpc_resolver_result* result) {
return result->lb_policy_args;
}
grpc_method_config_table* grpc_resolver_result_get_method_configs(
grpc_resolver_result* result) {
return result->method_configs;
}

@ -33,7 +33,6 @@
#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
#include "src/core/ext/client_config/lb_policy_factory.h"
#include "src/core/ext/client_config/method_config.h"
#include "src/core/lib/iomgr/resolve_address.h"
// TODO(roth, ctiller): In the long term, we are considering replacing
@ -52,8 +51,7 @@ typedef struct grpc_resolver_result grpc_resolver_result;
/// Takes ownership of \a addresses, \a lb_policy_args.
grpc_resolver_result* grpc_resolver_result_create(
const char* server_name, grpc_lb_addresses* addresses,
const char* lb_policy_name, grpc_channel_args* lb_policy_args,
grpc_method_config_table* method_configs);
const char* lb_policy_name, grpc_channel_args* lb_policy_args);
void grpc_resolver_result_ref(grpc_resolver_result* result);
void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
@ -67,7 +65,5 @@ const char* grpc_resolver_result_get_lb_policy_name(
grpc_resolver_result* result);
grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
grpc_resolver_result* result);
grpc_method_config_table* grpc_resolver_result_get_method_configs(
grpc_resolver_result* result);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */

@ -112,6 +112,7 @@
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
@ -285,6 +286,7 @@ typedef struct glb_lb_policy {
const char *server_name;
grpc_client_channel_factory *cc_factory;
grpc_channel_args *args;
/** for communicating with the LB server */
grpc_channel *lb_channel;
@ -442,6 +444,7 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
args.server_name = glb_policy->server_name;
args.client_channel_factory = glb_policy->cc_factory;
args.addresses = process_serverlist(serverlist);
args.additional_args = glb_policy->args;
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
@ -567,6 +570,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* Create a client channel over them to communicate with a LB service */
glb_policy->server_name = gpr_strdup(args->server_name);
glb_policy->cc_factory = args->client_channel_factory;
glb_policy->args = grpc_channel_args_copy(args->additional_args);
GPR_ASSERT(glb_policy->cc_factory != NULL);
/* construct a target from the addresses in args, given in the form
@ -633,6 +637,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GPR_ASSERT(glb_policy->pending_picks == NULL);
GPR_ASSERT(glb_policy->pending_pings == NULL);
gpr_free((void *)glb_policy->server_name);
grpc_channel_args_destroy(glb_policy->args);
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = NULL;
grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);

@ -470,6 +470,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr);
sc_args.addr_len = args->addresses->addresses[i].address.len;
sc_args.args = args->additional_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);

@ -633,6 +633,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr);
sc_args.addr_len = args->addresses->addresses[i].address.len;
sc_args.args = args->additional_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);

@ -181,7 +181,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_resolved_addresses_destroy(r->addresses);
result = grpc_resolver_result_create(r->target_name, addresses,
r->lb_policy_name, NULL, NULL);
r->lb_policy_name, NULL);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);

@ -122,7 +122,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
r->published = true;
*r->target_result = grpc_resolver_result_create(
"", grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
r->lb_policy_name, NULL, NULL);
r->lb_policy_name, NULL);
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}

@ -272,6 +272,16 @@ int grpc_channel_args_compare(const grpc_channel_args *a,
return 0;
}
const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
const char *name) {
if (args != NULL) {
for (size_t i = 0; i < args->num_args; ++i) {
if (args->args[i].key == name) return &args->args[i];
}
}
return NULL;
}
int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) {
if (arg->type != GRPC_ARG_INTEGER) {
gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key);

@ -89,6 +89,12 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b);
/** Returns the value of argument \a name from \a args, or NULL if not found.
Note: \a name is matched using pointer equality, so it must be the
same instance of the string used to create the grpc_arg key. */
const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
const char *name);
typedef struct grpc_integer_options {
int default_value; // Return this if value is outside of expected bounds.
int min_value;

Loading…
Cancel
Save