diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index 1603ffb8be0..eaa215fe8f6 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -33,8 +33,9 @@ #include "src/core/client_config/connector.h" -void grpc_connector_ref(grpc_connector* connector) { +grpc_connector *grpc_connector_ref(grpc_connector* connector) { connector->vtable->ref(connector); + return connector; } void grpc_connector_unref(grpc_exec_ctx* exec_ctx, grpc_connector* connector) { diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index b4482fa2eeb..b301e1bb197 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -81,7 +81,7 @@ struct grpc_connector_vtable { grpc_connect_out_args *out_args, grpc_closure *notify); }; -void grpc_connector_ref(grpc_connector *connector); +grpc_connector *grpc_connector_ref(grpc_connector *connector); void grpc_connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *connector); /** Connect using the connector: max one outstanding call at a time */ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector, diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 9f287c4b03d..bf2f15a4443 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -42,11 +42,11 @@ #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/client_config/initial_connect_string.h" +#include "src/core/client_config/subchannel_index.h" #include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" #include "src/core/surface/channel.h" #include "src/core/transport/connectivity_state.h" -#include "src/core/transport/connectivity_state.h" #define INTERNAL_REF_BITS 16 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) @@ -95,6 +95,8 @@ struct grpc_subchannel { struct sockaddr *addr; size_t addr_len; + grpc_subchannel_key *key; + /** initial string to send to peer */ gpr_slice initial_connect_string; @@ -239,6 +241,7 @@ void grpc_subchannel_weak_ref(grpc_subchannel *c static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connected_subchannel *con; + grpc_subchannel_index_unregister(exec_ctx, c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = 1; @@ -277,10 +280,19 @@ static uint32_t random_seed() { return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); } -grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, +grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, + grpc_connector *connector, grpc_subchannel_args *args) { - grpc_subchannel *c = gpr_malloc(sizeof(*c)); + grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args); + grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key); + if (c) { + grpc_subchannel_key_destroy(key); + return c; + } + + c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); + c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; grpc_connector_ref(c->connector); @@ -302,7 +314,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); gpr_mu_init(&c->mu); - return c; + + return grpc_subchannel_index_register(exec_ctx, key, c); } static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 57c7c9dc674..8fd976276b5 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -48,6 +48,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #ifdef GRPC_STREAM_REFCOUNT_DEBUG #define GRPC_SUBCHANNEL_REF(p, r) \ grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \ + grpc_subchannel_ref_from_weak_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) \ grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) \ @@ -66,6 +68,7 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; , const char *file, int line, const char *reason #else #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) +#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) grpc_subchannel_ref_from_weak_ref((p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) #define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \ @@ -146,6 +149,8 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call); struct grpc_subchannel_args { + /* When updating this struct, also update subchannel_index.c */ + /** Channel filters for this channel - wrapped factories will likely want to mutate this */ const grpc_channel_filter **filters; @@ -159,7 +164,8 @@ struct grpc_subchannel_args { }; /** create a subchannel given a connector */ -grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, +grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, + grpc_connector *connector, grpc_subchannel_args *args); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */ diff --git a/src/core/client_config/subchannel_index.c b/src/core/client_config/subchannel_index.c index ffe6c1fe931..9f6ecca2959 100644 --- a/src/core/client_config/subchannel_index.c +++ b/src/core/client_config/subchannel_index.c @@ -31,49 +31,83 @@ * */ +#include "src/core/client_config/subchannel_index.h" + +#include + +#include +#include +#include + +#include "src/core/channel/channel_args.h" + /* a map of subchannel_key --> subchannel, used for detecting connections to the same destination in order to share them */ static gpr_avl g_subchannel_index; static gpr_mu g_mu; -struct subchannel_key { - size_t addr_len; - struct sockaddr *addr; - grpc_channel_args *normalized_args; +struct grpc_subchannel_key { + grpc_connector *connector; + grpc_subchannel_args args; }; GPR_TLS_DECL(subchannel_index_exec_ctx); -static subchannel_key *subchannel_key_create(struct sockaddr *sockaddr, size_t addr_len, grpc_channel_args *args) { - subchannel_key *k = gpr_malloc(sizeof(*k)); - k->addr_len = addr_len; - k->addr = gpr_malloc(addr_len); - memcpy(k->addr, addr, addr_len); - k->normalized_args = grpc_channel_args_normalize(args); - return k; +static void enter_ctx(grpc_exec_ctx *exec_ctx) { + GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == 0); + gpr_tls_set(&subchannel_index_exec_ctx, (intptr_t)exec_ctx); +} + +static void leave_ctx(grpc_exec_ctx *exec_ctx) { + GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == (intptr_t)exec_ctx); + gpr_tls_set(&subchannel_index_exec_ctx, 0); +} + +static grpc_exec_ctx *current_ctx() { + grpc_exec_ctx *c = (grpc_exec_ctx *)gpr_tls_get(&subchannel_index_exec_ctx); + GPR_ASSERT(c != NULL); + return c; } -static subchannel_key *subchannel_key_copy(subchannel_key *k) { - subchannel_key *k = gpr_malloc(sizeof(*k)); - k->addr_len = addr_len; - k->addr = gpr_malloc(addr_len); - memcpy(k->addr, addr, addr_len); - k->normalized_args = grpc_channel_args_copy(args); +static grpc_subchannel_key *create_key(grpc_connector *connector, grpc_subchannel_args *args, grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) { + grpc_subchannel_key *k = gpr_malloc(sizeof(*k)); + k->connector = grpc_connector_ref(connector); + k->args.filter_count = args->filter_count; + k->args.filters = gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count); + memcpy(k->args.filters, args->filters, sizeof(*k->args.filters) * k->args.filter_count); + k->args.addr_len = args->addr_len; + k->args.addr = gpr_malloc(args->addr_len); + memcpy(k->args.addr, args->addr, k->args.addr_len); + k->args.args = copy_channel_args(args->args); return k; } -static int subchannel_key_compare(subchannel_key *a, subchannel_key *b) { - int c = GPR_ICMP(a->addr_len, b->addr_len); +grpc_subchannel_key *grpc_subchannel_key_create(grpc_connector *connector, grpc_subchannel_args *args) { + return create_key(connector, args, grpc_channel_args_normalize); +} + +static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) { + return create_key(k->connector, &k->args, grpc_channel_args_copy); +} + +static int subchannel_key_compare(grpc_subchannel_key *a, grpc_subchannel_key *b) { + int c = GPR_ICMP(a->connector, b->connector); + if (c != 0) return c; + c = GPR_ICMP(a->args.addr_len, b->args.addr_len); if (c != 0) return c; - c = memcmp(a->addr, b->addr, a->addr_len); + c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; - return grpc_channel_args_compare(a->normalized_args, b->normalized_args); + c = memcmp(a->args.addr, b->args.addr, a->args.addr_len); + if (c != 0) return c; + c = memcmp(a->args.filters, b->args.filters, a->args.filter_count * sizeof(*a->args.filters)); + return grpc_channel_args_compare(a->args.args, b->args.args); } -static void subchannel_key_destroy(subchannel_key *k) { - gpr_free(k->addr); - grpc_channel_args_destroy(k->normalized_args); +static void subchannel_key_destroy(grpc_subchannel_key *k) { + gpr_free(k->args.addr); + gpr_free(k->args.filters); + grpc_channel_args_destroy((grpc_channel_args*)k->args.args); gpr_free(k); } @@ -85,16 +119,17 @@ static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); } -static void *sck_avl_compare(void *a, void *b) { +static long sck_avl_compare(void *a, void *b) { return subchannel_key_compare(a, b); } static void scv_avl_destroy(void *p) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p, "subchannel_index"); + grpc_exec_ctx *exec_ctx = current_ctx(); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index"); } static void *scv_avl_copy(void *p) { - GRPC_SUBCHANNEL_REF(p, "subchannel_index"); + GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index"); return p; } @@ -107,26 +142,31 @@ static const gpr_avl_vtable subchannel_avl_vtable = { }; grpc_subchannel *grpc_subchannel_index_find( - grpc_exec_ctx *ctx, + grpc_exec_ctx *exec_ctx, grpc_connector *connector, grpc_subchannel_args *args) { + enter_ctx(ctx); + gpr_mu_lock(&g_mu); gpr_avl index = gpr_avl_ref(g_subchannel_index); gpr_mu_unlock(&g_mu); subchannel_key *key = subchannel_key_create(connector, args); - grpc_subchannel *c = grpc_subchannel_ref(gpr_avl_get(index, key)); + grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key)); subchannel_key_destroy(key); gpr_avl_unref(index); + leave_ctx(ctx); return c; } grpc_subchannel *grpc_subchannel_index_register( - grpc_exec_ctx *ctx, + grpc_exec_ctx *exec_ctx, grpc_connector *connector, grpc_subchannel_args *args, grpc_subchannel *constructed) { + enter_ctx(ctx); + subchannel_key *key = subchannel_key_create(connector, args); grpc_subchannel *c = NULL; @@ -137,7 +177,7 @@ grpc_subchannel *grpc_subchannel_index_register( c = gpr_avl_get(index, key); if (c != NULL) { - GRPC_SUBCHANNEL_UNREF(constructed); + GRPC_SUBCHANNEL_WEAK_UNREF(constructed); } else { gpr_avl updated = gpr_avl_add(index, key, constructed); @@ -151,5 +191,7 @@ grpc_subchannel *grpc_subchannel_index_register( gpr_avl_unref(index); } + leave_ctx(ctx); + return c; } diff --git a/src/core/client_config/subchannel_index.h b/src/core/client_config/subchannel_index.h index d501e121f1d..dfbc3228d9c 100644 --- a/src/core/client_config/subchannel_index.h +++ b/src/core/client_config/subchannel_index.h @@ -34,6 +34,28 @@ #ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H #define GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H +#include "src/core/client_config/connector.h" +#include "src/core/client_config/subchannel.h" +typedef struct grpc_subchannel_key grpc_subchannel_key; + +grpc_subchannel_key *grpc_subchannel_key_create( + grpc_connector *con, grpc_subchannel_args *args); + +void grpc_subchannel_key_destroy(grpc_subchannel_key *key); + +grpc_subchannel *grpc_subchannel_index_find( + grpc_exec_ctx *ctx, + grpc_subchannel_key *key); + +grpc_subchannel *grpc_subchannel_index_register( + grpc_exec_ctx *ctx, + grpc_subchannel_key *key, + grpc_subchannel *constructed); + +void grpc_subchannel_index_unregister( + grpc_exec_ctx *ctx, + grpc_subchannel_key *key, + grpc_subchannel *constructed); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */ diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 552a570713d..38f3e28e3da 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -238,7 +238,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; - s = grpc_subchannel_create(&c->base, args); + s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); return s;