Merge pull request #18094 from markdroth/subchannel_call_context_fix

Don't use a separate call context for subchannel calls
pull/18247/head
Mark D. Roth 6 years ago committed by GitHub
commit 51e718283e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CMakeLists.txt
  2. 2
      Makefile
  3. 1
      gRPC-Core.podspec
  4. 2
      grpc.gyp
  5. 14
      src/core/ext/filters/client_channel/client_channel.cc
  6. 5
      src/core/ext/filters/client_channel/lb_policy.h
  7. 35
      src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
  8. 47
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  9. 6
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
  10. 10
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  11. 3
      src/core/lib/channel/context.h
  12. 70
      src/core/lib/security/transport/client_auth_filter.cc
  13. 99
      src/core/lib/transport/metadata.cc
  14. 8
      test/core/end2end/end2end_nosec_tests.cc
  15. 8
      test/core/end2end/end2end_tests.cc
  16. 1
      test/core/end2end/gen_build_yaml.py
  17. 1
      test/core/end2end/generate_tests.bzl
  18. 318
      test/core/end2end/tests/filter_context.cc
  19. 23
      test/core/transport/metadata_test.cc
  20. 3
      test/cpp/end2end/grpclb_end2end_test.cc
  21. 2
      tools/run_tests/generated/sources_and_headers.json
  22. 883
      tools/run_tests/generated/tests.json

@ -5616,6 +5616,7 @@ add_library(end2end_tests
test/core/end2end/tests/empty_batch.cc
test/core/end2end/tests/filter_call_init_fails.cc
test/core/end2end/tests/filter_causes_close.cc
test/core/end2end/tests/filter_context.cc
test/core/end2end/tests/filter_latency.cc
test/core/end2end/tests/filter_status_code.cc
test/core/end2end/tests/graceful_server_shutdown.cc
@ -5739,6 +5740,7 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/empty_batch.cc
test/core/end2end/tests/filter_call_init_fails.cc
test/core/end2end/tests/filter_causes_close.cc
test/core/end2end/tests/filter_context.cc
test/core/end2end/tests/filter_latency.cc
test/core/end2end/tests/filter_status_code.cc
test/core/end2end/tests/graceful_server_shutdown.cc

@ -10410,6 +10410,7 @@ LIBEND2END_TESTS_SRC = \
test/core/end2end/tests/empty_batch.cc \
test/core/end2end/tests/filter_call_init_fails.cc \
test/core/end2end/tests/filter_causes_close.cc \
test/core/end2end/tests/filter_context.cc \
test/core/end2end/tests/filter_latency.cc \
test/core/end2end/tests/filter_status_code.cc \
test/core/end2end/tests/graceful_server_shutdown.cc \
@ -10526,6 +10527,7 @@ LIBEND2END_NOSEC_TESTS_SRC = \
test/core/end2end/tests/empty_batch.cc \
test/core/end2end/tests/filter_call_init_fails.cc \
test/core/end2end/tests/filter_causes_close.cc \
test/core/end2end/tests/filter_context.cc \
test/core/end2end/tests/filter_latency.cc \
test/core/end2end/tests/filter_status_code.cc \
test/core/end2end/tests/graceful_server_shutdown.cc \

@ -1293,6 +1293,7 @@ Pod::Spec.new do |s|
'test/core/end2end/tests/empty_batch.cc',
'test/core/end2end/tests/filter_call_init_fails.cc',
'test/core/end2end/tests/filter_causes_close.cc',
'test/core/end2end/tests/filter_context.cc',
'test/core/end2end/tests/filter_latency.cc',
'test/core/end2end/tests/filter_status_code.cc',
'test/core/end2end/tests/graceful_server_shutdown.cc',

@ -2710,6 +2710,7 @@
'test/core/end2end/tests/empty_batch.cc',
'test/core/end2end/tests/filter_call_init_fails.cc',
'test/core/end2end/tests/filter_causes_close.cc',
'test/core/end2end/tests/filter_context.cc',
'test/core/end2end/tests/filter_latency.cc',
'test/core/end2end/tests/filter_status_code.cc',
'test/core/end2end/tests/graceful_server_shutdown.cc',
@ -2799,6 +2800,7 @@
'test/core/end2end/tests/empty_batch.cc',
'test/core/end2end/tests/filter_call_init_fails.cc',
'test/core/end2end/tests/filter_causes_close.cc',
'test/core/end2end/tests/filter_context.cc',
'test/core/end2end/tests/filter_latency.cc',
'test/core/end2end/tests/filter_status_code.cc',
'test/core/end2end/tests/graceful_server_shutdown.cc',

@ -694,6 +694,7 @@ struct call_data {
arena(args.arena),
owning_call(args.call_stack),
call_combiner(args.call_combiner),
call_context(args.context),
pending_send_initial_metadata(false),
pending_send_message(false),
pending_send_trailing_metadata(false),
@ -707,12 +708,6 @@ struct call_data {
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) {
GPR_ASSERT(pending_batches[i].batch == nullptr);
}
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
if (pick.pick.subchannel_call_context[i].destroy != nullptr) {
pick.pick.subchannel_call_context[i].destroy(
pick.pick.subchannel_call_context[i].value);
}
}
}
// State for handling deadlines.
@ -729,6 +724,7 @@ struct call_data {
gpr_arena* arena;
grpc_call_stack* owning_call;
grpc_call_combiner* call_combiner;
grpc_call_context_element* call_context;
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;
@ -2434,7 +2430,9 @@ static void create_subchannel_call(grpc_call_element* elem) {
calld->call_start_time, // start_time
calld->deadline, // deadline
calld->arena, // arena
calld->pick.pick.subchannel_call_context, // context
// TODO(roth): When we implement hedging support, we will probably
// need to use a separate call context for each subchannel call.
calld->call_context, // context
calld->call_combiner, // call_combiner
parent_data_size // parent_data_size
};
@ -2451,7 +2449,7 @@ static void create_subchannel_call(grpc_call_element* elem) {
} else {
if (parent_data_size > 0) {
new (calld->subchannel_call->GetParentData())
subchannel_call_retry_state(calld->pick.pick.subchannel_call_context);
subchannel_call_retry_state(calld->call_context);
}
pending_batches_resume(elem);
}

@ -73,11 +73,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call.
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
/// Will be populated with context to pass to the subchannel call, if
/// needed.
// TODO(roth): Remove this from the API, especially since it's not
// working properly anyway (see https://github.com/grpc/grpc/issues/15927).
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT] = {};
};
/// A picker is the object used to actual perform picks.

