Merge pull request #1277 from ctiller/fast-unref

Batch unref metadata in HTTP/2 stream encoder
pull/1285/head^2
Vijay Pai 10 years ago
commit db38bb1d82
  1. 71
      src/core/security/credentials.c
  2. 3
      src/core/security/credentials.h
  3. 17
      src/core/security/security_context.c
  4. 2
      src/core/security/security_context.h
  5. 3
      src/core/surface/secure_channel_create.c
  6. 82
      src/core/transport/chttp2/stream_encoder.c
  7. 14
      src/core/transport/metadata.c
  8. 12
      src/core/transport/metadata.h

@ -111,6 +111,11 @@ void grpc_credentials_get_request_metadata(grpc_credentials *creds,
creds->vtable->get_request_metadata(creds, service_url, cb, user_data);
}
grpc_mdctx *grpc_credentials_get_metadata_context(grpc_credentials *creds) {
if (creds == NULL) return NULL;
return creds->vtable->get_metadata_context(creds);
}
void grpc_server_credentials_release(grpc_server_credentials *creds) {
if (creds == NULL) return;
creds->vtable->destroy(creds);
@ -167,8 +172,13 @@ static int ssl_has_request_metadata_only(const grpc_credentials *creds) {
return 0;
}
static grpc_mdctx *ssl_get_metadata_context(grpc_credentials *creds) {
return NULL;
}
static grpc_credentials_vtable ssl_vtable = {
ssl_destroy, ssl_has_request_metadata, ssl_has_request_metadata_only, NULL};
ssl_destroy, ssl_has_request_metadata, ssl_has_request_metadata_only,
ssl_get_metadata_context, NULL};
static grpc_server_credentials_vtable ssl_server_vtable = {ssl_server_destroy};
@ -371,9 +381,14 @@ static void jwt_get_request_metadata(grpc_credentials *creds,
}
}
static grpc_mdctx *jwt_get_metadata_context(grpc_credentials *creds) {
grpc_jwt_credentials *c = (grpc_jwt_credentials *)creds;
return c->md_ctx;
}
static grpc_credentials_vtable jwt_vtable = {
jwt_destroy, jwt_has_request_metadata, jwt_has_request_metadata_only,
jwt_get_request_metadata};
jwt_get_metadata_context, jwt_get_request_metadata};
grpc_credentials *grpc_jwt_credentials_create(const char *json_key,
gpr_timespec token_lifetime) {
@ -585,11 +600,19 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c,
c->fetch_func = fetch_func;
}
static grpc_mdctx *oauth2_token_fetcher_get_metadata_context(
grpc_credentials *creds) {
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)creds;
return c->md_ctx;
}
/* -- ComputeEngine credentials. -- */
static grpc_credentials_vtable compute_engine_vtable = {
oauth2_token_fetcher_destroy, oauth2_token_fetcher_has_request_metadata,
oauth2_token_fetcher_has_request_metadata_only,
oauth2_token_fetcher_get_metadata_context,
oauth2_token_fetcher_get_request_metadata};
static void compute_engine_fetch_oauth2(
@ -633,6 +656,7 @@ static void service_account_destroy(grpc_credentials *creds) {
static grpc_credentials_vtable service_account_vtable = {
service_account_destroy, oauth2_token_fetcher_has_request_metadata,
oauth2_token_fetcher_has_request_metadata_only,
oauth2_token_fetcher_get_metadata_context,
oauth2_token_fetcher_get_request_metadata};
static void service_account_fetch_oauth2(
@ -706,6 +730,7 @@ static void refresh_token_destroy(grpc_credentials *creds) {
static grpc_credentials_vtable refresh_token_vtable = {
refresh_token_destroy, oauth2_token_fetcher_has_request_metadata,
oauth2_token_fetcher_has_request_metadata_only,
oauth2_token_fetcher_get_metadata_context,
oauth2_token_fetcher_get_request_metadata};
static void refresh_token_fetch_oauth2(
@ -801,9 +826,15 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds,
}
}
static grpc_mdctx *fake_oauth2_get_metadata_context(grpc_credentials *creds) {
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds;
return c->md_ctx;
}
static grpc_credentials_vtable fake_oauth2_vtable = {
fake_oauth2_destroy, fake_oauth2_has_request_metadata,
fake_oauth2_has_request_metadata_only, fake_oauth2_get_request_metadata};
fake_oauth2_has_request_metadata_only, fake_oauth2_get_metadata_context,
fake_oauth2_get_request_metadata};
grpc_credentials *grpc_fake_oauth2_credentials_create(
const char *token_md_value, int is_async) {
@ -842,10 +873,16 @@ static int fake_transport_security_has_request_metadata_only(
return 0;
}
static grpc_mdctx *fake_transport_security_get_metadata_context(
grpc_credentials *c) {
return NULL;
}
static grpc_credentials_vtable fake_transport_security_credentials_vtable = {
fake_transport_security_credentials_destroy,
fake_transport_security_has_request_metadata,
fake_transport_security_has_request_metadata_only, NULL};
fake_transport_security_has_request_metadata_only,
fake_transport_security_get_metadata_context, NULL};
static grpc_server_credentials_vtable
fake_transport_security_server_credentials_vtable = {
@ -995,9 +1032,26 @@ static void composite_get_request_metadata(grpc_credentials *creds,
GPR_ASSERT(0); /* Should have exited before. */
}
static grpc_mdctx *composite_get_metadata_context(grpc_credentials *creds) {
grpc_composite_credentials *c = (grpc_composite_credentials *)creds;
grpc_mdctx *ctx = NULL;
size_t i;
for (i = 0; i < c->inner.num_creds; i++) {
grpc_credentials *inner_creds = c->inner.creds_array[i];
grpc_mdctx *inner_ctx = grpc_credentials_get_metadata_context(inner_creds);
if (inner_ctx) {
GPR_ASSERT(ctx == NULL &&
"can only have one metadata context per composite credential");
ctx = inner_ctx;
}
}
return ctx;
}
static grpc_credentials_vtable composite_credentials_vtable = {
composite_destroy, composite_has_request_metadata,
composite_has_request_metadata_only, composite_get_request_metadata};
composite_has_request_metadata_only, composite_get_metadata_context,
composite_get_request_metadata};
static grpc_credentials_array get_creds_array(grpc_credentials **creds_addr) {
grpc_credentials_array result;
@ -1102,9 +1156,14 @@ static void iam_get_request_metadata(grpc_credentials *creds,
cb(user_data, md_array, 2, GRPC_CREDENTIALS_OK);
}
static grpc_mdctx *iam_get_metadata_context(grpc_credentials *creds) {
grpc_iam_credentials *c = (grpc_iam_credentials *)creds;
return c->md_ctx;
}
static grpc_credentials_vtable iam_vtable = {
iam_destroy, iam_has_request_metadata, iam_has_request_metadata_only,
iam_get_request_metadata};
iam_get_metadata_context, iam_get_request_metadata};
grpc_credentials *grpc_iam_credentials_create(const char *token,
const char *authority_selector) {

@ -94,6 +94,7 @@ typedef struct {
void (*destroy)(grpc_credentials *c);
int (*has_request_metadata)(const grpc_credentials *c);
int (*has_request_metadata_only)(const grpc_credentials *c);
grpc_mdctx *(*get_metadata_context)(grpc_credentials *c);
void (*get_request_metadata)(grpc_credentials *c,
const char *service_url,
grpc_credentials_metadata_cb cb,
@ -114,6 +115,8 @@ void grpc_credentials_get_request_metadata(grpc_credentials *creds,
const char *service_url,
grpc_credentials_metadata_cb cb,
void *user_data);
grpc_mdctx *grpc_credentials_get_metadata_context(grpc_credentials *creds);
typedef struct {
unsigned char *pem_private_key;
size_t pem_private_key_size;

@ -165,6 +165,16 @@ static int check_request_metadata_creds(grpc_credentials *creds) {
return 1;
}
static grpc_mdctx *get_or_create_mdctx(grpc_credentials *creds) {
grpc_mdctx *mdctx = grpc_credentials_get_metadata_context(creds);
if (mdctx == NULL) {
mdctx = grpc_mdctx_create();
} else {
grpc_mdctx_ref(mdctx);
}
return mdctx;
}
/* -- Fake implementation. -- */
typedef struct {
@ -626,7 +636,8 @@ grpc_channel *grpc_ssl_channel_create(grpc_credentials *ssl_creds,
arg.key = GRPC_ARG_HTTP2_SCHEME;
arg.value.string = "https";
new_args = grpc_channel_args_copy_and_add(args, &arg);
channel = grpc_secure_channel_create_internal(target, new_args, ctx);
channel = grpc_secure_channel_create_internal(
target, new_args, ctx, get_or_create_mdctx(request_metadata_creds));
grpc_security_context_unref(&ctx->base);
grpc_channel_args_destroy(new_args);
return channel;
@ -637,8 +648,8 @@ grpc_channel *grpc_fake_transport_security_channel_create(
const char *target, const grpc_channel_args *args) {
grpc_channel_security_context *ctx =
grpc_fake_channel_security_context_create(request_metadata_creds, 1);
grpc_channel *channel =
grpc_secure_channel_create_internal(target, args, ctx);
grpc_channel *channel = grpc_secure_channel_create_internal(
target, args, ctx, get_or_create_mdctx(request_metadata_creds));
grpc_security_context_unref(&ctx->base);
return channel;
}

@ -190,7 +190,7 @@ grpc_channel *grpc_fake_transport_security_channel_create(
grpc_channel *grpc_secure_channel_create_internal(
const char *target, const grpc_channel_args *args,
grpc_channel_security_context *ctx);
grpc_channel_security_context *ctx, grpc_mdctx *mdctx);
typedef grpc_channel *(*grpc_secure_channel_factory_func)(
grpc_credentials *transport_security_creds,

@ -205,12 +205,11 @@ static grpc_transport_setup_result complete_setup(void *channel_stack,
- perform handshakes */
grpc_channel *grpc_secure_channel_create_internal(
const char *target, const grpc_channel_args *args,
grpc_channel_security_context *context) {
grpc_channel_security_context *context, grpc_mdctx *mdctx) {
setup *s;
grpc_channel *channel;
grpc_arg context_arg;
grpc_channel_args *args_copy;
grpc_mdctx *mdctx = grpc_mdctx_create();
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
int n = 0;

@ -171,13 +171,15 @@ static gpr_uint8 *add_tiny_header_data(framer_state *st, int len) {
return gpr_slice_buffer_tiny_add(st->output, len);
}
static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
/* add an element to the decoder table: returns metadata element to unref */
static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c,
grpc_mdelem *elem) {
gpr_uint32 key_hash = elem->key->hash;
gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
gpr_uint32 new_index = c->tail_remote_index + c->table_elems + 1;
gpr_uint32 elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) +
GPR_SLICE_LENGTH(elem->value->slice);
int drop_ref;
grpc_mdelem *elem_to_unref;
/* Reserve space for this element in the remote table: if this overflows
the current table, drop elements until it fits, matching the decompressor
@ -204,34 +206,32 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem) {
/* already there: update with new index */
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
drop_ref = 1;
elem_to_unref = elem;
} else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem) {
/* already there (cuckoo): update with new index */
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
drop_ref = 1;
elem_to_unref = elem;
} else if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == NULL) {
/* not there, but a free element: add */
c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem;
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
drop_ref = 0;
elem_to_unref = NULL;
} else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == NULL) {
/* not there (cuckoo), but a free element: add */
c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem;
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
drop_ref = 0;
elem_to_unref = NULL;
} else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] <
c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) {
/* not there: replace oldest */
grpc_mdelem_unref(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]);
elem_to_unref = c->entries_elems[HASH_FRAGMENT_2(elem_hash)];
c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem;
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
drop_ref = 0;
} else {
/* not there: replace oldest */
grpc_mdelem_unref(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]);
elem_to_unref = c->entries_elems[HASH_FRAGMENT_3(elem_hash)];
c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem;
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
drop_ref = 0;
}
/* do exactly the same for the key (so we can find by that again too) */
@ -257,9 +257,7 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
}
if (drop_ref) {
grpc_mdelem_unref(elem);
}
return elem_to_unref;
}
static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 index,
@ -348,9 +346,9 @@ static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c, gpr_uint32 index) {
c->table_elems - index;
}
/* encode an mdelem, taking ownership of it */
static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
framer_state *st) {
/* encode an mdelem; returns metadata element to unref */
static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
grpc_mdelem *elem, framer_state *st) {
gpr_uint32 key_hash = elem->key->hash;
gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
size_t decoder_space_usage;
@ -366,8 +364,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: complete element (first cuckoo hash) */
emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]),
st);
grpc_mdelem_unref(elem);
return;
return elem;
}
if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem &&
@ -375,8 +372,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: complete element (second cuckoo hash) */
emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]),
st);
grpc_mdelem_unref(elem);
return;
return elem;
}
/* should this elem be in the table? */
@ -394,12 +390,12 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
add_elem(c, elem);
return add_elem(c, elem);
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
grpc_mdelem_unref(elem);
return elem;
}
return;
abort();
}
indices_key = c->indices_keys[HASH_FRAGMENT_3(key_hash)];
@ -408,23 +404,24 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
add_elem(c, elem);
return add_elem(c, elem);
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
grpc_mdelem_unref(elem);
return elem;
}
return;
abort();
}
/* no elem, key in the table... fall back to literal emission */
if (should_add_elem) {
emit_lithdr_incidx_v(c, elem, st);
add_elem(c, elem);
return add_elem(c, elem);
} else {
emit_lithdr_noidx_v(c, elem, st);
grpc_mdelem_unref(elem);
return elem;
}
abort();
}
#define STRLEN_LIT(x) (sizeof(x) - 1)
@ -433,11 +430,13 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
framer_state *st) {
char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
grpc_mdelem *mdelem;
grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str);
hpack_enc(c, grpc_mdelem_from_metadata_strings(
mdelem = grpc_mdelem_from_metadata_strings(
c->mdctx, grpc_mdstr_ref(c->timeout_key_str),
grpc_mdstr_from_string(c->mdctx, timeout_str)),
st);
grpc_mdstr_from_string(c->mdctx, timeout_str));
mdelem = hpack_enc(c, mdelem, st);
if (mdelem) grpc_mdelem_unref(mdelem);
}
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id) {
@ -542,6 +541,9 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
grpc_stream_op *op;
gpr_uint32 max_take_size;
gpr_uint32 curop = 0;
gpr_uint32 unref_op;
grpc_mdctx *mdctx = compressor->mdctx;
int need_unref = 0;
GPR_ASSERT(stream_id != 0);
@ -564,7 +566,12 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
curop++;
break;
case GRPC_OP_METADATA:
hpack_enc(compressor, op->data.metadata, &st);
/* Encode a metadata element; store the returned value, representing
a metadata element that needs to be unreffed back into the metadata
slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got
updated). After this loop, we'll do a batch unref of elements. */
op->data.metadata = hpack_enc(compressor, op->data.metadata, &st);
need_unref |= op->data.metadata != NULL;
curop++;
break;
case GRPC_OP_DEADLINE:
@ -601,4 +608,15 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
begin_frame(&st, DATA);
}
finish_frame(&st, 1, eof);
if (need_unref) {
grpc_mdctx_lock(mdctx);
for (unref_op = 0; unref_op < curop; unref_op++) {
op = &ops[unref_op];
if (op->type != GRPC_OP_METADATA) continue;
if (!op->data.metadata) continue;
grpc_mdctx_locked_mdelem_unref(mdctx, op->data.metadata);
}
grpc_mdctx_unlock(mdctx);
}
}

