fixed failures in h2_proxy

pull/7333/head
David Garcia Quintas 9 years ago
parent b395c9e278
commit 6eee24cfaf
  1. 20
      src/core/ext/load_reporting/load_reporting.c
  2. 40
      src/core/ext/load_reporting/load_reporting.h
  3. 104
      src/core/ext/load_reporting/load_reporting_filter.c
  4. 3
      src/core/lib/channel/channel_stack.h
  5. 94
      test/core/end2end/tests/load_reporting_hook.c
  6. 2
      third_party/protobuf

@ -50,11 +50,11 @@ struct grpc_load_reporting_config {
grpc_load_reporting_config *grpc_load_reporting_config_create(
grpc_load_reporting_fn fn, void *user_data) {
GPR_ASSERT(fn != NULL);
grpc_load_reporting_config *lrc =
grpc_load_reporting_config *lr_config =
gpr_malloc(sizeof(grpc_load_reporting_config));
lrc->fn = fn;
lrc->user_data = user_data;
return lrc;
lr_config->fn = fn;
lr_config->user_data = user_data;
return lr_config;
}
grpc_load_reporting_config *grpc_load_reporting_config_copy(
@ -62,14 +62,14 @@ grpc_load_reporting_config *grpc_load_reporting_config_copy(
return grpc_load_reporting_config_create(src->fn, src->user_data);
}
void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lrc) {
gpr_free(lrc);
void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lr_config) {
gpr_free(lr_config);
}
void grpc_load_reporting_config_call(
grpc_load_reporting_config *lrc,
grpc_load_reporting_config *lr_config,
const grpc_load_reporting_call_data *call_data) {
lrc->fn(call_data, lrc->user_data);
lr_config->fn(call_data, lr_config->user_data);
}
static bool is_load_reporting_enabled(const grpc_channel_args *a) {
@ -110,11 +110,11 @@ static const grpc_arg_pointer_vtable lrd_ptr_vtable = {
lrd_arg_copy, lrd_arg_destroy, lrd_arg_cmp};
grpc_arg grpc_load_reporting_config_create_arg(
grpc_load_reporting_config *lrc) {
grpc_load_reporting_config *lr_config) {
grpc_arg arg;
arg.type = GRPC_ARG_POINTER;
arg.key = GRPC_ARG_ENABLE_LOAD_REPORTING;
arg.value.pointer.p = lrc;
arg.value.pointer.p = lr_config;
arg.value.pointer.vtable = &lrd_ptr_vtable;
return arg;
}

@ -37,8 +37,15 @@
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/channel/channel_stack.h"
/** Metadata key for initial metadata coming from clients */
#define GRPC_LOAD_REPORTING_INITIAL_MD_KEY "load-reporting-initial"
/** Metadata key for trailing metadata from servers */
#define GRPC_LOAD_REPORTING_TRAILING_MD_KEY "load-reporting-trailing"
typedef struct grpc_load_reporting_config grpc_load_reporting_config;
/** Identifiers for the invocation point of the users LR callback */
typedef enum grpc_load_reporting_source {
GRPC_LR_POINT_UNKNOWN = 0,
GRPC_LR_POINT_CHANNEL_CREATION,
@ -47,17 +54,31 @@ typedef enum grpc_load_reporting_source {
GRPC_LR_POINT_CALL_DESTRUCTION
} grpc_load_reporting_source;
/** Call information to be passed to the provided load reporting function upon
* completion of the call */
/** Call information to be passed to the provided LR callback. */
typedef struct grpc_load_reporting_call_data {
const grpc_load_reporting_source source;
const grpc_load_reporting_source source; /**< point of last data update. */
// XXX
intptr_t channel_id;
intptr_t call_id;
/** Only valid when \a source is \a GRPC_LR_POINT_CALL_DESTRUCTION, that is,
* once the call has completed */
const grpc_call_final_info *final_info;
const char *initial_md_string; /**< value string for LR's initial md key */
const char *trailing_md_string; /**< value string for LR's trailing md key */
const char *method; /**< Corresponds to :path header */
const char *method_name; /**< Corresponds to :path header */
} grpc_load_reporting_call_data;
/** Custom function to be called by the load reporting filter. */
/** Return a \a grpc_arg enabling load reporting */
grpc_arg grpc_load_reporting_config_create_arg(
grpc_load_reporting_config *lr_config);
/** Custom function to be called by the load reporting filter.
*
* \a call_data is provided by the runtime. \a user_data is given by the user
* as part of \a grpc_load_reporting_config_create */
typedef void (*grpc_load_reporting_fn)(
const grpc_load_reporting_call_data *call_data, void *user_data);
@ -73,14 +94,11 @@ grpc_load_reporting_config *grpc_load_reporting_config_create(
grpc_load_reporting_config *grpc_load_reporting_config_copy(
grpc_load_reporting_config *src);
void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lrc);
void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lr_config);
/** Invoke the function registered by \a grpc_load_reporting_init. */
/** Invoke the LR callback from \a lr_config with \a call_data */
void grpc_load_reporting_config_call(
grpc_load_reporting_config *lrc,
grpc_load_reporting_config *lr_config,
const grpc_load_reporting_call_data *call_data);
/** Return a \a grpc_arg enabling load reporting */
grpc_arg grpc_load_reporting_config_create_arg(grpc_load_reporting_config *lrc);
#endif /* GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H */

@ -31,6 +31,7 @@
*
*/
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
@ -43,37 +44,40 @@
#include "src/core/lib/transport/static_metadata.h"
typedef struct call_data {
const char *trailing_md_string;
const char *initial_md_string;
intptr_t id; /**< an id unique to the call */
char *trailing_md_string;
char *initial_md_string;
const char *service_method;
grpc_metadata_batch *recv_initial_metadata;
/* stores the recv_initial_metadata op's ready closure, which we wrap with our
* own (on_initial_md_ready) in order to capture the incoming initial metadata
* */
grpc_closure *ops_recv_initial_metadata_ready;
/* to get notified of the availability of the incoming initial metadata. */
grpc_closure on_initial_md_ready;
grpc_metadata_batch *recv_initial_metadata;
} call_data;
typedef struct channel_data {
gpr_mu mu;
grpc_load_reporting_config *lrc;
intptr_t id; /**< an id unique to the channel */
grpc_load_reporting_config *lr_config;
} channel_data;
static void invoke_lr_fn_locked(grpc_load_reporting_config *lrc,
static void invoke_lr_fn_locked(grpc_load_reporting_config *lr_config,
grpc_load_reporting_call_data *lr_call_data) {
GPR_TIMER_BEGIN("load_reporting_config_fn", 0);
grpc_load_reporting_config_call(lrc, lr_call_data);
grpc_load_reporting_config_call(lr_config, lr_call_data);
GPR_TIMER_END("load_reporting_config_fn", 0);
}
typedef struct {
grpc_call_element *elem;
grpc_exec_ctx *exec_ctx;
} server_filter_args;
} recv_md_filter_args;
static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) {
server_filter_args *a = user_data;
recv_md_filter_args *a = user_data;
grpc_call_element *elem = a->elem;
call_data *calld = elem->call_data;
@ -81,22 +85,22 @@ static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) {
calld->service_method = grpc_mdstr_as_c_string(md->value);
} else if (md->key == GRPC_MDSTR_LOAD_REPORTING_INITIAL) {
calld->initial_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
return NULL;
}
return md;
}
static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_error *err) {
grpc_error *err) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (err == GRPC_ERROR_NONE) {
server_filter_args a;
recv_md_filter_args a;
a.elem = elem;
a.exec_ctx = exec_ctx;
grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter, &a);
grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter,
&a);
if (calld->service_method == NULL) {
err =
grpc_error_add_child(err, GRPC_ERROR_CREATE("Missing :path header"));
@ -116,13 +120,17 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(call_data));
calld->id = (intptr_t)args->call_stack;
grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem);
grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_CREATION,
NULL, NULL, NULL, NULL};
gpr_mu_lock(&chand->mu);
invoke_lr_fn_locked(chand->lrc, &lr_call_data);
gpr_mu_unlock(&chand->mu);
(intptr_t)chand->id,
(intptr_t)calld->id,
NULL,
NULL,
NULL,
NULL};
invoke_lr_fn_locked(chand->lr_config, &lr_call_data);
}
/* Destructor for call_data */
@ -132,13 +140,18 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_load_reporting_call_data lr_call_data = {
GRPC_LR_POINT_CALL_DESTRUCTION, final_info, calld->initial_md_string,
calld->trailing_md_string, calld->service_method};
grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_DESTRUCTION,
(intptr_t)chand->id,
(intptr_t)calld->id,
final_info,
calld->initial_md_string,
calld->trailing_md_string,
calld->service_method};
gpr_mu_lock(&chand->mu);
invoke_lr_fn_locked(chand->lrc, &lr_call_data);
gpr_mu_unlock(&chand->mu);
invoke_lr_fn_locked(chand->lr_config, &lr_call_data);
gpr_free(calld->initial_md_string);
gpr_free(calld->trailing_md_string);
}
/* Constructor for channel_data */
@ -149,24 +162,28 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(channel_data));
gpr_mu_init(&chand->mu);
chand->id = (intptr_t)args->channel_stack;
for (size_t i = 0; i < args->channel_args->num_args; i++) {
if (0 == strcmp(args->channel_args->args[i].key,
GRPC_ARG_ENABLE_LOAD_REPORTING)) {
grpc_load_reporting_config *arg_lrc =
grpc_load_reporting_config *arg_lr_config =
args->channel_args->args[i].value.pointer.p;
chand->lrc = grpc_load_reporting_config_copy(arg_lrc);
GPR_ASSERT(chand->lrc != NULL);
chand->lr_config = grpc_load_reporting_config_copy(arg_lr_config);
GPR_ASSERT(chand->lr_config != NULL);
break;
}
}
GPR_ASSERT(chand->lrc != NULL); /* arg actually found */
GPR_ASSERT(chand->lr_config != NULL); /* arg actually found */
grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CHANNEL_CREATION,
NULL, NULL, NULL, NULL};
gpr_mu_lock(&chand->mu);
invoke_lr_fn_locked(chand->lrc, &lr_call_data);
gpr_mu_unlock(&chand->mu);
(intptr_t)chand,
0,
NULL,
NULL,
NULL,
NULL};
invoke_lr_fn_locked(chand->lr_config, &lr_call_data);
}
/* Destructor for channel data */
@ -174,10 +191,15 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
grpc_load_reporting_call_data lr_call_data = {
GRPC_LR_POINT_CHANNEL_DESTRUCTION, NULL, NULL, NULL, NULL};
invoke_lr_fn_locked(chand->lrc, &lr_call_data);
gpr_mu_destroy(&chand->mu);
grpc_load_reporting_config_destroy(chand->lrc);
GRPC_LR_POINT_CHANNEL_DESTRUCTION,
(intptr_t)chand->id,
0,
NULL,
NULL,
NULL,
NULL};
invoke_lr_fn_locked(chand->lr_config, &lr_call_data);
grpc_load_reporting_config_destroy(chand->lr_config);
}
static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) {
@ -186,7 +208,6 @@ static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) {
if (md->key == GRPC_MDSTR_LOAD_REPORTING_TRAILING) {
calld->trailing_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
return NULL;
}
return md;
@ -199,10 +220,9 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
calld->ops_recv_initial_metadata_ready =
op->recv_initial_metadata_ready;
/* substitute our callback for the higher callback */
calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->on_initial_md_ready;
} else if (op->send_trailing_metadata) {
grpc_metadata_batch_filter(op->send_trailing_metadata,

@ -77,6 +77,7 @@ typedef struct {
gpr_timespec latency; /* From call creating to enqueing of received status */
} grpc_call_stats;
/** Information about the call upon completion. */
typedef struct {
grpc_call_stats stats;
grpc_status_code final_status;
@ -123,7 +124,7 @@ typedef struct {
The filter does not need to do any chaining.
The bottom filter of a stack will be passed a non-NULL pointer to
\a and_free_memory that should be passed to gpr_free when destruction
is complete. \a final_info contains data about the completed code, mainly
is complete. \a final_info contains data about the completed call, mainly
for reporting purposes. */
void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info* final_info,

@ -45,50 +45,69 @@
#include "src/core/ext/load_reporting/load_reporting.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/transport/static_metadata.h"
enum { TIMEOUT = 200000 };
static void *tag(intptr_t t) { return (void *)t; }
typedef struct {
gpr_mu mu;
intptr_t channel_id;
intptr_t call_id;
uint32_t call_creation_token; /* expected 0xCAFED00D */
uint32_t call_destruction_token; /* expected 0xDEADD00D */
uint32_t channel_creation_token; /* expected 0xCAFEFACE */
uint32_t channel_destruction_token; /* expected 0xDEADFACE */
char *initial_md_str;
char *trailing_md_str;
char *method_name;
uint64_t total_bytes;
uint64_t incoming_bytes;
uint64_t outgoing_bytes;
bool fully_processed;
} aggregated_bw_stats;
static void sample_fn(const grpc_load_reporting_call_data *call_data,
void *user_data) {
GPR_ASSERT(user_data != NULL);
aggregated_bw_stats *custom_stats = (aggregated_bw_stats *)user_data;
gpr_mu_lock(&custom_stats->mu);
switch (call_data->source) {
case GRPC_LR_POINT_CHANNEL_CREATION:
custom_stats->channel_creation_token = 0xCAFEFACE;
custom_stats->channel_id = call_data->channel_id;
break;
case GRPC_LR_POINT_CHANNEL_DESTRUCTION:
custom_stats->channel_destruction_token = 0xDEADFACE;
break;
case GRPC_LR_POINT_CALL_CREATION:
custom_stats->call_creation_token = 0xCAFED00D;
custom_stats->call_id = call_data->call_id;
break;
case GRPC_LR_POINT_CALL_DESTRUCTION:
custom_stats->method_name = gpr_strdup(call_data->method);
custom_stats->initial_md_str = gpr_strdup(call_data->initial_md_string);
custom_stats->trailing_md_str = gpr_strdup(call_data->trailing_md_string);
custom_stats->method_name = gpr_strdup(call_data->method_name);
custom_stats->call_destruction_token = 0xDEADD00D;
custom_stats->total_bytes =
call_data->final_info->stats.transport_stream_stats.outgoing
.data_bytes +
custom_stats->incoming_bytes =
call_data->final_info->stats.transport_stream_stats.incoming
.data_bytes;
custom_stats->outgoing_bytes =
call_data->final_info->stats.transport_stream_stats.outgoing
.data_bytes;
custom_stats->fully_processed = true;
break;
default:
abort();
}
gpr_mu_unlock(&custom_stats->mu);
}
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
@ -146,7 +165,9 @@ static void end_test(grpc_end2end_test_fixture *f) {
static void request_response_with_payload(grpc_end2end_test_fixture f,
const char *method_name,
const char *request_msg,
const char *response_msg) {
const char *response_msg,
grpc_metadata *initial_lr_metadata,
grpc_metadata *trailing_lr_metadata) {
gpr_slice request_payload_slice = gpr_slice_from_copied_string(request_msg);
gpr_slice response_payload_slice = gpr_slice_from_copied_string(response_msg);
grpc_call *c;
@ -184,7 +205,9 @@ static void request_response_with_payload(grpc_end2end_test_fixture f,
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
GPR_ASSERT(initial_lr_metadata != NULL);
op->data.send_initial_metadata.count = 1;
op->data.send_initial_metadata.metadata = initial_lr_metadata;
op->flags = 0;
op->reserved = NULL;
op++;
@ -256,7 +279,9 @@ static void request_response_with_payload(grpc_end2end_test_fixture f,
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
GPR_ASSERT(trailing_lr_metadata != NULL);
op->data.send_status_from_server.trailing_metadata_count = 1;
op->data.send_status_from_server.trailing_metadata = trailing_lr_metadata;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op->flags = 0;
@ -292,6 +317,7 @@ static void test_load_reporting_hook(grpc_end2end_test_config config) {
aggregated_bw_stats *aggr_stats_server =
gpr_malloc(sizeof(aggregated_bw_stats));
memset(aggr_stats_server, 0, sizeof(aggregated_bw_stats));
gpr_mu_init(&aggr_stats_server->mu);
grpc_load_reporting_config *server_lrc =
grpc_load_reporting_config_create(sample_fn, aggr_stats_server);
@ -305,27 +331,55 @@ static void test_load_reporting_hook(grpc_end2end_test_config config) {
begin_test(config, "test_load_reporting_hook", NULL, lr_server_args);
const char *method_name = "/gRPCFTW";
const char *request_msg = "so long!";
const char *response_msg = "I'm back!";
request_response_with_payload(f, method_name, request_msg, response_msg);
const char *request_msg = "the msg from the client";
const char *response_msg = "... and the response from the server";
grpc_metadata initial_lr_metadata;
grpc_metadata trailing_lr_metadata;
initial_lr_metadata.key = GRPC_LOAD_REPORTING_INITIAL_MD_KEY;
initial_lr_metadata.value = "client-token";
initial_lr_metadata.value_length = strlen(initial_lr_metadata.value);
memset(&initial_lr_metadata.internal_data, 0,
sizeof(initial_lr_metadata.internal_data));
trailing_lr_metadata.key = GRPC_LOAD_REPORTING_TRAILING_MD_KEY;
trailing_lr_metadata.value = "server-token";
trailing_lr_metadata.value_length = strlen(trailing_lr_metadata.value);
memset(&trailing_lr_metadata.internal_data, 0,
sizeof(trailing_lr_metadata.internal_data));
request_response_with_payload(f, method_name, request_msg, response_msg,
&initial_lr_metadata, &trailing_lr_metadata);
end_test(&f);
grpc_channel_args_destroy(lr_server_args);
config.tear_down_data(&f);
if (aggr_stats_server->fully_processed) {
GPR_ASSERT(aggr_stats_server->total_bytes ==
5 + strlen(request_msg) + strlen(response_msg));
GPR_ASSERT(aggr_stats_server->fully_processed);
GPR_ASSERT(aggr_stats_server->incoming_bytes ==
/* 5 FIXME */ /* compression bit(1) + msg length(4) */ +strlen(
request_msg));
GPR_ASSERT(aggr_stats_server->outgoing_bytes ==
5 /* compression bit(1) + msg length(4) */ + strlen(response_msg));
GPR_ASSERT(aggr_stats_server->channel_creation_token == 0xCAFEFACE);
GPR_ASSERT(aggr_stats_server->channel_destruction_token == 0xDEADFACE);
GPR_ASSERT(aggr_stats_server->call_id > 0);
GPR_ASSERT(aggr_stats_server->channel_id > 0);
GPR_ASSERT(aggr_stats_server->call_creation_token == 0xCAFED00D);
GPR_ASSERT(aggr_stats_server->call_destruction_token == 0xDEADD00D);
GPR_ASSERT(aggr_stats_server->channel_creation_token == 0xCAFEFACE);
GPR_ASSERT(aggr_stats_server->channel_destruction_token == 0xDEADFACE);
GPR_ASSERT(strcmp(aggr_stats_server->method_name, "/gRPCFTW") == 0);
}
GPR_ASSERT(aggr_stats_server->call_creation_token == 0xCAFED00D);
GPR_ASSERT(aggr_stats_server->call_destruction_token == 0xDEADD00D);
GPR_ASSERT(strcmp(aggr_stats_server->method_name, "/gRPCFTW") == 0);
GPR_ASSERT(aggr_stats_server->initial_md_str != NULL);
GPR_ASSERT(aggr_stats_server->trailing_md_str != NULL);
GPR_ASSERT(strcmp(aggr_stats_server->initial_md_str, "client-token") == 0);
GPR_ASSERT(strcmp(aggr_stats_server->trailing_md_str, "server-token") == 0);
gpr_free(aggr_stats_server->method_name);
gpr_mu_destroy(&aggr_stats_server->mu);
gpr_free(aggr_stats_server);
grpc_load_reporting_config_destroy(server_lrc);
}

@ -1 +1 @@
Subproject commit bdeb215cab2985195325fcd5e70c3fa751f46e0f
Subproject commit d4d13a4349e4e59d67f311185ddcc1890d956d7a
Loading…
Cancel
Save