@ -37,17 +37,6 @@ static void destroy_channel_elem(grpc_channel_element* elem) {}
namespace {
struct call_data {
call_data(const grpc_call_element_args& args) {
if (args.context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
// Get stats object from context and take a ref.
client_stats = static_cast<grpc_core::GrpcLbClientStats*>(
args.context[GRPC_GRPCLB_CLIENT_STATS].value)
->Ref();
// Record call started.
client_stats->AddCallStarted();
}
}
// Stats object to update.
grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats;
// State for intercepting send_initial_metadata.
@ -82,7 +71,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
GPR_ASSERT(args->context != nullptr);
new (elem->call_data) call_data(*args);
new (elem->call_data) call_data();
return GRPC_ERROR_NONE;
}
@ -96,9 +85,6 @@ static void destroy_call_elem(grpc_call_element* elem,
calld->client_stats->AddCallFinished(
!calld->send_initial_metadata_succeeded /* client_failed_to_send */,
calld->recv_initial_metadata_succeeded /* known_received */);
// All done, so unref the stats object.
// TODO(roth): Eliminate this once filter stack is converted to C++.
calld->client_stats.reset();
}
calld->~call_data();
}
@ -107,15 +93,27 @@ static void start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data);
GPR_TIMER_SCOPE("clr_start_transport_stream_op_batch", 0);
if (calld->client_stats != nullptr) {
// Intercept send_initial_metadata.
// Handle send_initial_metadata.
if (batch->send_initial_metadata) {
// Grab client stats object from user_data for LB token metadata.
grpc_linked_mdelem* lb_token =
batch->payload->send_initial_metadata.send_initial_metadata->idx.named
.lb_token;
if (lb_token != nullptr) {
grpc_core::GrpcLbClientStats* client_stats =
static_cast<grpc_core::GrpcLbClientStats*>(grpc_mdelem_get_user_data(
lb_token->md, grpc_core::GrpcLbClientStats::Destroy));
if (client_stats != nullptr) {
calld->client_stats = client_stats->Ref();
// Intercept completion.
calld->original_on_complete_for_send = batch->on_complete;
GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send,
calld, grpc_schedule_on_exec_ctx);
batch->on_complete = &calld->on_complete_for_send;
}
// Intercept recv_initial_metadata.
}
}
// Intercept completion of recv_initial_metadata.
if (batch->recv_initial_metadata) {
calld->original_recv_initial_metadata_ready =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
@ -125,7 +123,6 @@ static void start_transport_stream_op_batch(
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready;
}
}
// Chain to next filter.
grpc_call_next_op(elem, batch);
}

