Merge pull request #11626 from markdroth/consistent_failure_path

Clean up client_auth_filter and server_auth_filter.
pull/11719/head
Mark D. Roth 7 years ago committed by GitHub
commit d33ca70267
  1. 100
      src/core/lib/security/transport/client_auth_filter.c
  2. 143
      src/core/lib/security/transport/server_auth_filter.c

@ -49,7 +49,6 @@ typedef struct {
pollset_set so that work can progress when this call wants work to progress
*/
grpc_polling_entity *pollent;
grpc_transport_stream_op_batch op;
gpr_atm security_context_set;
gpr_mu security_context_mu;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
@ -92,11 +91,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
size_t num_md,
grpc_credentials_status status,
const char *error_details) {
grpc_call_element *elem = (grpc_call_element *)user_data;
grpc_transport_stream_op_batch *batch =
(grpc_transport_stream_op_batch *)user_data;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *op = &calld->op;
grpc_metadata_batch *mdb;
size_t i;
reset_auth_metadata_context(&calld->auth_md_context);
grpc_error *error = GRPC_ERROR_NONE;
if (status != GRPC_CREDENTIALS_OK) {
@ -108,9 +106,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED);
} else {
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
GPR_ASSERT(op->send_initial_metadata);
mdb = op->payload->send_initial_metadata.send_initial_metadata;
for (i = 0; i < num_md; i++) {
GPR_ASSERT(batch->send_initial_metadata);
grpc_metadata_batch *mdb =
batch->payload->send_initial_metadata.send_initial_metadata;
for (size_t i = 0; i < num_md; i++) {
add_error(&error,
grpc_metadata_batch_add_tail(
exec_ctx, mdb, &calld->md_links[i],
@ -120,9 +119,9 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
if (error == GRPC_ERROR_NONE) {
grpc_call_next_op(exec_ctx, elem, op);
grpc_call_next_op(exec_ctx, elem, batch);
} else {
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error);
}
}
@ -158,11 +157,11 @@ void build_auth_metadata_context(grpc_security_connector *sc,
static void send_security_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_client_security_context *ctx =
(grpc_client_security_context *)op->payload
(grpc_client_security_context *)batch->payload
->context[GRPC_CONTEXT_SECURITY]
.value;
grpc_call_credentials *channel_call_creds =
@ -171,7 +170,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
if (channel_call_creds == NULL && !call_creds_has_md) {
/* Skip sending metadata altogether. */
grpc_call_next_op(exec_ctx, elem, op);
grpc_call_next_op(exec_ctx, elem, batch);
return;
}
@ -180,7 +179,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
ctx->creds, NULL);
if (calld->creds == NULL) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op,
exec_ctx, batch,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Incompatible credentials set on channel and call."),
@ -194,28 +193,29 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
build_auth_metadata_context(&chand->security_connector->base,
chand->auth_context, calld);
calld->op = *op; /* Copy op (originates from the caller's stack). */
GPR_ASSERT(calld->pollent != NULL);
grpc_call_credentials_get_request_metadata(
exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
on_credentials_metadata, elem);
on_credentials_metadata, batch);
}
static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_security_status status) {
grpc_call_element *elem = (grpc_call_element *)user_data;
grpc_transport_stream_op_batch *batch =
(grpc_transport_stream_op_batch *)user_data;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
if (status == GRPC_SECURITY_OK) {
send_security_metadata(exec_ctx, elem, &calld->op);
send_security_metadata(exec_ctx, elem, batch);
} else {
char *error_msg;
char *host = grpc_slice_to_c_string(calld->host);
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
host);
gpr_free(host);
grpc_call_element_signal_error(
exec_ctx, elem,
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAUTHENTICATED));
@ -223,35 +223,29 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
/* Called either:
- in response to an API call (or similar) from above, to send something
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("auth_start_transport_op", 0);
static void auth_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
GPR_TIMER_BEGIN("auth_start_transport_stream_op_batch", 0);
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_linked_mdelem *l;
grpc_client_security_context *sec_ctx = NULL;
if (!op->cancel_stream) {
if (!batch->cancel_stream) {
/* double checked lock over security context to ensure it's set once */
if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
gpr_mu_lock(&calld->security_context_mu);
if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
GPR_ASSERT(op->payload->context != NULL);
if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
op->payload->context[GRPC_CONTEXT_SECURITY].value =
GPR_ASSERT(batch->payload->context != NULL);
if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
batch->payload->context[GRPC_CONTEXT_SECURITY].value =
grpc_client_security_context_create();
op->payload->context[GRPC_CONTEXT_SECURITY].destroy =
batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_client_security_context_destroy;
}
sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value;
grpc_client_security_context *sec_ctx =
batch->payload->context[GRPC_CONTEXT_SECURITY].value;
GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
sec_ctx->auth_context =
GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
@ -261,9 +255,9 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
}
if (op->send_initial_metadata) {
for (l = op->payload->send_initial_metadata.send_initial_metadata->list
.head;
if (batch->send_initial_metadata) {
for (grpc_linked_mdelem *l = batch->payload->send_initial_metadata
.send_initial_metadata->list.head;
l != NULL; l = l->next) {
grpc_mdelem md = l->md;
/* Pointer comparison is OK for md_elems created from the same context.
@ -284,19 +278,19 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
if (calld->have_host) {
char *call_host = grpc_slice_to_c_string(calld->host);
calld->op = *op; /* Copy op (originates from the caller's stack). */
batch->handler_private.extra_arg = elem;
grpc_channel_security_connector_check_call_host(
exec_ctx, chand->security_connector, call_host, chand->auth_context,
on_host_checked, elem);
on_host_checked, batch);
gpr_free(call_host);
GPR_TIMER_END("auth_start_transport_op", 0);
GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
return; /* early exit */
}
}
/* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, op);
GPR_TIMER_END("auth_start_transport_op", 0);
grpc_call_next_op(exec_ctx, elem, batch);
GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@ -379,7 +373,15 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
const grpc_channel_filter grpc_client_auth_filter = {
auth_start_transport_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, set_pollset_or_pollset_set, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
grpc_call_next_get_peer, grpc_channel_next_get_info, "client-auth"};
auth_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
grpc_channel_next_get_info,
"client-auth"};

@ -27,14 +27,9 @@
#include "src/core/lib/slice/slice_internal.h"
typedef struct call_data {
grpc_metadata_batch *recv_initial_metadata;
/* Closure to call when finished with the auth_on_recv hook. */
grpc_closure *on_done_recv;
/* Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member after
handling it. */
grpc_closure auth_on_recv;
grpc_transport_stream_op_batch *transport_op;
grpc_transport_stream_op_batch *recv_initial_metadata_batch;
grpc_closure *original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
grpc_metadata_array md;
const grpc_metadata *consumed_md;
size_t num_consumed_md;
@ -90,125 +85,96 @@ static void on_md_processing_done(
grpc_status_code status, const char *error_details) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
/* TODO(jboeuf): Implement support for response_md. */
if (response_md != NULL && num_response_md > 0) {
gpr_log(GPR_INFO,
"response_md in auth metadata processing not supported for now. "
"Ignoring...");
}
grpc_error *error = GRPC_ERROR_NONE;
if (status == GRPC_STATUS_OK) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
/* TODO(ctiller): propagate error */
GRPC_LOG_IF_ERROR(
"grpc_metadata_batch_filter",
grpc_metadata_batch_filter(&exec_ctx, calld->recv_initial_metadata,
remove_consumed_md, elem,
"Response metadata filtering error"));
for (size_t i = 0; i < calld->md.count; i++) {
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
GRPC_CLOSURE_SCHED(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE);
error = grpc_metadata_batch_filter(
&exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
remove_consumed_md, elem, "Response metadata filtering error");
} else {
for (size_t i = 0; i < calld->md.count; i++) {
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
error_details = error_details != NULL
? error_details
: "Authentication metadata processing failed.";
if (calld->transport_op->send_message) {
grpc_byte_stream_destroy(
&exec_ctx, calld->transport_op->payload->send_message.send_message);
calld->transport_op->payload->send_message.send_message = NULL;
if (error_details == NULL) {
error_details = "Authentication metadata processing failed.";
}
GRPC_CLOSURE_SCHED(
&exec_ctx, calld->on_done_recv,
error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
GRPC_ERROR_INT_GRPC_STATUS, status));
GRPC_ERROR_INT_GRPC_STATUS, status);
}
for (size_t i = 0; i < calld->md.count; i++) {
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready,
error);
grpc_exec_ctx_finish(&exec_ctx);
}
static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_error *error) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = arg;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
if (error == GRPC_ERROR_NONE) {
if (chand->creds != NULL && chand->creds->processor.process != NULL) {
calld->md = metadata_batch_to_md_array(calld->recv_initial_metadata);
calld->md = metadata_batch_to_md_array(
batch->payload->recv_initial_metadata.recv_initial_metadata);
chand->creds->processor.process(
chand->creds->processor.state, calld->auth_context,
calld->md.metadata, calld->md.count, on_md_processing_done, elem);
return;
}
}
GRPC_CLOSURE_SCHED(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error));
GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
static void set_recv_ops_md_callbacks(grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
static void auth_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
calld->on_done_recv =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->auth_on_recv;
calld->transport_op = op;
if (batch->recv_initial_metadata) {
// Inject our callback.
calld->recv_initial_metadata_batch = batch;
calld->original_recv_initial_metadata_ready =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready;
}
}
/* Called either:
- in response to an API call (or similar) from above, to send something
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
set_recv_ops_md_callbacks(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
grpc_call_next_op(exec_ctx, elem, batch);
}
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_server_security_context *server_ctx = NULL;
/* initialize members */
memset(calld, 0, sizeof(*calld));
GRPC_CLOSURE_INIT(&calld->auth_on_recv, auth_on_recv, elem,
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
// Create server security context. Set its auth context from channel
// data and save it in the call context.
grpc_server_security_context *server_ctx =
grpc_server_security_context_create();
server_ctx->auth_context = grpc_auth_context_create(chand->auth_context);
calld->auth_context = server_ctx->auth_context;
if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) {
args->context[GRPC_CONTEXT_SECURITY].destroy(
args->context[GRPC_CONTEXT_SECURITY].value);
}
server_ctx = grpc_server_security_context_create();
server_ctx->auth_context = grpc_auth_context_create(chand->auth_context);
calld->auth_context = server_ctx->auth_context;
args->context[GRPC_CONTEXT_SECURITY].value = server_ctx;
args->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_server_security_context_destroy;
return GRPC_ERROR_NONE;
}
@ -221,19 +187,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
GPR_ASSERT(!args->is_last);
channel_data *chand = elem->channel_data;
grpc_auth_context *auth_context =
grpc_find_auth_context_in_args(args->channel_args);
grpc_server_credentials *creds =
grpc_find_server_credentials_in_args(args->channel_args);
/* grab pointers to our data from the channel element */
channel_data *chand = elem->channel_data;
GPR_ASSERT(!args->is_last);
GPR_ASSERT(auth_context != NULL);
/* initialize members */
chand->auth_context =
GRPC_AUTH_CONTEXT_REF(auth_context, "server_auth_filter");
grpc_server_credentials *creds =
grpc_find_server_credentials_in_args(args->channel_args);
chand->creds = grpc_server_credentials_ref(creds);
return GRPC_ERROR_NONE;
}
@ -241,14 +203,13 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
/* grab pointers to our data from the channel element */
channel_data *chand = elem->channel_data;
GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "server_auth_filter");
grpc_server_credentials_unref(exec_ctx, chand->creds);
}
const grpc_channel_filter grpc_server_auth_filter = {
auth_start_transport_op,
auth_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,

Loading…
Cancel
Save