@ -555,3 +555,17 @@ gpr_slice grpc_mdstr_as_base64_encoded_and_huffman_compressed(grpc_mdstr *gs) {
unlock(ctx);
return slice;
}
void grpc_mdctx_lock(grpc_mdctx *ctx) { lock(ctx); }
void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *gmd) {
internal_metadata *md = (internal_metadata *)gmd;
grpc_mdctx *elem_ctx = md->context;
GPR_ASSERT(ctx == elem_ctx);
assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1);
if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) {
ctx->mdtab_free++;
}
}
void grpc_mdctx_unlock(grpc_mdctx *ctx) { unlock(ctx); }

@ -135,6 +135,18 @@ void grpc_mdelem_unref(grpc_mdelem *md);
Does not promise that the returned string has no embedded nulls however. */
const char *grpc_mdstr_as_c_string(grpc_mdstr *s);
/* Batch mode metadata functions.
These API's have equivalents above, but allow taking the mdctx just once,
performing a bunch of work, and then leaving the mdctx. */
/* Lock the metadata context: it's only safe to call _locked_ functions against
this context from the calling thread until grpc_mdctx_unlock is called */
void grpc_mdctx_lock(grpc_mdctx *ctx);
/* Unref a metadata element */
void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *elem);
/* Unlock the metadata context */
void grpc_mdctx_unlock(grpc_mdctx *ctx);
#define GRPC_MDSTR_KV_HASH(k_hash, v_hash) (GPR_ROTL((k_hash), 2) ^ (v_hash))
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_METADATA_H */

Loading…
Cancel
Save