@ -225,7 +225,8 @@ class GrpcLb : public LoadBalancingPolicy {
UniquePtr<char> AsText() const;
// Extracts all non-drop entries into a ServerAddressList.
ServerAddressList GetServerAddressList() const;
ServerAddressList GetServerAddressList(
GrpcLbClientStats* client_stats) const;
// Returns true if the serverlist contains at least one drop entry and
// no backend address entries.
@ -446,7 +447,8 @@ bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
}
// Returns addresses extracted from the serverlist.
ServerAddressList GrpcLb::Serverlist::GetServerAddressList() const {
ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
GrpcLbClientStats* client_stats) const {
ServerAddressList addresses;
for (size_t i = 0; i < serverlist_->num_servers; ++i) {
const grpc_grpclb_server* server = serverlist_->servers[i];
@ -464,6 +466,11 @@ ServerAddressList GrpcLb::Serverlist::GetServerAddressList() const {
grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
server->load_balance_token, lb_token_length);
lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
if (client_stats != nullptr) {
GPR_ASSERT(grpc_mdelem_set_user_data(
lb_token, GrpcLbClientStats::Destroy,
client_stats->Ref().release()) == client_stats);
}
} else {
char* uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
@ -504,22 +511,6 @@ const char* GrpcLb::Serverlist::ShouldDrop() {
// GrpcLb::Picker
//
// Adds lb_token of selected subchannel (address) to the call's initial
// metadata.
grpc_error* AddLbTokenToInitialMetadata(
grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
grpc_metadata_batch* initial_metadata) {
GPR_ASSERT(lb_token_mdelem_storage != nullptr);
GPR_ASSERT(!GRPC_MDISNULL(lb_token));
return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
lb_token);
}
// Destroy function used when embedding client stats in call context.
void DestroyClientStats(void* arg) {
static_cast<GrpcLbClientStats*>(arg)->Unref();
}
GrpcLb::Picker::PickResult GrpcLb::Picker::Pick(PickState* pick,
grpc_error** error) {
// Check if we should drop the call.
@ -550,15 +541,14 @@ GrpcLb::Picker::PickResult GrpcLb::Picker::Pick(PickState* pick,
abort();
}
grpc_mdelem lb_token = {reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(lb_token),
&pick->lb_token_mdelem_storage,
pick->initial_metadata);
// Pass on client stats via context. Passes ownership of the reference.
if (client_stats_ != nullptr) {
pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
client_stats_->Ref().release();
pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
DestroyClientStats;
GPR_ASSERT(!GRPC_MDISNULL(lb_token));
GPR_ASSERT(grpc_metadata_batch_add_tail(
pick->initial_metadata, &pick->lb_token_mdelem_storage,
GRPC_MDELEM_REF(lb_token)) == GRPC_ERROR_NONE);
GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(
grpc_mdelem_get_user_data(lb_token, GrpcLbClientStats::Destroy));
if (client_stats != nullptr) {
client_stats->AddCallStarted();
}
}
return result;
@ -1414,7 +1404,8 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
ServerAddressList* addresses = &tmp_addresses;
bool is_backend_from_grpclb_load_balancer = false;
if (serverlist_ != nullptr) {
tmp_addresses = serverlist_->GetServerAddressList();
tmp_addresses = serverlist_->GetServerAddressList(
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
is_backend_from_grpclb_load_balancer = true;
} else {
// If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't

@ -56,6 +56,12 @@ class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
int64_t* num_calls_finished_known_received,
UniquePtr<DroppedCallCounts>* drop_token_counts);
// A destruction function to use as the user_data key when attaching
// client stats to a grpc_mdelem.
static void Destroy(void* arg) {
static_cast<GrpcLbClientStats*>(arg)->Unref();
}
private:
// This field must only be accessed via *_locked() methods.
UniquePtr<DroppedCallCounts> drop_token_counts_;

@ -322,11 +322,6 @@ class XdsLb : public LoadBalancingPolicy {
// XdsLb::Picker
//
// Destroy function used when embedding client stats in call context.
void DestroyClientStats(void* arg) {
static_cast<XdsLbClientStats*>(arg)->Unref();
}
XdsLb::Picker::PickResult XdsLb::Picker::Pick(PickState* pick,
grpc_error** error) {
// TODO(roth): Add support for drop handling.
@ -335,10 +330,7 @@ XdsLb::Picker::PickResult XdsLb::Picker::Pick(PickState* pick,
// If pick succeeded, add client stats.
if (result == PickResult::PICK_COMPLETE &&
pick->connected_subchannel != nullptr && client_stats_ != nullptr) {
pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
client_stats_->Ref().release();
pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
DestroyClientStats;
// TODO(roth): Add support for client stats.
}
return result;
}

@ -35,9 +35,6 @@ typedef enum {
/// Reserved for traffic_class_context.
GRPC_CONTEXT_TRAFFIC,
/// Value is a \a grpc_grpclb_client_stats.
GRPC_GRPCLB_CLIENT_STATS,
GRPC_CONTEXT_COUNT
} grpc_context_index;

@ -41,12 +41,42 @@
#define MAX_CREDENTIALS_METADATA_COUNT 4
namespace {
/* We can have a per-channel credentials. */
struct channel_data {
channel_data(grpc_channel_security_connector* security_connector,
grpc_auth_context* auth_context)
: security_connector(
security_connector->Ref(DEBUG_LOCATION, "client_auth_filter")),
auth_context(auth_context->Ref(DEBUG_LOCATION, "client_auth_filter")) {}
~channel_data() {
security_connector.reset(DEBUG_LOCATION, "client_auth_filter");
auth_context.reset(DEBUG_LOCATION, "client_auth_filter");
}
grpc_core::RefCountedPtr<grpc_channel_security_connector> security_connector;
grpc_core::RefCountedPtr<grpc_auth_context> auth_context;
};
/* We can have a per-call credentials. */
struct call_data {
call_data(grpc_call_element* elem, const grpc_call_element_args& args)
: arena(args.arena),
owning_call(args.call_stack),
call_combiner(args.call_combiner) {}
: owning_call(args.call_stack), call_combiner(args.call_combiner) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
GPR_ASSERT(args.context != nullptr);
if (args.context[GRPC_CONTEXT_SECURITY].value == nullptr) {
args.context[GRPC_CONTEXT_SECURITY].value =
grpc_client_security_context_create(args.arena, /*creds=*/nullptr);
args.context[GRPC_CONTEXT_SECURITY].destroy =
grpc_client_security_context_destroy;
}
grpc_client_security_context* sec_ctx =
static_cast<grpc_client_security_context*>(
args.context[GRPC_CONTEXT_SECURITY].value);
sec_ctx->auth_context.reset(DEBUG_LOCATION, "client_auth_filter");
sec_ctx->auth_context =
chand->auth_context->Ref(DEBUG_LOCATION, "client_auth_filter");
}
// This method is technically the dtor of this class. However, since
// `get_request_metadata_cancel_closure` can run in parallel to
@ -61,7 +91,6 @@ struct call_data {
grpc_auth_metadata_context_reset(&auth_md_context);
}
gpr_arena* arena;
grpc_call_stack* owning_call;
grpc_call_combiner* call_combiner;
grpc_core::RefCountedPtr<grpc_call_credentials> creds;
@ -81,21 +110,6 @@ struct call_data {
grpc_closure get_request_metadata_cancel_closure;
};
/* We can have a per-channel credentials. */
struct channel_data {
channel_data(grpc_channel_security_connector* security_connector,
grpc_auth_context* auth_context)
: security_connector(
security_connector->Ref(DEBUG_LOCATION, "client_auth_filter")),
auth_context(auth_context->Ref(DEBUG_LOCATION, "client_auth_filter")) {}
~channel_data() {
security_connector.reset(DEBUG_LOCATION, "client_auth_filter");
auth_context.reset(DEBUG_LOCATION, "client_auth_filter");
}
grpc_core::RefCountedPtr<grpc_channel_security_connector> security_connector;
grpc_core::RefCountedPtr<grpc_auth_context> auth_context;
};
} // namespace
void grpc_auth_metadata_context_reset(
@ -307,24 +321,6 @@ static void auth_start_transport_stream_op_batch(
call_data* calld = static_cast<call_data*>(elem->call_data);
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
if (!batch->cancel_stream) {
// TODO(hcaseyal): move this to init_call_elem once issue #15927 is
// resolved.
GPR_ASSERT(batch->payload->context != nullptr);
if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == nullptr) {
batch->payload->context[GRPC_CONTEXT_SECURITY].value =
grpc_client_security_context_create(calld->arena, /*creds=*/nullptr);
batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_client_security_context_destroy;
}
grpc_client_security_context* sec_ctx =
static_cast<grpc_client_security_context*>(
batch->payload->context[GRPC_CONTEXT_SECURITY].value);
sec_ctx->auth_context.reset(DEBUG_LOCATION, "client_auth_filter");
sec_ctx->auth_context =
chand->auth_context->Ref(DEBUG_LOCATION, "client_auth_filter");
}
if (batch->send_initial_metadata) {
grpc_metadata_batch* metadata =
batch->payload->send_initial_metadata.send_initial_metadata;

@ -71,6 +71,12 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_metadata(false, "metadata");
typedef void (*destroy_user_data_func)(void* user_data);
struct UserData {
gpr_mu mu_user_data;
gpr_atm destroy_user_data;
gpr_atm user_data;
};
/* Shadow structure for grpc_mdelem_data for interned elements */
typedef struct interned_metadata {
/* must be byte compatible with grpc_mdelem_data */
@ -80,9 +86,7 @@ typedef struct interned_metadata {
/* private only data */
gpr_atm refcnt;
gpr_mu mu_user_data;
gpr_atm destroy_user_data;
gpr_atm user_data;
UserData user_data;
struct interned_metadata* bucket_next;
} interned_metadata;
@ -95,6 +99,8 @@ typedef struct allocated_metadata {
/* private only data */
gpr_atm refcnt;
UserData user_data;
} allocated_metadata;
typedef struct mdtab_shard {
@ -178,16 +184,17 @@ static void gc_mdtab(mdtab_shard* shard) {
for (i = 0; i < shard->capacity; i++) {
prev_next = &shard->elems[i];
for (md = shard->elems[i]; md; md = next) {
void* user_data = (void*)gpr_atm_no_barrier_load(&md->user_data);
void* user_data =
(void*)gpr_atm_no_barrier_load(&md->user_data.user_data);
next = md->bucket_next;
if (gpr_atm_acq_load(&md->refcnt) == 0) {
grpc_slice_unref_internal(md->key);
grpc_slice_unref_internal(md->value);
if (md->user_data) {
if (md->user_data.user_data) {
((destroy_user_data_func)gpr_atm_no_barrier_load(
&md->destroy_user_data))(user_data);
&md->user_data.destroy_user_data))(user_data);
}
gpr_mu_destroy(&md->mu_user_data);
gpr_mu_destroy(&md->user_data.mu_user_data);
gpr_free(md);
*prev_next = next;
num_freed++;
@ -251,6 +258,9 @@ grpc_mdelem grpc_mdelem_create(
allocated->key = grpc_slice_ref_internal(key);
allocated->value = grpc_slice_ref_internal(value);
gpr_atm_rel_store(&allocated->refcnt, 1);
allocated->user_data.user_data = 0;
allocated->user_data.destroy_user_data = 0;
gpr_mu_init(&allocated->user_data.mu_user_data);
#ifndef NDEBUG
if (grpc_trace_metadata.enabled()) {
char* key_str = grpc_slice_to_c_string(allocated->key);
@ -299,11 +309,11 @@ grpc_mdelem grpc_mdelem_create(
gpr_atm_rel_store(&md->refcnt, 1);
md->key = grpc_slice_ref_internal(key);
md->value = grpc_slice_ref_internal(value);
md->user_data = 0;
md->destroy_user_data = 0;
md->user_data.user_data = 0;
md->user_data.destroy_user_data = 0;
md->bucket_next = shard->elems[idx];
shard->elems[idx] = md;
gpr_mu_init(&md->mu_user_data);
gpr_mu_init(&md->user_data.mu_user_data);
#ifndef NDEBUG
if (grpc_trace_metadata.enabled()) {
char* key_str = grpc_slice_to_c_string(md->key);
@ -450,6 +460,13 @@ void grpc_mdelem_unref(grpc_mdelem gmd DEBUG_ARGS) {
if (1 == prev_refcount) {
grpc_slice_unref_internal(md->key);
grpc_slice_unref_internal(md->value);
if (md->user_data.user_data) {
destroy_user_data_func destroy_user_data =
(destroy_user_data_func)gpr_atm_no_barrier_load(
&md->user_data.destroy_user_data);
destroy_user_data((void*)md->user_data.user_data);
}
gpr_mu_destroy(&md->user_data.mu_user_data);
gpr_free(md);
}
break;
@ -457,58 +474,74 @@ void grpc_mdelem_unref(grpc_mdelem gmd DEBUG_ARGS) {
}
}
static void* get_user_data(UserData* user_data, void (*destroy_func)(void*)) {
if (gpr_atm_acq_load(&user_data->destroy_user_data) ==
(gpr_atm)destroy_func) {
return (void*)gpr_atm_no_barrier_load(&user_data->user_data);
} else {
return nullptr;
}
}
void* grpc_mdelem_get_user_data(grpc_mdelem md, void (*destroy_func)(void*)) {
switch (GRPC_MDELEM_STORAGE(md)) {
case GRPC_MDELEM_STORAGE_EXTERNAL:
case GRPC_MDELEM_STORAGE_ALLOCATED:
return nullptr;
case GRPC_MDELEM_STORAGE_STATIC:
return (void*)grpc_static_mdelem_user_data[GRPC_MDELEM_DATA(md) -
grpc_static_mdelem_table];
case GRPC_MDELEM_STORAGE_ALLOCATED: {
allocated_metadata* am =
reinterpret_cast<allocated_metadata*>(GRPC_MDELEM_DATA(md));
return get_user_data(&am->user_data, destroy_func);
}
case GRPC_MDELEM_STORAGE_INTERNED: {
interned_metadata* im =
reinterpret_cast<interned_metadata*> GRPC_MDELEM_DATA(md);
void* result;
if (gpr_atm_acq_load(&im->destroy_user_data) == (gpr_atm)destroy_func) {
return (void*)gpr_atm_no_barrier_load(&im->user_data);
} else {
return nullptr;
}
return result;
return get_user_data(&im->user_data, destroy_func);
}
}
GPR_UNREACHABLE_CODE(return nullptr);
}
static void* set_user_data(UserData* ud, void (*destroy_func)(void*),
void* user_data) {
GPR_ASSERT((user_data == nullptr) == (destroy_func == nullptr));
gpr_mu_lock(&ud->mu_user_data);
if (gpr_atm_no_barrier_load(&ud->destroy_user_data)) {
/* user data can only be set once */
gpr_mu_unlock(&ud->mu_user_data);
if (destroy_func != nullptr) {
destroy_func(user_data);
}
return (void*)gpr_atm_no_barrier_load(&ud->user_data);
}
gpr_atm_no_barrier_store(&ud->user_data, (gpr_atm)user_data);
gpr_atm_rel_store(&ud->destroy_user_data, (gpr_atm)destroy_func);
gpr_mu_unlock(&ud->mu_user_data);
return user_data;
}
void* grpc_mdelem_set_user_data(grpc_mdelem md, void (*destroy_func)(void*),
void* user_data) {
switch (GRPC_MDELEM_STORAGE(md)) {
case GRPC_MDELEM_STORAGE_EXTERNAL:
case GRPC_MDELEM_STORAGE_ALLOCATED:
destroy_func(user_data);
return nullptr;
case GRPC_MDELEM_STORAGE_STATIC:
destroy_func(user_data);
return (void*)grpc_static_mdelem_user_data[GRPC_MDELEM_DATA(md) -
grpc_static_mdelem_table];
case GRPC_MDELEM_STORAGE_ALLOCATED: {
allocated_metadata* am =
reinterpret_cast<allocated_metadata*>(GRPC_MDELEM_DATA(md));
return set_user_data(&am->user_data, destroy_func, user_data);
}
case GRPC_MDELEM_STORAGE_INTERNED: {
interned_metadata* im =
reinterpret_cast<interned_metadata*> GRPC_MDELEM_DATA(md);
GPR_ASSERT(!is_mdelem_static(md));
GPR_ASSERT((user_data == nullptr) == (destroy_func == nullptr));
gpr_mu_lock(&im->mu_user_data);
if (gpr_atm_no_barrier_load(&im->destroy_user_data)) {
/* user data can only be set once */
gpr_mu_unlock(&im->mu_user_data);
if (destroy_func != nullptr) {
destroy_func(user_data);
}
return (void*)gpr_atm_no_barrier_load(&im->user_data);
}
gpr_atm_no_barrier_store(&im->user_data, (gpr_atm)user_data);
gpr_atm_rel_store(&im->destroy_user_data, (gpr_atm)destroy_func);
gpr_mu_unlock(&im->mu_user_data);
return user_data;
return set_user_data(&im->user_data, destroy_func, user_data);
}
}
GPR_UNREACHABLE_CODE(return nullptr);

@ -70,6 +70,8 @@ extern void filter_call_init_fails(grpc_end2end_test_config config);
extern void filter_call_init_fails_pre_init(void);
extern void filter_causes_close(grpc_end2end_test_config config);
extern void filter_causes_close_pre_init(void);
extern void filter_context(grpc_end2end_test_config config);
extern void filter_context_pre_init(void);
extern void filter_latency(grpc_end2end_test_config config);
extern void filter_latency_pre_init(void);
extern void filter_status_code(grpc_end2end_test_config config);
@ -207,6 +209,7 @@ void grpc_end2end_tests_pre_init(void) {
empty_batch_pre_init();
filter_call_init_fails_pre_init();
filter_causes_close_pre_init();
filter_context_pre_init();
filter_latency_pre_init();
filter_status_code_pre_init();
graceful_server_shutdown_pre_init();
@ -292,6 +295,7 @@ void grpc_end2end_tests(int argc, char **argv,
empty_batch(config);
filter_call_init_fails(config);
filter_causes_close(config);
filter_context(config);
filter_latency(config);
filter_status_code(config);
graceful_server_shutdown(config);
@ -432,6 +436,10 @@ void grpc_end2end_tests(int argc, char **argv,
filter_causes_close(config);
continue;
}
if (0 == strcmp("filter_context", argv[i])) {
filter_context(config);
continue;
}
if (0 == strcmp("filter_latency", argv[i])) {
filter_latency(config);
continue;

@ -72,6 +72,8 @@ extern void filter_call_init_fails(grpc_end2end_test_config config);
extern void filter_call_init_fails_pre_init(void);
extern void filter_causes_close(grpc_end2end_test_config config);
extern void filter_causes_close_pre_init(void);
extern void filter_context(grpc_end2end_test_config config);
extern void filter_context_pre_init(void);
extern void filter_latency(grpc_end2end_test_config config);
extern void filter_latency_pre_init(void);
extern void filter_status_code(grpc_end2end_test_config config);
@ -210,6 +212,7 @@ void grpc_end2end_tests_pre_init(void) {
empty_batch_pre_init();
filter_call_init_fails_pre_init();
filter_causes_close_pre_init();
filter_context_pre_init();
filter_latency_pre_init();
filter_status_code_pre_init();
graceful_server_shutdown_pre_init();
@ -296,6 +299,7 @@ void grpc_end2end_tests(int argc, char **argv,
empty_batch(config);
filter_call_init_fails(config);
filter_causes_close(config);
filter_context(config);
filter_latency(config);
filter_status_code(config);
graceful_server_shutdown(config);
@ -440,6 +444,10 @@ void grpc_end2end_tests(int argc, char **argv,
filter_causes_close(config);
continue;
}
if (0 == strcmp("filter_context", argv[i])) {
filter_context(config);
continue;
}
if (0 == strcmp("filter_latency", argv[i])) {
filter_latency(config);
continue;

@ -124,6 +124,7 @@ END2END_TESTS = {
'empty_batch': default_test_options._replace(cpu_cost=LOWCPU),
'filter_causes_close': default_test_options._replace(cpu_cost=LOWCPU),
'filter_call_init_fails': default_test_options,
'filter_context': default_test_options,
'filter_latency': default_test_options._replace(cpu_cost=LOWCPU),
'filter_status_code': default_test_options._replace(cpu_cost=LOWCPU),
'graceful_server_shutdown': default_test_options._replace(

@ -215,6 +215,7 @@ END2END_TESTS = {
"empty_batch": _test_options(),
"filter_causes_close": _test_options(),
"filter_call_init_fails": _test_options(),
"filter_context": _test_options(),
"graceful_server_shutdown": _test_options(exclude_inproc = True),
"hpack_size": _test_options(
proxyable = False,

@ -0,0 +1,318 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test/core/end2end/end2end_tests.h"
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/channel_init.h"
#include "test/core/end2end/cq_verifier.h"
enum { TIMEOUT = 200000 };
static bool g_enable_filter = false;
static void* tag(intptr_t t) { return (void*)t; }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char* test_name,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_from_now(int n) {
return grpc_timeout_seconds_to_deadline(n);
}
static gpr_timespec five_seconds_from_now(void) {
return n_seconds_from_now(5);
}
static void drain_cq(grpc_completion_queue* cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture* f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = nullptr;
}
static void shutdown_client(grpc_end2end_test_fixture* f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = nullptr;
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
grpc_completion_queue_destroy(f->shutdown_cq);
}
// Simple request to test that filters see a consistent view of the
// call context.
static void test_request(grpc_end2end_test_config config) {
grpc_call* c;
grpc_call* s;
grpc_slice request_payload_slice =
grpc_slice_from_copied_string("hello world");
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_end2end_test_fixture f =
begin_test(config, "filter_context", nullptr, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_byte_buffer* request_payload_recv = nullptr;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
int was_cancelled = 2;
gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/foo"), nullptr,
deadline, nullptr);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->data.send_initial_metadata.metadata = nullptr;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
cq_verify(cqv);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
grpc_slice status_string = grpc_slice_from_static_string("xyz");
op->data.send_status_from_server.status_details = &status_string;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_unref(s);
grpc_call_unref(c);
cq_verifier_destroy(cqv);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(request_payload_recv);
end_test(&f);
config.tear_down_data(&f);
}
/*******************************************************************************
* Test context filter
*/
struct call_data {
grpc_call_context_element* context;
};
static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->context = args->context;
gpr_log(GPR_INFO, "init_call_elem(): context=%p", args->context);
return GRPC_ERROR_NONE;
}
static void start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data);
// If batch payload context is not null (which will happen in some
// cancellation cases), make sure we get the same context here that we
// saw in init_call_elem().
gpr_log(GPR_INFO, "start_transport_stream_op_batch(): context=%p",
batch->payload->context);
if (batch->payload->context != nullptr) {
GPR_ASSERT(calld->context == batch->payload->context);
}
grpc_call_next_op(elem, batch);
}
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {}
static grpc_error* init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
return GRPC_ERROR_NONE;
}
static void destroy_channel_elem(grpc_channel_element* elem) {}
static const grpc_channel_filter test_filter = {
start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
0,
init_channel_elem,
destroy_channel_elem,
grpc_channel_next_get_info,
"filter_context"};
/*******************************************************************************
* Registration
*/
static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) {
grpc_channel_filter* filter = static_cast<grpc_channel_filter*>(arg);
if (g_enable_filter) {
// Want to add the filter as close to the end as possible, to make
// sure that all of the filters work well together. However, we
// can't add it at the very end, because the connected channel filter
// must be the last one. So we add it right before the last one.
grpc_channel_stack_builder_iterator* it =
grpc_channel_stack_builder_create_iterator_at_last(builder);
GPR_ASSERT(grpc_channel_stack_builder_move_prev(it));
const bool retval = grpc_channel_stack_builder_add_filter_before(
it, filter, nullptr, nullptr);
grpc_channel_stack_builder_iterator_destroy(it);
return retval;
} else {
return true;
}
}
static void init_plugin(void) {
grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX,
maybe_add_filter, (void*)&test_filter);
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
maybe_add_filter, (void*)&test_filter);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
maybe_add_filter, (void*)&test_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
maybe_add_filter, (void*)&test_filter);
}
static void destroy_plugin(void) {}
void filter_context(grpc_end2end_test_config config) {
g_enable_filter = true;
test_request(config);
g_enable_filter = false;
}
void filter_context_pre_init(void) {
grpc_register_plugin(init_plugin, destroy_plugin);
}

@ -289,6 +289,28 @@ static void test_user_data_works(void) {
grpc_shutdown();
}
static void test_user_data_works_for_allocated_md(void) {
int* ud1;
int* ud2;
grpc_mdelem md;
gpr_log(GPR_INFO, "test_user_data_works");
grpc_init();
grpc_core::ExecCtx exec_ctx;
ud1 = static_cast<int*>(gpr_malloc(sizeof(int)));
*ud1 = 1;
ud2 = static_cast<int*>(gpr_malloc(sizeof(int)));
*ud2 = 2;
md = grpc_mdelem_from_slices(grpc_slice_from_static_string("abc"),
grpc_slice_from_static_string("123"));
grpc_mdelem_set_user_data(md, gpr_free, ud1);
grpc_mdelem_set_user_data(md, gpr_free, ud2);
GPR_ASSERT(grpc_mdelem_get_user_data(md, gpr_free) == ud1);
GRPC_MDELEM_UNREF(md);
grpc_shutdown();
}
static void verify_ascii_header_size(const char* key, const char* value,
bool intern_key, bool intern_value) {
grpc_mdelem elem = grpc_mdelem_from_slices(
@ -386,6 +408,7 @@ int main(int argc, char** argv) {
test_create_many_persistant_metadata();
test_things_stick_around();
test_user_data_works();
test_user_data_works_for_allocated_md();
grpc_shutdown();
return 0;
}

@ -1483,6 +1483,9 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 3) {}
};
// TODO(roth): Add test that when switching balancers, we don't include
// any calls that were sent prior to connecting to the new balancer.
TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100;

@ -8869,6 +8869,7 @@
"test/core/end2end/tests/empty_batch.cc",
"test/core/end2end/tests/filter_call_init_fails.cc",
"test/core/end2end/tests/filter_causes_close.cc",
"test/core/end2end/tests/filter_context.cc",
"test/core/end2end/tests/filter_latency.cc",
"test/core/end2end/tests/filter_status_code.cc",
"test/core/end2end/tests/graceful_server_shutdown.cc",
@ -8967,6 +8968,7 @@
"test/core/end2end/tests/empty_batch.cc",
"test/core/end2end/tests/filter_call_init_fails.cc",
"test/core/end2end/tests/filter_causes_close.cc",
"test/core/end2end/tests/filter_context.cc",
"test/core/end2end/tests/filter_latency.cc",
"test/core/end2end/tests/filter_status_code.cc",
"test/core/end2end/tests/graceful_server_shutdown.cc",

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save