Added overloads for metadata add/remove operations when the static callout index is known.

In several cases, we wish to add or remove metadata when we already know what
kind of metadata it is (e.g. we know we're dealing with the path, the
authority, or the user agent).

In these cases, we do not need to re-compute the metadata batch callout index,
since we know it a priori. This saves us some branches and ALU ops that were
spent pointlessly re-computing these indices in several hot-path filters.

We still need the original methods, which do compute the indices, for cases
where we operate over a collection of metadata, but this is relatively
uncommon.
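As a minimal illustration (using the :path callout; both overloads appear in
the metadata_batch.h diff below), a filter that statically knows which callout
it is touching can now skip the key-to-index lookup:

    // Before: pass the linked storage; the batch code re-derives the callout
    // index from the metadata key before unlinking it.
    grpc_metadata_batch_remove(batch, batch->idx.named.path);

    // After: pass the statically known callout index directly.
    grpc_metadata_batch_remove(batch, GRPC_BATCH_PATH);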
Branch: pull/19713/head
Author: Arjun Roy (5 years ago)
Parent: ea3f4dd8d9
Commit: 6cf05561ce
Files changed (changed lines in parentheses):

  1. src/core/ext/filters/client_channel/client_channel.cc (5)
  2. src/core/ext/filters/client_channel/health/health_check_client.cc (3)
  3. src/core/ext/filters/http/client/http_client_filter.cc (18)
  4. src/core/ext/filters/http/client_authority_filter.cc (2)
  5. src/core/ext/filters/http/message_compress/message_compress_filter.cc (17)
  6. src/core/ext/filters/http/server/http_server_filter.cc (78)
  7. src/core/lib/surface/call.cc (12)
  8. src/core/lib/surface/server.cc (11)
  9. src/core/lib/transport/metadata_batch.cc (88)
  10. src/core/lib/transport/metadata_batch.h (38)
  11. src/cpp/ext/filters/census/client_filter.cc (6)
  12. src/cpp/ext/filters/census/server_filter.cc (7)

src/core/ext/filters/client_channel/client_channel.cc
@@ -3186,8 +3186,7 @@ void CallData::AddRetriableSendInitialMetadataOp(
   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
                        .grpc_previous_rpc_attempts != nullptr)) {
     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
-                               retry_state->send_initial_metadata.idx.named
-                                   .grpc_previous_rpc_attempts);
+                               GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
   }
   if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
     grpc_mdelem retry_md = grpc_mdelem_create(
@@ -3197,7 +3196,7 @@ void CallData::AddRetriableSendInitialMetadataOp(
         &retry_state->send_initial_metadata,
         &retry_state
             ->send_initial_metadata_storage[send_initial_metadata_.list.count],
-        retry_md);
+        retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
               grpc_error_string(error));

src/core/ext/filters/client_channel/health/health_check_client.cc
@@ -346,7 +346,8 @@ void HealthCheckClient::CallState::StartCall() {
       &send_initial_metadata_, &path_metadata_storage_,
       grpc_mdelem_from_slices(
          GRPC_MDSTR_PATH,
-         GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH));
+         GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH),
+      GRPC_BATCH_PATH);
   GPR_ASSERT(error == GRPC_ERROR_NONE);
   payload_.send_initial_metadata.send_initial_metadata =
       &send_initial_metadata_;

src/core/ext/filters/http/client/http_client_filter.cc
@@ -109,7 +109,7 @@ static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem,
     if (b->idx.named.grpc_status != nullptr ||
         grpc_mdelem_static_value_eq(b->idx.named.status->md,
                                     GRPC_MDELEM_STATUS_200)) {
-      grpc_metadata_batch_remove(b, b->idx.named.status);
+      grpc_metadata_batch_remove(b, GRPC_BATCH_STATUS);
     } else {
       char* val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md),
                                   GPR_DUMP_ASCII);
@@ -167,7 +167,7 @@ static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem,
         gpr_free(val);
       }
     }
-    grpc_metadata_batch_remove(b, b->idx.named.content_type);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_TYPE);
   }
   return GRPC_ERROR_NONE;
@@ -336,7 +336,7 @@ static grpc_error* update_path_for_get(grpc_call_element* elem,
 static void remove_if_present(grpc_metadata_batch* batch,
                               grpc_metadata_batch_callouts_index idx) {
   if (batch->idx.array[idx] != nullptr) {
-    grpc_metadata_batch_remove(batch, batch->idx.array[idx]);
+    grpc_metadata_batch_remove(batch, idx);
   }
 }
@@ -433,23 +433,25 @@ static void hc_start_transport_stream_op_batch(
        layer headers. */
     error = grpc_metadata_batch_add_head(
         batch->payload->send_initial_metadata.send_initial_metadata,
-        &calld->method, method);
+        &calld->method, method, GRPC_BATCH_METHOD);
     if (error != GRPC_ERROR_NONE) goto done;
     error = grpc_metadata_batch_add_head(
         batch->payload->send_initial_metadata.send_initial_metadata,
-        &calld->scheme, channeld->static_scheme);
+        &calld->scheme, channeld->static_scheme, GRPC_BATCH_SCHEME);
     if (error != GRPC_ERROR_NONE) goto done;
     error = grpc_metadata_batch_add_tail(
         batch->payload->send_initial_metadata.send_initial_metadata,
-        &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS);
+        &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS, GRPC_BATCH_TE);
     if (error != GRPC_ERROR_NONE) goto done;
     error = grpc_metadata_batch_add_tail(
         batch->payload->send_initial_metadata.send_initial_metadata,
-        &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
+        &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC,
+        GRPC_BATCH_CONTENT_TYPE);
     if (error != GRPC_ERROR_NONE) goto done;
     error = grpc_metadata_batch_add_tail(
         batch->payload->send_initial_metadata.send_initial_metadata,
-        &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent));
+        &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent),
+        GRPC_BATCH_USER_AGENT);
     if (error != GRPC_ERROR_NONE) goto done;
   }

src/core/ext/filters/http/client_authority_filter.cc
@@ -60,7 +60,7 @@ void authority_start_transport_stream_op_batch(
       initial_metadata->idx.named.authority == nullptr) {
     grpc_error* error = grpc_metadata_batch_add_head(
         initial_metadata, &calld->authority_storage,
-        GRPC_MDELEM_REF(chand->default_authority_mdelem));
+        GRPC_MDELEM_REF(chand->default_authority_mdelem), GRPC_BATCH_AUTHORITY);
     if (error != GRPC_ERROR_NONE) {
       grpc_transport_stream_op_batch_finish_with_failure(batch, error,
                                                          calld->call_combiner);

src/core/ext/filters/http/message_compress/message_compress_filter.cc
@@ -137,9 +137,8 @@ static grpc_compression_algorithm find_compression_algorithm(
       &compression_algorithm));
   // Remove this metadata since it's an internal one (i.e., it won't be
   // transmitted out).
-  grpc_metadata_batch_remove(
-      initial_metadata,
-      initial_metadata->idx.named.grpc_internal_encoding_request);
+  grpc_metadata_batch_remove(initial_metadata,
+                             GRPC_BATCH_GRPC_INTERNAL_ENCODING_REQUEST);
   // Check if that algorithm is enabled. Note that GRPC_COMPRESS_NONE is always
   // enabled.
   // TODO(juanlishen): Maybe use channel default or abort() if the algorithm
@@ -195,19 +194,22 @@ static grpc_error* process_send_initial_metadata(
     error = grpc_metadata_batch_add_tail(
         initial_metadata, &calld->message_compression_algorithm_storage,
         grpc_message_compression_encoding_mdelem(
-            calld->message_compression_algorithm));
+            calld->message_compression_algorithm),
+        GRPC_BATCH_GRPC_ENCODING);
   } else if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) {
     initialize_state(elem, calld);
     error = grpc_metadata_batch_add_tail(
         initial_metadata, &calld->stream_compression_algorithm_storage,
-        grpc_stream_compression_encoding_mdelem(stream_compression_algorithm));
+        grpc_stream_compression_encoding_mdelem(stream_compression_algorithm),
+        GRPC_BATCH_CONTENT_ENCODING);
   }
   if (error != GRPC_ERROR_NONE) return error;
   // Convey supported compression algorithms.
   error = grpc_metadata_batch_add_tail(
       initial_metadata, &calld->accept_encoding_storage,
       GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
-          channeld->enabled_message_compression_algorithms_bitset));
+          channeld->enabled_message_compression_algorithms_bitset),
+      GRPC_BATCH_GRPC_ACCEPT_ENCODING);
   if (error != GRPC_ERROR_NONE) return error;
   // Do not overwrite accept-encoding header if it already presents (e.g., added
   // by some proxy).
@@ -215,7 +217,8 @@ static grpc_error* process_send_initial_metadata(
     error = grpc_metadata_batch_add_tail(
         initial_metadata, &calld->accept_stream_encoding_storage,
         GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS(
-            channeld->enabled_stream_compression_algorithms_bitset));
+            channeld->enabled_stream_compression_algorithms_bitset),
+        GRPC_BATCH_ACCEPT_ENCODING);
   }
   return error;
 }

src/core/ext/filters/http/server/http_server_filter.cc
@@ -124,6 +124,32 @@ static void hs_add_error(const char* error_name, grpc_error** cumulative,
   *cumulative = grpc_error_add_child(*cumulative, new_err);
 }

+// Metadata equality within this filter leverages the fact that the sender was
+// likely using the gRPC chttp2 transport, in which case the encoder would emit
+// indexed values, in which case the local hpack parser would intern the
+// relevant metadata, allowing a simple pointer comparison.
+//
+// That said, if the header was transmitted sans indexing/encoding, we still
+// need to do the right thing.
+//
+// Assumptions:
+// 1) The keys for a and b_static must match
+// 2) b_static must be a statically allocated metadata object.
+// 3) It is assumed that the remote end is indexing, but not necessary.
+// TODO(arjunroy): Revisit this method when grpc_mdelem is strongly typed.
+static bool md_strict_equal(grpc_mdelem a, grpc_mdelem b_static) {
+  // Hpack encoder on the remote side should emit indexed values, in which case
+  // hpack parser on this end should pick up interned values, in which case the
+  // pointer comparison alone is enough.
+  //
+  if (GPR_LIKELY(GRPC_MDELEM_IS_INTERNED(a))) {
+    return a.payload == b_static.payload;
+  } else {
+    return grpc_slice_eq_static_interned(GRPC_MDVALUE(a),
+                                         GRPC_MDVALUE(b_static));
+  }
+}
+
 static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem,
                                                grpc_metadata_batch* b) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
@@ -131,19 +157,18 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem,
   static const char* error_name = "Failed processing incoming headers";
   if (b->idx.named.method != nullptr) {
-    if (grpc_mdelem_static_value_eq(b->idx.named.method->md,
-                                    GRPC_MDELEM_METHOD_POST)) {
+    if (md_strict_equal(b->idx.named.method->md, GRPC_MDELEM_METHOD_POST)) {
       *calld->recv_initial_metadata_flags &=
           ~(GRPC_INITIAL_METADATA_CACHEABLE_REQUEST |
             GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
-    } else if (grpc_mdelem_static_value_eq(b->idx.named.method->md,
-                                           GRPC_MDELEM_METHOD_PUT)) {
+    } else if (md_strict_equal(b->idx.named.method->md,
+                               GRPC_MDELEM_METHOD_PUT)) {
       *calld->recv_initial_metadata_flags &=
           ~GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
       *calld->recv_initial_metadata_flags |=
          GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
-    } else if (grpc_mdelem_static_value_eq(b->idx.named.method->md,
-                                           GRPC_MDELEM_METHOD_GET)) {
+    } else if (md_strict_equal(b->idx.named.method->md,
+                               GRPC_MDELEM_METHOD_GET)) {
       *calld->recv_initial_metadata_flags |=
           GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
       *calld->recv_initial_metadata_flags &=
@@ -154,7 +179,7 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem,
               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"),
               b->idx.named.method->md));
     }
-    grpc_metadata_batch_remove(b, b->idx.named.method);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_METHOD);
   } else {
     hs_add_error(
         error_name, &error,
@@ -171,7 +196,7 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem,
               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"),
               b->idx.named.te->md));
     }
-    grpc_metadata_batch_remove(b, b->idx.named.te);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_TE);
   } else {
     hs_add_error(error_name, &error,
                  grpc_error_set_str(
@@ -180,10 +205,8 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem,
   }
   if (b->idx.named.scheme != nullptr) {
-    if (!grpc_mdelem_static_value_eq(b->idx.named.scheme->md,
-                                     GRPC_MDELEM_SCHEME_HTTP) &&
-        !grpc_mdelem_static_value_eq(b->idx.named.scheme->md,
-                                     GRPC_MDELEM_SCHEME_HTTPS) &&
+    if (!md_strict_equal(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTP) &&
+        !md_strict_equal(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTPS) &&
         !grpc_mdelem_static_value_eq(b->idx.named.scheme->md,
                                      GRPC_MDELEM_SCHEME_GRPC)) {
       hs_add_error(error_name, &error,
@@ -191,7 +214,7 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem,
               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"),
               b->idx.named.scheme->md));
     }
-    grpc_metadata_batch_remove(b, b->idx.named.scheme);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_SCHEME);
   } else {
     hs_add_error(
         error_name, &error,
@@ -227,7 +250,7 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem,
         gpr_free(val);
       }
     }
-    grpc_metadata_batch_remove(b, b->idx.named.content_type);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_TYPE);
   }

   if (b->idx.named.path == nullptr) {
@@ -282,12 +305,13 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem,
     grpc_linked_mdelem* el = b->idx.named.host;
     grpc_mdelem md = GRPC_MDELEM_REF(el->md);
     grpc_metadata_batch_remove(b, el);
-    hs_add_error(error_name, &error,
-                 grpc_metadata_batch_add_head(
-                     b, el,
-                     grpc_mdelem_from_slices(
-                         GRPC_MDSTR_AUTHORITY,
-                         grpc_slice_ref_internal(GRPC_MDVALUE(md)))));
+    hs_add_error(
+        error_name, &error,
+        grpc_metadata_batch_add_head(
+            b, el,
+            grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY,
+                                    grpc_slice_ref_internal(GRPC_MDVALUE(md))),
+            GRPC_BATCH_AUTHORITY));
     GRPC_MDELEM_UNREF(md);
   }
@@ -301,7 +325,7 @@ static grpc_error* hs_filter_incoming_metadata(grpc_call_element* elem,
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   if (!chand->surface_user_agent && b->idx.named.user_agent != nullptr) {
-    grpc_metadata_batch_remove(b, b->idx.named.user_agent);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_USER_AGENT);
   }
   return error;
@@ -392,15 +416,17 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem,
   if (op->send_initial_metadata) {
     grpc_error* error = GRPC_ERROR_NONE;
     static const char* error_name = "Failed sending initial metadata";
-    hs_add_error(error_name, &error,
-                 grpc_metadata_batch_add_head(
-                     op->payload->send_initial_metadata.send_initial_metadata,
-                     &calld->status, GRPC_MDELEM_STATUS_200));
+    hs_add_error(
+        error_name, &error,
+        grpc_metadata_batch_add_head(
+            op->payload->send_initial_metadata.send_initial_metadata,
+            &calld->status, GRPC_MDELEM_STATUS_200, GRPC_BATCH_STATUS));
     hs_add_error(error_name, &error,
                  grpc_metadata_batch_add_tail(
                      op->payload->send_initial_metadata.send_initial_metadata,
                      &calld->content_type,
-                     GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC));
+                     GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC,
+                     GRPC_BATCH_CONTENT_TYPE));
     hs_add_error(
         error_name, &error,
         hs_filter_outgoing_metadata(

src/core/lib/surface/call.cc
@@ -1007,13 +1007,13 @@ static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
     GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
     set_incoming_stream_compression_algorithm(
         call, decode_stream_compression(b->idx.named.content_encoding->md));
-    grpc_metadata_batch_remove(b, b->idx.named.content_encoding);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
   }
   if (b->idx.named.grpc_encoding != nullptr) {
     GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
     set_incoming_message_compression_algorithm(
         call, decode_message_compression(b->idx.named.grpc_encoding->md));
-    grpc_metadata_batch_remove(b, b->idx.named.grpc_encoding);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
   }
   uint32_t message_encodings_accepted_by_peer = 1u;
   uint32_t stream_encodings_accepted_by_peer = 1u;
@@ -1021,13 +1021,13 @@ static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
     GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
     set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
                                    &message_encodings_accepted_by_peer, false);
-    grpc_metadata_batch_remove(b, b->idx.named.grpc_accept_encoding);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
   }
   if (b->idx.named.accept_encoding != nullptr) {
     GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
     set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
                                    &stream_encodings_accepted_by_peer, true);
-    grpc_metadata_batch_remove(b, b->idx.named.accept_encoding);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
   }
   call->encodings_accepted_by_peer =
       grpc_compression_bitset_from_message_stream_compression_bitset(
@@ -1059,13 +1059,13 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
       error = grpc_error_set_str(
           error, GRPC_ERROR_STR_GRPC_MESSAGE,
           grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
-      grpc_metadata_batch_remove(b, b->idx.named.grpc_message);
+      grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
     } else if (error != GRPC_ERROR_NONE) {
       error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                  grpc_empty_slice());
     }
     set_final_status(call, GRPC_ERROR_REF(error));
-    grpc_metadata_batch_remove(b, b->idx.named.grpc_status);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
     GRPC_ERROR_UNREF(error);
   } else if (!call->is_client) {
     set_final_status(call, GRPC_ERROR_NONE);

src/core/lib/surface/server.cc
@@ -744,19 +744,18 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
   grpc_millis op_deadline;
   if (error == GRPC_ERROR_NONE) {
-    GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
-    GPR_ASSERT(calld->recv_initial_metadata->idx.named.authority != nullptr);
+    GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
+    GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.authority !=
+                     nullptr);
     calld->path = grpc_slice_ref_internal(
         GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
     calld->host = grpc_slice_ref_internal(
         GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
     calld->path_set = true;
     calld->host_set = true;
+    grpc_metadata_batch_remove(calld->recv_initial_metadata, GRPC_BATCH_PATH);
     grpc_metadata_batch_remove(calld->recv_initial_metadata,
-                               calld->recv_initial_metadata->idx.named.path);
-    grpc_metadata_batch_remove(
-        calld->recv_initial_metadata,
-        calld->recv_initial_metadata->idx.named.authority);
+                               GRPC_BATCH_AUTHORITY);
   } else {
     GRPC_ERROR_REF(error);
   }

src/core/lib/transport/metadata_batch.cc
@@ -93,6 +93,23 @@ grpc_error* grpc_attach_md_to_error(grpc_error* src, grpc_mdelem md) {
   return out;
 }

+static grpc_error* GPR_ATTRIBUTE_NOINLINE error_with_md(grpc_mdelem md) {
+  return grpc_attach_md_to_error(
+      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unallowed duplicate metadata"), md);
+}
+
+static grpc_error* link_callout(grpc_metadata_batch* batch,
+                                grpc_linked_mdelem* storage,
+                                grpc_metadata_batch_callouts_index idx) {
+  GPR_DEBUG_ASSERT(idx >= 0 && idx < GRPC_BATCH_CALLOUTS_COUNT);
+  if (GPR_LIKELY(batch->idx.array[idx] == nullptr)) {
+    ++batch->list.default_count;
+    batch->idx.array[idx] = storage;
+    return GRPC_ERROR_NONE;
+  }
+  return error_with_md(storage->md);
+}
+
 static grpc_error* maybe_link_callout(grpc_metadata_batch* batch,
                                       grpc_linked_mdelem* storage)
     GRPC_MUST_USE_RESULT;
@@ -104,14 +121,7 @@ static grpc_error* maybe_link_callout(grpc_metadata_batch* batch,
   if (idx == GRPC_BATCH_CALLOUTS_COUNT) {
     return GRPC_ERROR_NONE;
   }
-  if (GPR_LIKELY(batch->idx.array[idx] == nullptr)) {
-    ++batch->list.default_count;
-    batch->idx.array[idx] = storage;
-    return GRPC_ERROR_NONE;
-  }
-  return grpc_attach_md_to_error(
-      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unallowed duplicate metadata"),
-      storage->md);
+  return link_callout(batch, storage, idx);
 }

 static void maybe_unlink_callout(grpc_metadata_batch* batch,
@@ -122,21 +132,21 @@ static void maybe_unlink_callout(grpc_metadata_batch* batch,
     return;
   }
   --batch->list.default_count;
-  GPR_ASSERT(batch->idx.array[idx] != nullptr);
+  GPR_DEBUG_ASSERT(batch->idx.array[idx] != nullptr);
   batch->idx.array[idx] = nullptr;
 }

 grpc_error* grpc_metadata_batch_add_head(grpc_metadata_batch* batch,
                                          grpc_linked_mdelem* storage,
                                          grpc_mdelem elem_to_add) {
-  GPR_ASSERT(!GRPC_MDISNULL(elem_to_add));
+  GPR_DEBUG_ASSERT(!GRPC_MDISNULL(elem_to_add));
   storage->md = elem_to_add;
   return grpc_metadata_batch_link_head(batch, storage);
 }

 static void link_head(grpc_mdelem_list* list, grpc_linked_mdelem* storage) {
   assert_valid_list(list);
-  GPR_ASSERT(!GRPC_MDISNULL(storage->md));
+  GPR_DEBUG_ASSERT(!GRPC_MDISNULL(storage->md));
   storage->prev = nullptr;
   storage->next = list->head;
   storage->reserved = nullptr;
@@ -163,17 +173,35 @@ grpc_error* grpc_metadata_batch_link_head(grpc_metadata_batch* batch,
   return GRPC_ERROR_NONE;
 }

+// TODO(arjunroy): Need to revisit this and see what guarantees exist between
+// C-core and the internal-metadata subsystem. E.g. can we ensure a particular
+// metadata is never added twice, even in the presence of user supplied data?
+grpc_error* grpc_metadata_batch_link_head(
+    grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
+    grpc_metadata_batch_callouts_index idx) {
+  GPR_DEBUG_ASSERT(GRPC_BATCH_INDEX_OF(GRPC_MDKEY(storage->md)) == idx);
+  assert_valid_callouts(batch);
+  grpc_error* err = link_callout(batch, storage, idx);
+  if (GPR_UNLIKELY(err != GRPC_ERROR_NONE)) {
+    assert_valid_callouts(batch);
+    return err;
+  }
+  link_head(&batch->list, storage);
+  assert_valid_callouts(batch);
+  return GRPC_ERROR_NONE;
+}
+
 grpc_error* grpc_metadata_batch_add_tail(grpc_metadata_batch* batch,
                                          grpc_linked_mdelem* storage,
                                          grpc_mdelem elem_to_add) {
-  GPR_ASSERT(!GRPC_MDISNULL(elem_to_add));
+  GPR_DEBUG_ASSERT(!GRPC_MDISNULL(elem_to_add));
   storage->md = elem_to_add;
   return grpc_metadata_batch_link_tail(batch, storage);
 }

 static void link_tail(grpc_mdelem_list* list, grpc_linked_mdelem* storage) {
   assert_valid_list(list);
-  GPR_ASSERT(!GRPC_MDISNULL(storage->md));
+  GPR_DEBUG_ASSERT(!GRPC_MDISNULL(storage->md));
   storage->prev = list->tail;
   storage->next = nullptr;
   storage->reserved = nullptr;
@@ -200,6 +228,21 @@ grpc_error* grpc_metadata_batch_link_tail(grpc_metadata_batch* batch,
   return GRPC_ERROR_NONE;
 }

+grpc_error* grpc_metadata_batch_link_tail(
+    grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
+    grpc_metadata_batch_callouts_index idx) {
+  GPR_DEBUG_ASSERT(GRPC_BATCH_INDEX_OF(GRPC_MDKEY(storage->md)) == idx);
+  assert_valid_callouts(batch);
+  grpc_error* err = link_callout(batch, storage, idx);
+  if (GPR_UNLIKELY(err != GRPC_ERROR_NONE)) {
+    assert_valid_callouts(batch);
+    return err;
+  }
+  link_tail(&batch->list, storage);
+  assert_valid_callouts(batch);
+  return GRPC_ERROR_NONE;
+}
+
 static void unlink_storage(grpc_mdelem_list* list,
                            grpc_linked_mdelem* storage) {
   assert_valid_list(list);
@@ -226,6 +269,18 @@ void grpc_metadata_batch_remove(grpc_metadata_batch* batch,
   assert_valid_callouts(batch);
 }

+void grpc_metadata_batch_remove(grpc_metadata_batch* batch,
+                                grpc_metadata_batch_callouts_index idx) {
+  assert_valid_callouts(batch);
+  grpc_linked_mdelem* storage = batch->idx.array[idx];
+  GPR_DEBUG_ASSERT(storage != nullptr);
+  --batch->list.default_count;
+  batch->idx.array[idx] = nullptr;
+  unlink_storage(&batch->list, storage);
+  GRPC_MDELEM_UNREF(storage->md);
+  assert_valid_callouts(batch);
+}
+
 void grpc_metadata_batch_set_value(grpc_linked_mdelem* storage,
                                    const grpc_slice& value) {
   grpc_mdelem old_mdelem = storage->md;
@@ -313,13 +368,14 @@ void grpc_metadata_batch_copy(grpc_metadata_batch* src,
   size_t i = 0;
   for (grpc_linked_mdelem* elem = src->list.head; elem != nullptr;
        elem = elem->next) {
-    grpc_error* error = grpc_metadata_batch_add_tail(dst, &storage[i++],
-                                                     GRPC_MDELEM_REF(elem->md));
+    // Error unused in non-debug builds.
+    grpc_error* GRPC_UNUSED error = grpc_metadata_batch_add_tail(
+        dst, &storage[i++], GRPC_MDELEM_REF(elem->md));
     // The only way that grpc_metadata_batch_add_tail() can fail is if
     // there's a duplicate entry for a callout. However, that can't be
     // the case here, because we would not have been allowed to create
     // a source batch that had that kind of conflict.
-    GPR_ASSERT(error == GRPC_ERROR_NONE);
+    GPR_DEBUG_ASSERT(error == GRPC_ERROR_NONE);
   }
 }

src/core/lib/transport/metadata_batch.h
@@ -67,6 +67,8 @@ size_t grpc_metadata_batch_size(grpc_metadata_batch* batch);
 /** Remove \a storage from the batch, unreffing the mdelem contained */
 void grpc_metadata_batch_remove(grpc_metadata_batch* batch,
                                 grpc_linked_mdelem* storage);
+void grpc_metadata_batch_remove(grpc_metadata_batch* batch,
+                                grpc_metadata_batch_callouts_index idx);

 /** Substitute a new mdelem for an old value */
 grpc_error* grpc_metadata_batch_substitute(grpc_metadata_batch* batch,
@@ -84,6 +86,9 @@ void grpc_metadata_batch_set_value(grpc_linked_mdelem* storage,
 grpc_error* grpc_metadata_batch_link_head(grpc_metadata_batch* batch,
                                           grpc_linked_mdelem* storage)
     GRPC_MUST_USE_RESULT;
+grpc_error* grpc_metadata_batch_link_head(
+    grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
+    grpc_metadata_batch_callouts_index idx) GRPC_MUST_USE_RESULT;

 /** Add \a storage to the end of \a batch. storage->md is
     assumed to be valid.
@@ -93,6 +98,9 @@ grpc_error* grpc_metadata_batch_link_head(grpc_metadata_batch* batch,
 grpc_error* grpc_metadata_batch_link_tail(grpc_metadata_batch* batch,
                                           grpc_linked_mdelem* storage)
     GRPC_MUST_USE_RESULT;
+grpc_error* grpc_metadata_batch_link_tail(
+    grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
+    grpc_metadata_batch_callouts_index idx) GRPC_MUST_USE_RESULT;

 /** Add \a elem_to_add as the first element in \a batch, using
     \a storage as backing storage for the linked list element.
@@ -104,6 +112,22 @@ grpc_error* grpc_metadata_batch_add_head(
     grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
     grpc_mdelem elem_to_add) GRPC_MUST_USE_RESULT;

+// TODO(arjunroy, roth): Remove redundant methods.
+// add/link_head/tail are almost identical.
+inline grpc_error* GRPC_MUST_USE_RESULT grpc_metadata_batch_add_head(
+    grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
+    grpc_metadata_batch_callouts_index idx) {
+  return grpc_metadata_batch_link_head(batch, storage, idx);
+}
+
+inline grpc_error* GRPC_MUST_USE_RESULT grpc_metadata_batch_add_head(
+    grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
+    grpc_mdelem elem_to_add, grpc_metadata_batch_callouts_index idx) {
+  GPR_DEBUG_ASSERT(!GRPC_MDISNULL(elem_to_add));
+  storage->md = elem_to_add;
+  return grpc_metadata_batch_add_head(batch, storage, idx);
+}
+
 /** Add \a elem_to_add as the last element in \a batch, using
     \a storage as backing storage for the linked list element.
     \a storage is owned by the caller and must survive for the
@@ -114,6 +138,20 @@ grpc_error* grpc_metadata_batch_add_tail(
     grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
     grpc_mdelem elem_to_add) GRPC_MUST_USE_RESULT;

+inline grpc_error* GRPC_MUST_USE_RESULT grpc_metadata_batch_add_tail(
+    grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
+    grpc_metadata_batch_callouts_index idx) {
+  return grpc_metadata_batch_link_tail(batch, storage, idx);
+}
+
+inline grpc_error* GRPC_MUST_USE_RESULT grpc_metadata_batch_add_tail(
+    grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
+    grpc_mdelem elem_to_add, grpc_metadata_batch_callouts_index idx) {
+  GPR_DEBUG_ASSERT(!GRPC_MDISNULL(elem_to_add));
+  storage->md = elem_to_add;
+  return grpc_metadata_batch_add_tail(batch, storage, idx);
+}
+
 grpc_error* grpc_attach_md_to_error(grpc_error* src, grpc_mdelem md);

 typedef struct {

src/cpp/ext/filters/census/client_filter.cc
@@ -94,7 +94,8 @@ void CensusClientCallData::StartTransportStreamOpBatch(
             op->send_initial_metadata()->batch(), &tracing_bin_,
             grpc_mdelem_from_slices(
                 GRPC_MDSTR_GRPC_TRACE_BIN,
-                grpc_core::UnmanagedMemorySlice(tracing_buf_, tracing_len))));
+                grpc_core::UnmanagedMemorySlice(tracing_buf_, tracing_len)),
+            GRPC_BATCH_GRPC_TRACE_BIN));
   }
   grpc_slice tags = grpc_empty_slice();
   // TODO: Add in tagging serialization.
@@ -104,7 +105,8 @@ void CensusClientCallData::StartTransportStreamOpBatch(
         "census grpc_filter",
         grpc_metadata_batch_add_tail(
             op->send_initial_metadata()->batch(), &stats_bin_,
-            grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags)));
+            grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags),
+            GRPC_BATCH_GRPC_TAGS_BIN));
   }
 }

src/cpp/ext/filters/census/server_filter.cc
@@ -50,12 +50,12 @@ void FilterInitialMetadata(grpc_metadata_batch* b,
   if (b->idx.named.grpc_trace_bin != nullptr) {
     sml->tracing_slice =
         grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md));
-    grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_TRACE_BIN);
   }
   if (b->idx.named.grpc_tags_bin != nullptr) {
     sml->census_proto =
         grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md));
-    grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin);
+    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_TAGS_BIN);
   }
 }
@@ -155,7 +155,8 @@ void CensusServerCallData::StartTransportStreamOpBatch(
             op->send_trailing_metadata()->batch(), &census_bin_,
             grpc_mdelem_from_slices(
                 GRPC_MDSTR_GRPC_SERVER_STATS_BIN,
-                grpc_core::UnmanagedMemorySlice(stats_buf_, len))));
+                grpc_core::UnmanagedMemorySlice(stats_buf_, len)),
+            GRPC_BATCH_GRPC_SERVER_STATS_BIN));
   }
 }
 // Call next op.
